Conflunet Kafka 5.0.0已经安装在拥有公网IP的亚马逊EC2上,比如54.XX.XX.XX在0.0.0.0的EC2机器上打开了9092端口
在/etc/kafka/server.properties中有advertised.listeners=PLAINTEXT://54.XX.XX.XX:9092,在/etc/kafka/producer.properties中有listeners=PLAINTEXT://0.0.0.0:9092我有bootstrap.servers=0.0.0.0:9092
本地文件iotstatesboto.py写成如下,它有汇合的生产者代码:
from confluent_kafka import Producer
import json
broker = '54.XX.XX.XX'
topic = 'mytopic'
def delivery_report(err, msg):
if err is not None:
print('Message delivery failed: {}'.format(err))
else:
print('Message delivered to {}'.format(msg.topic()))
def lambda_handler(event, context):
p = Producer({'bootstrap.servers': broker})
message = json.dumps(event)
print(message)
p.produce(topic, message.encode('utf-8'), callback=delivery_report)
return {
'message' : message
}
zip包在本地制作,格式如下:
pip install confluent_kafka has been done in the same directory
zip -r iotstatesboto.zip iotstatesboto.py confluent*
这个zip文件被上传到Lambda函数。然后,当"Test“函数发送一个虚拟消息时,会出现以下错误
首先:
{
"errorMessage": "Unable to import module 'iotstatesboto'"
}
还有一个是:
Unable to import module 'iotstatesboto': No module named 'confluent_kafka.cimpl'
我已将处理程序名称设置为"iotstatesboto.lambda_handler“
为了让生产者从lambda函数写入EC2上的kakfa流,有没有线索在步骤中遗漏了什么?
发布于 2018-10-11 07:28:46
使用AWS Lambda时,您必须手动提供所有库,即将它们添加到您的lambda函数代码所使用的zip中。您还必须添加所有共享对象库(如果有的话)。
在本例中,AWS Lambda为您提供了python环境,该环境由Python标准库+ Boto3组成,因此没有其他库。
https://stackoverflow.com/questions/52754522
复制