自定义消费

最近更新时间:2024-04-16 11:32:41

我的收藏

前提条件

1. 开通日志服务,创建 日志集日志主题,并成功采集到日志数据。
2. 子账号/协作者需要主账号授权,授权步骤参见 基于 CAM 管理权限,复制授权策略参见 CLS 访问策略模板

消费组消费流程

使用消费组消费数据时,服务端会管理消费组里的所有消费者的消费任务,根据主题分区和消费者的数量关系自动调整消费任务的均衡性,同时会记录每个主题分区的消费进度,保证不同消费者可以无重复消费数据。消费组消费的具体流程如下:
1. 创建消费组。
2. 每个消费者定期向服务端发送心跳。
3. 消费组根据主题分区负载情况自动分配主题分区给消费者。
4. 消费者根据所分配的分区列表,获取分区 offset 并消费数据。
5. 消费者周期性地更新分区的消费进度到消费组,便于下次消费组分配任务。
6. 重复步骤2 - 步骤6,直至消费结束。

负载均衡消费原理

消费组会根据活跃消费者和主题分区的数量动态调整每个消费者的消费任务,保证消费的均衡性。同时,消费者会保存每个主题分区的消费进度,保证故障恢复后可继续消费数据,避免重复消费。

例一:主题分区发生变化

例如,某个日志主题有两个消费者,消费者 A 负责消费1,2号分区,消费者 B 负责消费3,4号分区,通过分裂操作新增主题分区5后,消费组会自动将5号分区分配给消费者 B 进行消费,如下图所示:
1561034489523



例二:消费者发生变化

例如,某个日志主题有两个消费者,消费者 A 负责消费1,2,3号分区,消费者 B 负责消费4,5,6号分区,为保证消费速度持平生成速度,新增一个消费者 C,消费组会重新进行均衡分配,将3、6号分区分配给新消费者 C 进行消费,如下图所示:
1561035193214



消费参数说明

配置参数
说明
默认值
取值范围
endpoint
请求域名API 上传日志标签页面的域名
-
支持地域:北京,上海,广州,南京,中国香港,东京,美东,新加坡,法兰克福
access_key_id
用户的 Secret_id,请前往 CAM 查看
-
-
access_key
用户的 Secret_key,请前往 CAM 查看
-
-
region
主题所在地域,例如 ap-beijing、ap_guangzhou、ap-shanghai,详情请参见 地域和访问域名
-
支持地域:北京,上海,广州,南京,中国香港,东京,美东,新加坡,法兰克福
logset_id
日志集 ID,仅支持一个日志集
-
-
topic_ids
日志主题 ID,多个主题请使用','隔开
-
-
consumer_group_name
消费者组名称
-
-
internal
内网:TRUE
公网:FALSE
说明:
内网/外网读流量费用请参见 产品定价
FALSE
TRUE/FALSE
consumer_name
消费者名称。同一个消费者组内,消费者名称不可重复
-
0-9、aA-zZ、 '-'、'_'、'.'组成的字符串
heartbeat_interval
消费者心跳上报间隔,2个间隔没有上报心跳,会被认为是消费者下线
20
0-30分钟
data_fetch_interval
消费者拉取数据间隔,不小于1秒
2
-
offset_start_time
拉取数据的开始时间,字符串类型的 UNIX 时间戳,精度为秒,例如 "1711607794",也可以直接可配置为"begin"、"end"。
begin:日志主题生命周期内的最早数据
end:日志主题生命周期内的最新数据
"end"
"begin"/"end"/UNIX 时间戳
max_fetch_log_group_size
消费者单次拉取数据大小,默认2M,最大10M
2097152
2M - 10M
offset_end_time
拉取数据的结束时间,支持字符串类型的 UNIX时间戳,精度为秒,例如"1711607794"。不填写代表持续拉取
-
-

消费 Demo (Python)

python sdk 可参见 tencentcloud-cls-sdk-python
# -*- coding: utf-8 -*-

import os
from threading import RLock
from tencentcloud.log.consumer import *
from tencentcloud.log.logclient import YunApiLogClient

root = logging.getLogger()

handler = logging.StreamHandler()

handler.setFormatter(logging.Formatter(

fmt='[%(asctime)s] - [%(threadName)s] - {%(module)s:%(funcName)s:%(lineno)d} %(levelname)s - %(message)s',

datefmt='%Y-%m-%d %H:%M:%S'))

root.setLevel(logging.INFO)

root.addHandler(handler)

logger = logging.getLogger(__name__)

class SampleConsumer(ConsumerProcessorBase):
#记录上次消费offset的时间。
last_check_time = 0
#记录消费的日志数据
log_results = []

lock = RLock()

def initialize(self, topic_id):

self.topic_id = topic_id

def process(self, log_groups, offset_tracker):
#获取消费的日志数据
for log_group in log_groups:

for log in log_group.logs:

item = dict()

item['time'] = log.time

item['filename'] = log_group.filename

item['source'] = log_group.source

for content in log.contents:

item[content.key] = content.value

with SampleConsumer.lock:

SampleConsumer.log_results.append(item)
#提交offset
current_time = time.time()

if current_time - self.last_check_time > 3:

try:

self.last_check_time = current_time

offset_tracker.save_offset(True)

except Exception:

import traceback

traceback.print_exc()

else:

try:

offset_tracker.save_offset(False)

except Exception:

import traceback

traceback.print_exc()

return None

def shutdown(self, offset_tracker):

try:

offset_tracker.save_offset(True)

except Exception:

import traceback

traceback.print_exc()


def sleep_until(seconds, exit_condition=None, expect_error=False):

if not exit_condition:

time.sleep(seconds)

return

s = time.time()

while time.time() - s < seconds:

try:

if exit_condition():

break

except Exception:

if expect_error:

continue

time.sleep(1)
# 创建消费组
def sample_consumer_group():

# load options from envs
#请求域名
endpoint = os.environ.get('TENCENTCLOUD_LOG_SAMPLE_ENDPOINT', '')
#用户的Secret_id
access_key_id = os.environ.get('TENCENTCLOUD_LOG_SAMPLE_ACCESSID', '')
#用户的Secret_key
access_key = os.environ.get('TENCENTCLOUD_LOG_SAMPLE_ACCESSKEY', '')
#日志集id
logset_id = os.environ.get('TENCENTCLOUD_LOG_SAMPLE_LOGSET_ID', '')
#日志主题id列表,支持多个
topic_ids = ['topic_id_1','topic_id_2']
#消费组,同一个日志集下的消费组名称唯一
consumer_group = 'consumer-group-1'
#消费者1
consumer_name1 = "consumer-group-1-A"
#消费者2
consumer_name2 = "consumer-group-1-B"
#地域
region = "ap-guangzhou"

assert endpoint and access_key_id and access_key and logset_id, ValueError("endpoint/access_id/access_key and logset_id cannot be empty")
#创建访问云API接口的Client
client = YunApiLogClient(access_key_id, access_key, region=region)
#初始化消费结果列表
SampleConsumer.log_results = []

try:

# 创建两个消费者配置
option1 = LogHubConfig(endpoint, access_key_id, access_key, region, logset_id, topic_ids, consumer_group,

consumer_name1, heartbeat_interval=3, data_fetch_interval=1,

offset_start_time="end", max_fetch_log_group_size=10485760)

option2 = LogHubConfig(endpoint, access_key_id, access_key, region, logset_id, topic_ids, consumer_group,

consumer_name2, heartbeat_interval=3, data_fetch_interval=1,

offset_start_time="end", max_fetch_log_group_size=10485760)

print("*** start to consume data...")
#消费者1
client_worker1 = ConsumerWorker(SampleConsumer, consumer_option=option1)
#启动消费者1
client_worker1.start()

client_worker2 = ConsumerWorker(SampleConsumer, consumer_option=option2)

client_worker2.start()
sleep_until(120, lambda: len(SampleConsumer.log_results) > 0)

print("*** consumer group status ***")
#打印消费组信息:消费组的名称、消费的日志主题、消费者心跳超时时间
ret = client.list_consumer_group(logset_id, topic_ids)
ret.log_print()

print("*** stopping workers")
#关闭消费者
client_worker1.shutdown()

client_worker2.shutdown()

print("*** delete consumer group")
#删除消费者组
client.delete_consumer_group(logset_id, consumer_group)

except Exception as e:

raise e
#打印消费的日志数据
ret = str(SampleConsumer.log_results)

print("*** get content:")

print(ret)


if __name__ == '__main__':

sample_consumer_group()