上文提到 将K8S日志采集到日志服务,这次介绍将采集的日志投递到自建 Kafka 中,用于 Spark 计算。
容器日志
-> 日志服务
-> 使用函数处理,将日志投递至自建 Kafka
本文介绍如何创建云函数,将日志投递至 Kafka 中。
打开 函数服务列表,基于模板 CLS 数据转存到 Ckafka
创建函数。
虽然模板是投递 Ckafka,不过 Ckafka和 Kafka 兼容性好,所以投递 Kafka 也没问题。
启用私有网络,函数服务使用的 VPC 和 Kafka 所在 VPC 相同。
如果不同,可以使用 对等连接 解决。
默认模板会把日志原始数据当成字符串,把每个字符当成一行 message 进行输出(for record in records:
),需要调整代码。
不知道是因为我的日志服务原始数据设置的是 JSON 格式,还是当前
CLS 数据转存到 Ckafka
模板过时了
有 3 处代码修改,详见注释,完整代码如下:
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import time
import logging
import os
import base64
import json
import gzip
import urllib
from kafka import KafkaProducer
from kafka.errors import KafkaError
from StringIO import StringIO
logger = logging.getLogger('kafka')
logger.setLevel(logging.INFO)
class ClsToKafka(object):
"""
CLS 消息投递 kafka
"""
def __init__(self, host, **kwargs):
self.host = host
self.producer = KafkaProducer(bootstrap_servers = [self.host],
retries = 10,
max_in_flight_requests_per_connection = 1,
request_timeout_ms = 30000,
max_block_ms = 60000,
**kwargs
)def send(self, topic, records):
"""
异步生产 kafka 消息
"""
global count
count = 0
def on_send_success(record_metadata):
global count
count = count +1
def on_send_error(excp):
logger.error('failed to send message', exc_info = excp)
s_time = time.time()
try:
## 修改 1: 原始消息是 JSON 格式(日志服务采集容器服务输出的日志格式是 JSON),每条消息位于 .records(type: List)
## for record in records:
for record in records['records']:
key = ""# 当 key 为""或者为"None" 时,要传入 key=None,这样 python kafka 库会随机选取一个 partition 写入消息
if key == "" or key =="None":
key = None
## 修改 2:record 是 dict,因为原始数据就是 JSON,需要转成 str,否则调用 self.producer.send 会报错 "assert type(value_bytes) in (bytes, bytearray, memoryview, type(None))"
value = json.dumps(record)
# 也可以对消息进行处理后再转存
#value = deal_message(data)
self.producer.send(topic, key = key, value = value).add_callback(on_send_success).add_errback(on_send_error)
# block until all async messages are sent
self.producer.flush()
except KafkaError as e:
return e
finally:
if self.producer is not None:
self.producer.close()e_time = time.time()
return "{} messages delivered in {}s".format(count, e_time - s_time)
# 这里可以对消息进行处理后返回
def deal_message(message):
return message
def main_handler(event, context):
kafka_address = os.getenv("kafka_address")
kafka_topic_name = os.getenv("kafka_topic_name")
kafka_to_kafka = ClsToKafka(
kafka_address
#security_protocol = "SASL_PLAINTEXT",
#sasl_mechanism = "PLAIN",
#sasl_plain_username = "ckafka-80o10xxx#lkoxx",
#sasl_plain_password = "ccllxxxx",
#api_version=(0, 10, 2)
)event = json.loads(gzip.GzipFile(fileobj=StringIO(event['clslogs']['data'].decode('base64'))).read())# print("type of event: %s" % type(event))
## 修改 3:直接使用 event 这个字典,便于从字典中获取每条消息的内容
## data = json.dumps(event, indent=4, sort_keys=True)
## ret = kafka_to_kafka.send(kafka_topic_name,data)
ret = kafka_to_kafka.send(kafka_topic_name,event)
logger.info(ret)
return ret
云函数需要使用 kafka_address
、kafka_topic_name
这 2 个变量,在 环境配置
中配好。
点击创建后,部署成功。
在日志服务的 日志主题 页面找到需要投递消息的主题,在 函数处理
TAB 中 选择刚创建的函数即可。
函数处理创建成功。
等待 1 分钟后,查看函数每次调用的日志,可以看到调用已成功。
同时可以了解整体调用监控数据。
自建的 Kakfa 是使用 Cloudera Management 创建的,在 CM 中看到 Topic 已有数据写入。
使用命令行也可以看到数据持续写入。
# ./kafka-console-consumer.sh --bootstrap-server 10.0.0.29:9092 --topic scf_topic --offset latest --partition 0
{"content": "{\"Accept\":\"*/*\",\"Body\":\"\",\"Host\":\"header.dev.xxx.cn\",\"Method\":\"GET\",\"Protocol\":\"HTTP/1.1\",\"Referer\":\"\",\"RemoteAddr\":\"172.16.7.71:37468\",\"RequestURI\":\"/\",\"Type\":\"web\",\"UserAgent\":\"Qcloud-boce\",\"X-Forwarded-For\":\"58.87.66.69\"}", "timestamp": 1618716491203428}
{"content": "{\"Accept\":\"*/*\",\"Body\":\"\",\"Host\":\"header.dev.xxx.cn\",\"Method\":\"GET\",\"Protocol\":\"HTTP/1.1\",\"Referer\":\"\",\"RemoteAddr\":\"172.16.7.71:36864\",\"RequestURI\":\"/\",\"Type\":\"curl\",\"UserAgent\":\"Qcloud-curl\",\"X-Forwarded-For\":\"180.163.9.66\"}", "timestamp": 1618716494178403}
...
Processed a total of 9 messages
如果函数调用有日志有如下报错,则证明 Kafka broker 未设置对外可访问的地址,参照 Won’t Connect to My Apache Kafka Cluster 修改 advertised.listeners
配置即可。
DNS lookup failed for hadoop-29.com:9092, exception was [Errno -2] Name or service not known. Is your advertised.listeners (called advertised.host.name before Kafka 9) correct and resolvable?
DNS lookup failed for hadoop-29.com:9092 (0)
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。