Kafka是一个分布式流处理平台,主要用于处理实时数据流。它可以用于日志收集、数据流处理、消息队列等场景。在大数据处理、实时数据分析等领域,Kafka被广泛应用。
Kafka的主要功能包括消息发布和订阅、消息存储和消息处理。
Kafka的概念包括生产者、消费者、主题、分区、偏移量等。生产者负责向Kafka发送消息,消费者负责从Kafka接收消息,主题是消息的分类,分区是主题的分片,偏移量是消息在分区中的位置。
Kafka有四个核心的API:
Kafka官网:https://kafka.apache.org/
Kafka中文文档:https://kafka.apachecn.org/
Confluent.Kafka
if (AppSetting.Kafka.UseConsumer)
builder.RegisterType<KafkaConsumer<string, string>>().As<IKafkaConsumer<string, string>>().SingleInstance();
if (AppSetting.Kafka.UseProducer)
builder.RegisterType<KafkaProducer<string, string>>().As<IKafkaProducer<string, string>>().SingleInstance();
1、IKafkaConsumer
public interface IKafkaConsumer<TKey, TValue> : IDisposable
{
/// <summary>
/// 订阅回调模式-消费(持续订阅)
/// </summary>
/// <param name="Func">回调函数,若配置为非自动提交(默认为否),则通过回调函数的返回值判断是否提交</param>
/// <param name="Topic">主题</param>
void Consume(Func<ConsumeResult<TKey, TValue>, bool> Func, string Topic);
/// <summary>
/// 批量订阅回调模式-消费(持续订阅)
/// </summary>
/// <param name="Func">回调函数,若配置为非自动提交(默认为否),则通过回调函数的返回值判断是否提交</param>
/// <param name="Topics">主题集合</param>
void ConsumeBatch(Func<ConsumeResult<TKey, TValue>, bool> Func, List<string> Topics);
/// <summary>
/// 批量消费模式-单次消费(消费出当前Kafka缓存的所有数据,并持续监听 300ms,如无新数据生产,则返回(最多一次消费 100条)
/// </summary>
/// <param name="Topic">主题</param>
/// <param name="TimeOut">持续监听时间,单位ms 默认值:300ms</param>
/// <param name="MaxRow">最多单次消费行数 默认值:100行</param>
/// <returns>待消费数据</returns>
List<ConsumeResult<TKey, TValue>> ConsumeOnce(string Topic, int TimeOut = 300, int MaxRow = 100);
/// <summary>
/// 单笔消费模式-单行消费
/// </summary>
/// <param name="Topic">主题</param>
/// <param name="TimeOut">持续监听时间,单位ms 默认值:300ms</param>
/// <returns>待消费数据</returns>
ConsumeResult<TKey, TValue> ConsumeOneRow(string Topic, int TimeOut = 300);
}
2、IKafkaProducer
public interface IKafkaProducer<TKey, TValue>
{
/// <summary>
/// 生产
/// </summary>
/// <param name="Key"></param>
/// <param name="Value"></param>
/// <param name="Topic"></param>
void Produce(TKey Key, TValue Value, string Topic);
/// <summary>
/// 生产 异步
/// </summary>
/// <param name="Key"></param>
/// <param name="Value"></param>
/// <param name="Topic"></param>
/// <returns></returns>
Task ProduceAsync(TKey Key, TValue Value, string Topic);
}
1、KafkaConsumer
/// <summary>
/// 消费者 (Message.Key的数据类型为string、Message.Value的数据类型为string)
/// 消费者实现三种消费方式:1.订阅回调模式 2.批量消费模式 3.单笔消费模式
/// </summary>
/// <typeparam name="TKey">Message.Key 的数据类型</typeparam>
/// <typeparam name="TValue">Message.Value 的数据类型</typeparam>
public class KafkaConsumer<TKey, TValue> : KafkaConfig, IKafkaConsumer<TKey, TValue>
{
/// <summary>
/// Kafka地址(包含端口号)
/// </summary>
public string Servers
{
get
{
return ConsumerConfig.BootstrapServers;
}
set
{
ConsumerConfig.BootstrapServers = value;
}
}
/// <summary>
/// 消费者群组
/// </summary>
public string GroupId
{
get
{
return ConsumerConfig.GroupId;
}
set
{
ConsumerConfig.GroupId = value;
}
}
/// <summary>
/// 自动提交 默认为 false
/// </summary>
public bool EnableAutoCommit
{
get
{
return ConsumerConfig.EnableAutoCommit ?? false;
}
set
{
ConsumerConfig.EnableAutoCommit = value;
}
}
/// <summary>
/// 订阅回调模式-消费(持续订阅)
/// </summary>
/// <param name="Func">回调函数,若配置为非自动提交(默认为否),则通过回调函数的返回值判断是否提交</param>
/// <param name="Topic">主题</param>
public void Consume(Func<ConsumeResult<TKey, TValue>, bool> Func, string Topic)
{
Task.Factory.StartNew(() =>
{
var builder = new ConsumerBuilder<TKey, TValue>(ConsumerConfig);
//设置反序列化方式
builder.SetValueDeserializer(new KafkaDConverter<TValue>());
builder.SetErrorHandler((_, e) =>
{
Logger.Error(LoggerType.KafkaException, null, null, $"Error:{e.Reason}");
}).SetStatisticsHandler((_, json) =>
{
Console.WriteLine($"-{DateTime.Now:yyyy-MM-dd HH:mm:ss} > 消息监听中..");
}).SetPartitionsAssignedHandler((c, partitions) =>
{
string partitionsStr = string.Join(", ", partitions);
Console.WriteLine($"-分配的kafka分区:{partitionsStr}");
}).SetPartitionsRevokedHandler((c, partitions) =>
{
string partitionsStr = string.Join(", ", partitions);
Console.WriteLine($"-回收了kafka的分区:{partitionsStr}");
});
using var consumer = builder.Build();
consumer.Subscribe(Topic);
while (AppSetting.Kafka.IsConsumerSubscribe) //true
{
ConsumeResult<TKey, TValue> result = null;
try
{
result = consumer.Consume();
if (result.IsPartitionEOF) continue;
if (Func(result))
{
if (!(bool)ConsumerConfig.EnableAutoCommit)
{
//手动提交,如果上面的EnableAutoCommit=true表示自动提交,则无需调用Commit方法
consumer.Commit(result);
}
}
}
catch (ConsumeException ex)
{
Logger.Error(LoggerType.KafkaException, $"Topic:{Topic},{ex.Error.Reason}", null, ex.Message + ex.StackTrace);
}
catch (Exception ex)
{
Logger.Error(LoggerType.KafkaException, $"Topic:{result.Topic}", null, ex.Message + ex.StackTrace);
}
}
});
}
/// <summary>
/// 批量订阅回调模式-消费(持续订阅)
/// </summary>
/// <param name="Func">回调函数,若配置为非自动提交(默认为否),则通过回调函数的返回值判断是否提交</param>
/// <param name="Topic">主题</param>
public void ConsumeBatch(Func<ConsumeResult<TKey, TValue>, bool> Func, List<string> Topics)
{
Task.Factory.StartNew(() =>
{
var builder = new ConsumerBuilder<TKey, TValue>(ConsumerConfig);
//设置反序列化方式
builder.SetValueDeserializer(new KafkaDConverter<TValue>());
builder.SetErrorHandler((_, e) =>
{
Logger.Error(LoggerType.KafkaException, null, null, $"Error:{e.Reason}");
}).SetStatisticsHandler((_, json) =>
{
Console.WriteLine($"-{DateTime.Now:yyyy-MM-dd HH:mm:ss} > 消息监听中..");
}).SetPartitionsAssignedHandler((c, partitions) =>
{
string partitionsStr = string.Join(", ", partitions);
Console.WriteLine($"-分配的kafka分区:{partitionsStr}");
}).SetPartitionsRevokedHandler((c, partitions) =>
{
string partitionsStr = string.Join(", ", partitions);
Console.WriteLine($"-回收了kafka的分区:{partitionsStr}");
});
using var consumer = builder.Build();
consumer.Subscribe(Topics);
while (AppSetting.Kafka.IsConsumerSubscribe) //true
{
ConsumeResult<TKey, TValue> result = null;
try
{
result = consumer.Consume();
if (result.IsPartitionEOF) continue;
if (Func(result))
{
if (!(bool)ConsumerConfig.EnableAutoCommit)
{
//手动提交,如果上面的EnableAutoCommit=true表示自动提交,则无需调用Commit方法
consumer.Commit(result);
}
}
}
catch (ConsumeException ex)
{
Logger.Error(LoggerType.KafkaException, $"Topic:{Topics.ToArray()},{ex.Error.Reason}", null, ex.Message + ex.StackTrace);
}
catch (Exception ex)
{
Logger.Error(LoggerType.KafkaException, $"Topic:{result.Topic}", null, ex.Message + ex.StackTrace);
}
}
});
}
/// <summary>
/// 批量消费模式-单次消费(消费出当前Kafka缓存的所有数据,并持续监听 300ms,如无新数据生产,则返回(最多一次消费 100条)
/// </summary>
/// <param name="Topic">主题</param>
/// <param name="TimeOut">持续监听时间,单位ms 默认值:300ms</param>
/// <param name="MaxRow">最多单次消费行数 默认值:100行</param>
/// <returns>待消费数据</returns>
public List<ConsumeResult<TKey, TValue>> ConsumeOnce(string Topic, int TimeOut = 300, int MaxRow = 100)
{
var builder = new ConsumerBuilder<TKey, TValue>(ConsumerConfig);
//设置反序列化方式
builder.SetValueDeserializer(new KafkaDConverter<TValue>());
using var consumer = builder.Build();
consumer.Subscribe(Topic);
List<ConsumeResult<TKey, TValue>> Res = new List<ConsumeResult<TKey, TValue>>();
while (true)
{
try
{
var result = consumer.Consume(TimeSpan.FromMilliseconds(TimeOut));
if (result == null) break;
else
{
Res.Add(result);
//手动提交,如果上面的EnableAutoCommit=true表示自动提交,则无需调用Commit方法
consumer.Commit();
}
if (Res.Count > MaxRow) break;
}
catch (Exception ex)
{
Logger.Error(LoggerType.KafkaException, $"Topic:{Topic}", null, ex.Message + ex.StackTrace);
return null;
}
}
return Res;
}
/// <summary>
/// 单笔消费模式-单行消费
/// </summary>
/// <param name="Topic">主题</param>
/// <param name="TimeOut">持续监听时间,单位ms 默认值:300ms</param>
/// <returns>待消费数据</returns>
public ConsumeResult<TKey, TValue> ConsumeOneRow(string Topic, int TimeOut = 300)
{
var builder = new ConsumerBuilder<TKey, TValue>(ConsumerConfig);
//设置反序列化方式
builder.SetValueDeserializer(new KafkaDConverter<TValue>());
using var consumer = builder.Build();
consumer.Subscribe(Topic);
try
{
var result = consumer.Consume(TimeSpan.FromMilliseconds(TimeOut));
if (result != null)
{
//手动提交,如果上面的EnableAutoCommit=true表示自动提交,则无需调用Commit方法
consumer.Commit();
}
return result;
}
catch (Exception ex)
{
Logger.Error(LoggerType.KafkaException, $"Topic:{Topic}", null, ex.Message + ex.StackTrace);
return null;
}
}
public void Dispose()
{
//if (_cache != null)
// _cache.Dispose();
GC.SuppressFinalize(this);
}
}
2、KafkaProducer
/// <summary>
/// 生产者 控制器或Service里面构造函数注入即可调用
/// Message.Key的数据类型为string、Message.Value的数据类型为string
/// </summary>
/// <typeparam name="TKey">Message.Key 的数据类型</typeparam>
/// <typeparam name="TValue">Message.Value 的数据类型</typeparam>
public class KafkaProducer<TKey, TValue> : KafkaConfig, IKafkaProducer<TKey, TValue>
{
/// <summary>
/// 构造生产者
/// </summary>
public KafkaProducer()
{
}
/// <summary>
/// Kafka地址(包含端口号)
/// </summary>
public string Servers
{
get
{
return ProducerConfig.BootstrapServers;
}
set
{
ProducerConfig.BootstrapServers = value;
}
}
/// <summary>
/// 生产
/// </summary>
/// <param name="Key">Message.Key 做消息指定分区投放有用的</param>
/// <param name="Value">Message.Value</param>
/// <param name="Topic">主题</param>
public void Produce(TKey Key, TValue Value, string Topic)
{
var producerBuilder = new ProducerBuilder<TKey, TValue>(ProducerConfig);
producerBuilder.SetValueSerializer(new KafkaConverter<TValue>());//设置序列化方式
using var producer = producerBuilder.Build();
try
{
producer.Produce(Topic, new Message<TKey, TValue>
{
Key = Key,
Value = Value
}, (result) =>
{
if (result.Error.IsError)
Logger.Error(LoggerType.KafkaException, $"Topic:{Topic},ServerIp:{KafkaHelper.GetServerIp()},ServerName:{KafkaHelper.GetServerName()}", null, $"Delivery Error:{result.Error.Reason}");
});//Value = JsonConvert.SerializeObject(value)
}
catch (ProduceException<Null, string> ex)
{
Logger.Error(LoggerType.KafkaException, $"Topic:{Topic},Delivery failed: {ex.Error.Reason}", null, ex.Message + ex.StackTrace);
}
}
/// <summary>
/// 生产异步
/// </summary>
/// <param name="Key">Message.Key</param>
/// <param name="Value">Message.Value</param>
/// <param name="Topic">主题</param>
/// <returns></returns>
public async Task ProduceAsync(TKey Key, TValue Value, string Topic)
{
var producerBuilder = new ProducerBuilder<TKey, TValue>(ProducerConfig);
producerBuilder.SetValueSerializer(new KafkaConverter<TValue>());
using var producer = producerBuilder.Build();
try
{
var dr = await producer.ProduceAsync(Topic, new Message<TKey, TValue>
{
Key = Key,
Value = Value
});
//Console.WriteLine($"Delivered '{dr.Value}' to '{dr.TopicPartitionOffset}'");
}
catch (ProduceException<Null, string> ex)
{
Logger.Error(LoggerType.KafkaException, $"Topic:{Topic},ServerIp:{KafkaHelper.GetServerIp()},ServerName:{KafkaHelper.GetServerName()},Delivery failed: {ex.Error.Reason}", null, ex.Message + ex.StackTrace);
}
}
}
/// <summary>
/// 配置类
/// </summary>
public class KafkaConfig
{
/// <summary>
/// 构造配置类
/// </summary>
protected KafkaConfig()
{
ProducerConfig = new ProducerConfig()
{
BootstrapServers = AppSetting.Kafka.ProducerSettings.BootstrapServers,// "192.168.20.241:9092",
};
ConsumerConfig = new ConsumerConfig()
{
BootstrapServers = AppSetting.Kafka.ConsumerSettings.BootstrapServers,
GroupId = AppSetting.Kafka.ConsumerSettings.GroupId,
AutoOffsetReset = AutoOffsetReset.Earliest,//当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
EnableAutoCommit = false,
//Kafka配置安全认证
//SecurityProtocol = SecurityProtocol.SaslPlaintext,
//SaslMechanism = SaslMechanism.Plain,
//SaslUsername = AppSetting.Kafka.ConsumerSettings.SaslUsername,
//SaslPassword = AppSetting.Kafka.ConsumerSettings.SaslPassword,
};
}
/// <summary>
/// 消费者配置文件
/// </summary>
public ConsumerConfig ConsumerConfig;
/// <summary>
/// 生产者配置文件
/// </summary>
public ProducerConfig ProducerConfig;
}
namespace KafkaManager
{
/// <summary>
/// 辅助类
/// </summary>
public class KafkaHelper
{
/// <summary>
/// 获取当前应用程式名称(仅控制台应用程序和Windows应用程序可用)
/// </summary>
/// <returns></returns>
public static string GetApplicationName()
{
try
{
return Assembly.GetEntryAssembly().GetName().Name;
}
catch
{
return "Kafka_Test";
}
}
/// <summary>
/// 获取服务器名称
/// </summary>
/// <returns></returns>
public static string GetServerName()
{
return Dns.GetHostName();
}
/// <summary>
/// 获取服务器IP
/// </summary>
/// <returns></returns>
public static string GetServerIp()
{
IPHostEntry ips = Dns.GetHostEntry(Dns.GetHostName());
foreach (var ip in ips.AddressList)
{
if (Regex.IsMatch(ip.ToString(), @"^10\.((25[0-5]|2[0-4]\d|1\d{2}|\d?\d)\.){2}(25[0-5]|2[0-4]\d|1\d{2}|\d?\d)$"))
{
return ip.ToString();
};
}
return "127.0.0.1";
}
/// <summary>
/// 将c# DateTime时间格式转换为Unix时间戳格式(毫秒级)
/// </summary>
/// <returns>long</returns>
public static long GetTimeStamp()
{
DateTime time = DateTime.Now;
long t = (time.Ticks - 621356256000000000) / 10000;
return t;
}
}
#region 实现消息序列化和反序列化
public class KafkaConverter<T> : ISerializer<T>
{
/// <summary>
/// 序列化数据成字节
/// </summary>
/// <param name="data"></param>
/// <param name="context"></param>
/// <returns></returns>
public byte[] Serialize(T data, SerializationContext context)
{
var json = JsonConvert.SerializeObject(data);
return Encoding.UTF8.GetBytes(json);
}
}
public class KafkaDConverter<T> : IDeserializer<T>
{
/// <summary>
/// 反序列化字节数据成实体数据
/// </summary>
/// <param name="data"></param>
/// <param name="isNull"></param>
/// <param name="context"></param>
/// <returns></returns>
public T Deserialize(ReadOnlySpan<byte> data, bool isNull, SerializationContext context)
{
if (isNull) return default(T);
var json = Encoding.UTF8.GetString(data.ToArray());
try
{
return JsonConvert.DeserializeObject<T>(json);
}
catch
{
return default(T);
}
}
}
#endregion
#region 日志类
/// <summary>
/// 默认日志类 可自行构造使用
/// </summary>
public class KafkaLogModel
{
/// <summary>
/// 构造默认日志类(设置默认值 ServerIp,ServerName,TimeStamp,ApplicationVersion)
/// </summary>
public KafkaLogModel()
{
ServerIp = KafkaHelper.GetServerIp();
ServerName = KafkaHelper.GetServerName();
TimeStamp = DateTime.Now;
ApplicationName = KafkaHelper.GetApplicationName();
ApplicationVersion = "V1.0.0";
}
/// <summary>
/// 程式名称(默认获取当前程式名称,Web应用 默认为 ISD_Kafka)
/// </summary>
public string ApplicationName { get; set; }
/// <summary>
/// 程式版本(默认为V1.0.0)
/// </summary>
public string ApplicationVersion { get; set; }
/// <summary>
/// 发生时间(默认为当前时间)
/// </summary>
public DateTime TimeStamp { get; set; }
/// <summary>
/// 开始时间
/// </summary>
public DateTime BeginDate { get; set; }
/// <summary>
/// 结束时间
/// </summary>
public DateTime EndDate { get; set; }
/// <summary>
/// 服务器IP(默认抓取当前服务器IP)
/// </summary>
public string ServerIp { get; set; }
/// <summary>
/// 服务器名称(默认抓取当前服务器名称)
/// </summary>
public string ServerName { get; set; }
/// <summary>
/// 客户端IP
/// </summary>
public string ClientIp { get; set; }
/// <summary>
/// 模块(页面路径)
/// </summary>
public string Module { get; set; }
/// <summary>
/// 操作人
/// </summary>
public string Operator { get; set; }
/// <summary>
/// 操作类型 如:Query,Add,Update,Delete,Export等,可自定义
/// </summary>
public string OperationType { get; set; }
/// <summary>
/// 操作状态 如:http请求使用200,404,503等,其他操作 1:成功,0失败等 可自定义
/// </summary>
public string Status { get; set; }
/// <summary>
/// 其他信息
/// </summary>
public string Message { get; set; }
}
#endregion
}
#region kafka使用
if (AppSetting.Kafka.UseConsumer)
{
using var scope = host.Services.CreateScope();
var testConsumer = scope.ServiceProvider.GetService<IKafkaConsumer<string, string>>();
testConsumer.Consume(res =>
{
Console.WriteLine($"recieve:{DateTime.Now.ToLongTimeString()} value:{res.Message.Value}");
bool bl = DataHandle.AlarmData(res.Message.Value);
return bl;
}, AppSetting.Kafka.Topics.TestTopic);
}
#endregion