How to design DynamoDB to Elasticsearch with Insert/Modify/Remove
Problem description
How can I pass this entire document into Elasticsearch using Python? Is this the right way to put it into Elasticsearch?
In DynamoDB, id is the primary key.
How to insert into DynamoDB? Below is the code:
import boto3
from boto3.dynamodb.conditions import Key, And, Attr


def lambda_handler(event, context):
    dynamodb = boto3.resource('dynamodb')
    table = dynamodb.Table('newtable')
    with table.batch_writer(overwrite_by_pkeys=['id']) as batch:
        batch.put_item(
            Item={
                'id': '1',
                'last_name': 'V',
                'age': '2',
            }
        )
        batch.put_item(
            Item={
                'id': '2',
                'last_name': 'JJ',
                'age': '7',
            }
        )
        batch.put_item(
            Item={
                'id': '9',
                'last_name': 'ADD',
                'age': '95',
            }
        )
        batch.put_item(
            Item={
                'id': '10',
                'last_name': 'ADD',
                'age': '95',
            }
        )
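The four put_item calls above can also be written as a loop over a list of items. A minimal sketch, assuming the table handle comes from boto3.resource('dynamodb').Table('newtable') as in the question; load_items and ITEMS are hypothetical names. It also assumes you may want age to be numeric: boto3 expects numbers as decimal.Decimal (it rejects float), whereas the question stores them as strings, which also works.

```python
from decimal import Decimal

# Items from the question. Ages are Decimal here (an assumption) so DynamoDB
# stores them as numbers ("N"); the question stores them as strings ("S").
ITEMS = [
    {"id": "1", "last_name": "V", "age": Decimal(2)},
    {"id": "2", "last_name": "JJ", "age": Decimal(7)},
    {"id": "9", "last_name": "ADD", "age": Decimal(95)},
    {"id": "10", "last_name": "ADD", "age": Decimal(95)},
]


def load_items(table, items):
    """Write items through a batch writer (hypothetical helper).

    overwrite_by_pkeys=['id'] de-duplicates buffered items that share the
    same key, keeping the latest one, instead of failing the batch request.
    """
    with table.batch_writer(overwrite_by_pkeys=["id"]) as batch:
        for item in items:
            batch.put_item(Item=item)
    return len(items)


def lambda_handler(event, context):
    import boto3  # imported lazily so load_items can be exercised without AWS

    table = boto3.resource("dynamodb").Table("newtable")
    return {"written": load_items(table, ITEMS)}
```

Passing the table in as an argument keeps the write logic separate from the AWS client setup.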
How can I push the expected output into Elasticsearch?

How can changes to the DynamoDB content be reflected in ES automatically?
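One common way to keep ES in sync automatically is to enable a DynamoDB stream on the table and attach the Lambda function to it as an event source, so every INSERT, MODIFY and REMOVE reaches the function in batches. A minimal sketch using boto3 clients passed in as arguments; enable_stream_and_attach is a hypothetical helper and the function name is a placeholder. NEW_AND_OLD_IMAGES is the view type the answer's sample events use. Assumptions to check: update_table fails if the table already has a stream enabled (read LatestStreamArn from describe_table in that case), and the function's execution role needs stream read permissions, for example the AWSLambdaDynamoDBExecutionRole managed policy.

```python
def enable_stream_and_attach(ddb_client, lambda_client, table_name, function_name):
    """Enable a DynamoDB stream on a table and attach a Lambda to it.

    ddb_client is expected to be boto3.client('dynamodb') and lambda_client
    boto3.client('lambda'); they are parameters so the wiring can be tested
    without AWS.
    """
    resp = ddb_client.update_table(
        TableName=table_name,
        StreamSpecification={
            "StreamEnabled": True,
            # Include both images so MODIFY/REMOVE events carry OldImage too.
            "StreamViewType": "NEW_AND_OLD_IMAGES",
        },
    )
    stream_arn = resp["TableDescription"]["LatestStreamArn"]

    # From now on, Lambda polls the stream and invokes the function with
    # batches of up to 100 records, starting with new changes only.
    lambda_client.create_event_source_mapping(
        EventSourceArn=stream_arn,
        FunctionName=function_name,
        StartingPosition="LATEST",
        BatchSize=100,
    )
    return stream_arn
```

Usage, with a placeholder function name: enable_stream_and_attach(boto3.client('dynamodb'), boto3.client('lambda'), 'newtable', 'my-ddb-to-es-function').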
I have gone through the link https://aws.amazon.com/blogs/compute/indexing-amazon-dynamodb-content-with-amazon-elasticsearch-service-using-aws-lambda/
Below is my code; I am getting this error:

ERROR: NameError("name 'event' is not defined")
Code (the Lambda function below is triggered from the DynamoDB table):
import boto3
import json
import re
from requests_aws4auth import AWS4Auth
from elasticsearch import Elasticsearch, RequestsHttpConnection

session = boto3.session.Session()
credentials = session.get_credentials()
# s3 = session.resource('s3')
awsauth = AWS4Auth(credentials.access_key, credentials.secret_key, session.region_name, 'es', session_token=credentials.token)

es = Elasticsearch(
    ['https://xx-east-1.es.amazonaws.com'],
    http_auth=awsauth,
    use_ssl=True,
    verify_certs=True,
    connection_class=RequestsHttpConnection
)

reserved_fields = ["uid", "_id", "_type", "_source", "_all", "_parent", "_fieldnames", "_routing", "_index", "_size", "_timestamp", "_ttl"]


def lambda_handler(event, context):
    print(event)
    dynamodb = boto3.resource('dynamodb')

    # Loop over the DynamoDB Stream records
    for record in event['Records']:
        try:
            if record['eventName'] == "INSERT":
                insert_document(es, record)
            elif record['eventName'] == "REMOVE":
                remove_document(es, record)
            elif record['eventName'] == "MODIFY":
                modify_document(es, record)
        except Exception as e:
            print("Failed to process:")
            print(json.dumps(record))
            print("ERROR: " + repr(e))
            continue


# Process MODIFY events
def modify_document(es, record):
    table = getTable(record)
    print("Dynamo Table: " + table)

    docId = docid(record)
    print("KEY")
    print(docId)

    # Unmarshal the DynamoDB JSON to a normal JSON
    doc = json.dumps(document())

    print("Updated document:")
    print(doc)

    # We reindex the whole document as ES accepts partial docs
    es.index(index=table, body=doc, id=docId, doc_type=table, refresh=True)

    print("Successly modified - Index: " + table + " - Document ID: " + docId)


def remove_document(es, record):
    table = getTable(record)
    print("Dynamo Table: " + table)

    docId = docid(record)
    print("Deleting document ID: " + docId)

    es.delete(index=table, id=docId, doc_type=table, refresh=True)

    print("Successly removed - Index: " + table + " - Document ID: " + docId)


# Process INSERT events
def insert_document(es, record):
    table = getTable(record)
    print("Dynamo Table: " + table)

    # Create index if missing
    if es.indices.exists(table) == False:
        print("Create missing index: " + table)
        es.indices.create(table, body='{"settings": { "index.mapping.coerce": true } }')
        print("Index created: " + table)

    # Unmarshal the DynamoDB JSON to a normal JSON
    doc = json.dumps(document())

    print("New document to Index:")
    print(doc)

    newId = docid(record)
    es.index(index=table, body=doc, id=newId, doc_type=table, refresh=True)

    print("Successly inserted - Index: " + table + " - Document ID: " + newId)


def getTable(record):
    p = re.compile('arn:aws:dynamodb:.*?:.*?:table/([0-9a-zA-Z_-]+)/.+')
    m = p.match(record['eventSourceARN'])
    if m is None:
        raise Exception("Table not found in SourceARN")
    return m.group(1).lower()


def document(event):
    result = []
    for r in event['Records']:
        tmp = {}
        for k, v in r['dynamodb']['NewImage'].items():
            if "S" in v.keys() or "BOOL" in v.keys():
                tmp[k] = v.get('S', v.get('BOOL', False))
            elif 'NULL' in v:
                tmp[k] = None
        result.append(tmp)
    for i in result:
        return i


def docid(event):
    result = []
    for r in event['Records']:
        tmp = {}
        for k, v in r['dynamodb']['Keys'].items():
            if "S" in v.keys() or "BOOL" in v.keys():
                tmp[k] = v.get('S', v.get('BOOL', False))
            elif 'NULL' in v:
                tmp[k] = None
        result.append(tmp)
    for newId in result:
        return newId
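A NameError of the form "name 'event' is not defined" means some code refers to event outside lambda_handler, the only function that receives it as a parameter. Helpers such as modify_document only get es and record, so they have to work from the record (or be passed event explicitly). A minimal, generic illustration of that scoping rule; handler, helper_broken and helper_fixed are hypothetical names, not part of the question's code.

```python
def handler(event, context):
    # 'event' is a local name here and is not visible to other functions.
    return [helper_broken(r) for r in event["Records"]]


def helper_broken(record):
    # Refers to 'event', which is not defined in this scope -> NameError when called.
    return event["Records"][0]["eventName"]  # noqa: F821


def helper_fixed(record):
    # Uses only what it was given: the single stream record.
    return record["eventName"]


sample = {"Records": [{"eventName": "INSERT"}, {"eventName": "MODIFY"}]}

try:
    handler(sample, None)
except NameError as exc:
    print("broken:", exc)

print("fixed:", [helper_fixed(r) for r in sample["Records"]])
```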
I am getting the error in document and docid.
Run on their own, each of them produces output:

event = {'Records': [
    {'eventID': '2339bc590c21035b84f8cc602b12c1d2', 'eventName': 'INSERT', 'eventVersion': '1.1',
     'eventSource': 'aws:dynamodb', 'awsRegion': 'us-east-1',
     'dynamodb': {'ApproximateCreationDateTime': 1595908037.0, 'Keys': {'id': {'S': '9'}},
                  'NewImage': {'last_name': {'S': 'Hus'}, 'id': {'S': '9'}, 'age': {'S': '95'}},
                  'SequenceNumber': '3100000000035684810908', 'SizeBytes': 23, 'StreamViewType': 'NEW_IMAGE'},
     'eventSourceARN': 'arn:aws:dynamodb:us-east-1:xxxx:table/glossary/stream/2020-07-28T00:26:55.462'},
    {'eventID': 'xxxx', 'eventName': 'MODIFY', 'eventVersion': '1.1',
     'eventSource': 'aws:dynamodb', 'awsRegion': 'us-east-1',
     'dynamodb': {'ApproximateCreationDateTime': 1595908037.0, 'Keys': {'id': {'S': '2'}},
                  'NewImage': {'last_name': {'S': 'JJ'}, 'id': {'S': '2'}, 'age': {'S': '5'}},
                  'SequenceNumber': '3200000000035684810954', 'SizeBytes': 21, 'StreamViewType': 'NEW_IMAGE'},
     'eventSourceARN': 'arn:aws:dynamodb:us-east-1:xxxx:table/glossary/stream/2020-07-28T00:26:55.462'},
    {'eventID': 'a9c90c0c4a5a4b64d0314c4557e94e28', 'eventName': 'INSERT', 'eventVersion': '1.1',
     'eventSource': 'aws:dynamodb', 'awsRegion': 'us-east-1',
     'dynamodb': {'ApproximateCreationDateTime': 1595908037.0, 'Keys': {'id': {'S': '10'}},
                  'NewImage': {'last_name': {'S': 'Hus'}, 'id': {'S': '10'}, 'age': {'S': '95'}},
                  'SequenceNumber': '3300000000035684810956', 'SizeBytes': 25, 'StreamViewType': 'NEW_IMAGE'},
     'eventSourceARN': 'arn:aws:dynamodb:us-east-1:xxxx:table/glossary/stream/2020-07-28T00:26:55.462'},
    {'eventID': '288f4a424992e5917af0350b53f754dc', 'eventName': 'MODIFY', 'eventVersion': '1.1',
     'eventSource': 'aws:dynamodb', 'awsRegion': 'us-east-1',
     'dynamodb': {'ApproximateCreationDateTime': 1595908037.0, 'Keys': {'id': {'S': '1'}},
                  'NewImage': {'last_name': {'S': 'V'}, 'id': {'S': '1'}, 'age': {'S': '2'}},
                  'SequenceNumber': '3400000000035684810957', 'SizeBytes': 20, 'StreamViewType': 'NEW_IMAGE'},
     'eventSourceARN': 'arn:aws:dynamodb:us-east-1:xxxx:table/glossary/stream/2020-07-28T00:26:55.462'},
]}

result = []
for r in event['Records']:
    tmp = {}
    for k, v in r['dynamodb']['NewImage'].items():
    #for k, v in r['dynamodb']['Keys'].items():
        if "S" in v.keys() or "BOOL" in v.keys():
            tmp[k] = v.get('S', v.get('BOOL', False))
        elif 'NULL' in v:
            tmp[k] = None
    result.append(tmp)
for i in result:
    print(i)
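The standalone snippet works because it is given the whole event. Inside the Lambda, though, each handler only has a single stream record, so the unmarshalling should work per record. A minimal sketch, assuming the standard DynamoDB JSON type descriptors (S, N, BOOL, NULL, M, L, SS, NS); unmarshal and record_to_doc are hypothetical names. boto3 also ships boto3.dynamodb.types.TypeDeserializer for the same job; it returns numbers as Decimal, whereas this sketch converts them to int or float (float may lose precision for very large or very precise values).

```python
from decimal import Decimal


def unmarshal(value):
    """Convert one DynamoDB-JSON attribute value, e.g. {'S': 'x'}, to plain Python."""
    [(type_, data)] = value.items()  # exactly one type descriptor per value
    if type_ in ("S", "BOOL"):
        return data
    if type_ == "N":
        # Numbers arrive as strings; keep integers as int, everything else as float.
        num = Decimal(data)
        return int(num) if num == num.to_integral_value() else float(num)
    if type_ == "NULL":
        return None
    if type_ == "M":
        return {k: unmarshal(v) for k, v in data.items()}
    if type_ == "L":
        return [unmarshal(v) for v in data]
    if type_ == "SS":
        return list(data)
    if type_ == "NS":
        return [unmarshal({"N": n}) for n in data]
    raise ValueError(f"unsupported DynamoDB type: {type_}")


def record_to_doc(record, image="NewImage"):
    """Unmarshal one stream record's image (NewImage or OldImage) into a dict."""
    return {k: unmarshal(v) for k, v in record["dynamodb"][image].items()}
```

For example, record_to_doc(event['Records'][0]) on the event above returns {'last_name': 'Hus', 'id': '9', 'age': '95'}, i.e. only that record, not the first record of whatever batch is being processed.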
Answer
You can check the following. I tried to replicate the issue and can confirm the error:

ERROR: NameError("name 'event' is not defined")
I used a simulated INSERT event from the DynamoDB stream, based on your example and my own table:

{
  "Records": [
    {
      "eventID": "b8b993cf16d1aacb61b40411b39e0b1f", "eventName": "INSERT",
      "eventVersion": "1.1", "eventSource": "aws:dynamodb", "awsRegion": "us-east-1",
      "dynamodb": {
        "ApproximateCreationDateTime": 1595922821.0,
        "Keys": { "id": { "N": "1" } },
        "NewImage": { "last_name": { "S": "V" }, "id": { "N": "1" }, "age": { "S": "2" } },
        "SequenceNumber": "25200000000020406897812", "SizeBytes": 22,
        "StreamViewType": "NEW_AND_OLD_IMAGES"
      },
      "eventSourceARN": "arn:aws:dynamodb:us-east-1:11111:table/newtable/stream/2020-07-28T06:59:38.569"
    },
    {
      "eventID": "e5d5bec988945c06ffc879cf16b89bf7", "eventName": "INSERT",
      "eventVersion": "1.1", "eventSource": "aws:dynamodb", "awsRegion": "us-east-1",
      "dynamodb": {
        "ApproximateCreationDateTime": 1595922821.0,
        "Keys": { "id": { "N": "9" } },
        "NewImage": { "last_name": { "S": "ADD" }, "id": { "N": "9" }, "age": { "S": "95" } },
        "SequenceNumber": "25300000000020406897813", "SizeBytes": 25,
        "StreamViewType": "NEW_AND_OLD_IMAGES"
      },
      "eventSourceARN": "arn:aws:dynamodb:us-east-1:11111:table/newtable/stream/2020-07-28T06:59:38.569"
    },
    {
      "eventID": "f1a7c9736253b5ef28ced38ed5ff645b", "eventName": "INSERT",
      "eventVersion": "1.1", "eventSource": "aws:dynamodb", "awsRegion": "us-east-1",
      "dynamodb": {
        "ApproximateCreationDateTime": 1595922821.0,
        "Keys": { "id": { "N": "2" } },
        "NewImage": { "last_name": { "S": "JJ" }, "id": { "N": "2" }, "age": { "S": "7" } },
        "SequenceNumber": "25400000000020406897819", "SizeBytes": 23,
        "StreamViewType": "NEW_AND_OLD_IMAGES"
      },
      "eventSourceARN": "arn:aws:dynamodb:us-east-1:11111:table/newtable/stream/2020-07-28T06:59:38.569"
    },
    {
      "eventID": "bfcbad9dc19883e4172e6dc25e66637b", "eventName": "INSERT",
      "eventVersion": "1.1", "eventSource": "aws:dynamodb", "awsRegion": "us-east-1",
      "dynamodb": {
        "ApproximateCreationDateTime": 1595922821.0,
        "Keys": { "id": { "N": "10" } },
        "NewImage": { "last_name": { "S": "ADD" }, "id": { "N": "10" }, "age": { "S": "95" } },
        "SequenceNumber": "25500000000020406897820", "SizeBytes": 25,
        "StreamViewType": "NEW_AND_OLD_IMAGES"
      },
      "eventSourceARN": "arn:aws:dynamodb:us-east-1:11111:table/newtable/stream/2020-07-28T06:59:38.569"
    }
  ]
}
Sample MODIFY event:

{
  "Records": [
    {
      "eventID": "4e4629c88aa00e366c89a293d9c82d54", "eventName": "MODIFY",
      "eventVersion": "1.1", "eventSource": "aws:dynamodb", "awsRegion": "us-east-1",
      "dynamodb": {
        "ApproximateCreationDateTime": 1595924589.0,
        "Keys": { "id": { "N": "2" } },
        "NewImage": { "last_name": { "S": "zhgdhfgdh" }, "id": { "N": "2" }, "age": { "S": "7" } },
        "OldImage": { "last_name": { "S": "JJ" }, "id": { "N": "2" }, "age": { "S": "7" } },
        "SequenceNumber": "25600000000020408264140", "SizeBytes": 49,
        "StreamViewType": "NEW_AND_OLD_IMAGES"
      },
      "eventSourceARN": "arn:aws:dynamodb:us-east-1:34234:table/newtable/stream/2020-07-28T06:59:38.569"
    }
  ]
}
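Writing stream events like these by hand gets tedious when testing locally. A minimal sketch of a helper that builds simulated stream events from plain Python items, assuming a single partition key named id, the NEW_AND_OLD_IMAGES view type, and only str, bool, int/float and None values; marshal, make_record and make_event are hypothetical names. Fields such as eventID and SequenceNumber are omitted because the Lambda code in this thread does not read them, and the ARN follows the format that getTable parses.

```python
import json


def marshal(value):
    """Tiny DynamoDB-JSON encoder for str, bool, int/float and None."""
    if value is None:
        return {"NULL": True}
    if isinstance(value, bool):  # check bool before int: bool is a subclass of int
        return {"BOOL": value}
    if isinstance(value, (int, float)):
        return {"N": str(value)}
    return {"S": str(value)}


def make_record(event_name, item, old_item=None, table="newtable", key="id",
                region="us-east-1", account="111111111111"):
    """Build one simulated stream record (hypothetical test helper)."""
    source = item if item is not None else old_item
    ddb = {"Keys": {key: marshal(source[key])}, "StreamViewType": "NEW_AND_OLD_IMAGES"}
    if event_name in ("INSERT", "MODIFY"):
        ddb["NewImage"] = {k: marshal(v) for k, v in item.items()}
    if event_name in ("MODIFY", "REMOVE") and old_item is not None:
        ddb["OldImage"] = {k: marshal(v) for k, v in old_item.items()}
    return {
        "eventName": event_name,
        "eventSource": "aws:dynamodb",
        "awsRegion": region,
        "dynamodb": ddb,
        "eventSourceARN": f"arn:aws:dynamodb:{region}:{account}:table/{table}"
                          "/stream/2020-07-28T06:59:38.569",
    }


def make_event(*records):
    return {"Records": list(records)}


if __name__ == "__main__":
    # Roughly the MODIFY sample above, rebuilt from plain dicts.
    print(json.dumps(make_event(make_record(
        "MODIFY",
        {"id": 2, "last_name": "zhgdhfgdh", "age": "7"},
        {"id": 2, "last_name": "JJ", "age": "7"},
    )), indent=2))
```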
Modified code of the Lambda function, which I can confirm no longer produces errors:
import boto3
import json
import re
from requests_aws4auth import AWS4Auth
from elasticsearch import Elasticsearch, RequestsHttpConnection

session = boto3.session.Session()
credentials = session.get_credentials()
s3 = session.resource('s3')
awsauth = AWS4Auth(credentials.access_key, credentials.secret_key, session.region_name, 'es', session_token=credentials.token)

es = Elasticsearch(
    ['https://vpc-test-dmamain-452frn764ggb4a.us-east-1.es.amazonaws.com'],
    use_ssl=True,
    verify_certs=True,
    http_auth=awsauth,
    connection_class=RequestsHttpConnection
)

reserved_fields = ["uid", "_id", "_type", "_source", "_all", "_parent", "_fieldnames", "_routing", "_index", "_size", "_timestamp", "_ttl"]


def lambda_handler(event, context):
    print(event)
    #dynamodb = boto3.resource('dynamodb')

    # Loop over the DynamoDB Stream records
    for record in event['Records']:
        if record['eventName'] == "INSERT":
            insert_document(event, es, record)
        elif record['eventName'] == "REMOVE":
            remove_document(event, es, record)
        elif record['eventName'] == "MODIFY":
            modify_document(event, es, record)


# Process MODIFY events
def modify_document(event, es, record):
    table = getTable(record)
    print("Dynamo Table: " + table)

    docId = docid(event, event)
    print("KEY")
    print(docId)

    # Unmarshal the DynamoDB JSON to a normal JSON
    doc = json.dumps(document(event))

    print("Updated document:")
    print(doc)

    # We reindex the whole document as ES accepts partial docs
    es.index(index=table, body=doc, id=docId, doc_type=table, refresh=True)

    print("Successly modified - Index: ", table, " - Document ID: ", docId)


def remove_document(event, es, record):
    table = getTable(record)
    print("Dynamo Table: " + table)

    docId = docid(event, event)
    print("Deleting document ID: ", docId)

    es.delete(index=table, id=docId, doc_type=table, refresh=True)

    print("Successly removed - Index: ", table, " - Document ID: ", docId)


# Process INSERT events
def insert_document(event, es, record):
    table = getTable(record)
    print("Dynamo Table: " + table)

    # Create index if missing
    if es.indices.exists(table) == False:
        print("Create missing index: " + table)
        es.indices.create(table, body='{"settings": { "index.mapping.coerce": true } }')
        print("Index created: " + table)

    # Unmarshal the DynamoDB JSON to a normal JSON
    doc = json.dumps(document(event))

    print("New document to Index:")
    print(doc)

    newId = docid(event, record)
    es.index(index=table, body=doc, id=newId, doc_type=table, refresh=True)

    print("Successly inserted - Index: ", table + " - Document ID: ", newId)


def getTable(record):
    p = re.compile('arn:aws:dynamodb:.*?:.*?:table/([0-9a-zA-Z_-]+)/.+')
    m = p.match(record['eventSourceARN'])
    if m is None:
        raise Exception("Table not found in SourceARN")
    return m.group(1).lower()


def document(event):
    result = []
    for r in event['Records']:
        tmp = {}
        for k, v in r['dynamodb']['NewImage'].items():
            if "S" in v.keys() or "BOOL" in v.keys():
                tmp[k] = v.get('S', v.get('BOOL', False))
            elif 'NULL' in v:
                tmp[k] = None
        result.append(tmp)
    for i in result:
        return i


def docid(event, record):
    result = []
    for r in event['Records']:
        tmp = {}
        for k, v in r['dynamodb']['Keys'].items():
            if "S" in v.keys() or "BOOL" in v.keys():
                tmp[k] = v.get('S', v.get('BOOL', False))
            elif 'NULL' in v:
                tmp[k] = None
        result.append(tmp)
    for newId in result:
        return newId
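The code above no longer raises, but document(event) and docid(event, ...) still loop over the whole event and return the first entry, so every record in a batch is handled with the first record's data (consistent with the repeated output shown below). A minimal per-record sketch, assuming the es object is the Elasticsearch client built above (only its index and delete methods are used), attributes of type S, N, BOOL or NULL only, and a document id built from the raw key values; handle, to_doc, doc_id and table_of are hypothetical names, and you would call handle(es, event) from lambda_handler. The sketch omits doc_type: mapping types are deprecated in Elasticsearch 7 and removed in 8, so add it back only if your domain still runs a version that requires it.

```python
import re

# DynamoDB table names may contain letters, digits, '_', '-' and '.'.
TABLE_RE = re.compile(r"arn:aws:dynamodb:.*?:.*?:table/([0-9a-zA-Z_.-]+)/.+")


def table_of(record):
    m = TABLE_RE.match(record["eventSourceARN"])
    if m is None:
        raise ValueError("Table not found in eventSourceARN")
    return m.group(1).lower()


def doc_id(keys):
    # Key attributes are always S, N or B, and each arrives as a string,
    # so join the raw values (sorted by attribute name for a stable order).
    return "|".join(next(iter(keys[name].values())) for name in sorted(keys))


def to_doc(image):
    """Unmarshal a NewImage map that only uses S, N, BOOL and NULL."""
    doc = {}
    for name, value in image.items():
        [(type_, data)] = value.items()
        if type_ in ("S", "BOOL"):
            doc[name] = data
        elif type_ == "N":
            doc[name] = float(data) if any(c in data for c in ".eE") else int(data)
        elif type_ == "NULL":
            doc[name] = None
        else:
            raise ValueError(f"unsupported attribute type {type_} for {name}")
    return doc


def handle(es, event):
    """Apply each stream record to Elasticsearch, one record at a time."""
    for record in event["Records"]:
        index = table_of(record)
        ddb = record["dynamodb"]
        _id = doc_id(ddb["Keys"])
        if record["eventName"] in ("INSERT", "MODIFY"):
            # Index the full new image; this replaces any earlier version of the doc.
            es.index(index=index, id=_id, body=to_doc(ddb["NewImage"]), refresh=True)
        elif record["eventName"] == "REMOVE":
            es.delete(index=index, id=_id, refresh=True)
```

Unlike the try/except in the question's handler, this sketch lets exceptions propagate, so by default Lambda retries the failed batch instead of logging and dropping the record.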
I haven't verified whether data is correctly written, modified or inserted into Elasticsearch. However, I had an ES domain running and used it in the Lambda to verify that the Lambda can connect to it and run the queries.
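To check that documents actually reached Elasticsearch, one option is to read each one back with es.get and compare its _source with what you expect. A minimal sketch, assuming the same es client and index name (newtable) as above and that you know which ids were written; verify_documents is a hypothetical helper. For a missing id the real client raises elasticsearch.NotFoundError; the sketch catches a generic Exception so it does not depend on a particular client version.

```python
def verify_documents(es, index, expected):
    """Compare stored documents with expected ones (hypothetical helper).

    expected maps document id -> expected _source dict.
    Returns {id: problem description}; an empty dict means everything matched.
    """
    problems = {}
    for doc_id, want in expected.items():
        try:
            got = es.get(index=index, id=doc_id)["_source"]
        except Exception as exc:  # e.g. elasticsearch.NotFoundError for a missing id
            problems[doc_id] = f"not readable: {exc!r}"
            continue
        if got != want:
            problems[doc_id] = f"expected {want}, got {got}"
    return problems
```

Usage against the live domain would look like verify_documents(es, "newtable", {"1": {"last_name": "V", "age": "2"}}), with the ids and bodies adjusted to however your Lambda builds them.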
Example output from the Lambda for the INSERT event:
Dynamo Table: newtable
New document to Index:
{"last_name": "V", "age": "2"}
Successly inserted - Index: newtable - Document ID: {}
Dynamo Table: newtable
New document to Index:
{"last_name": "V", "age": "2"}
Successly inserted - Index: newtable - Document ID: {}
Dynamo Table: newtable
New document to Index:
{"last_name": "V", "age": "2"}
Successly inserted - Index: newtable - Document ID: {}
Dynamo Table: newtable
New document to Index:
{"last_name": "V", "age": "2"}
Successly inserted - Index: newtable - Document ID: {}

Example output from the Lambda for the MODIFY event:

Updated document:
{ "last_name": "zhgdhfgdh", "age": "7" }
Successly modified - Index: newtable - Document ID: {}
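I think docid requires further investigation, as it seems to return an empty dict: Document ID: {}

In my table id is a number ({"N": "1"} in the events above), and docid only copies attributes of type S or BOOL, so the key is skipped and the dict stays empty. Likewise, document(event) and docid(event, ...) loop over all records in the event but return only the first result, which is why every INSERT above logs the same document.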