前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >开源一款功能强大的 .NET 消息队列通讯模型框架 Maomi.MQ

开源一款功能强大的 .NET 消息队列通讯模型框架 Maomi.MQ

作者头像
痴者工良
发布2024-06-14 08:12:46
3130
发布2024-06-14 08:12:46
举报
文章被收录于专栏:痴者工良

文档说明

作者:痴者工良

文档地址:https://mmq.whuanle.cn

仓库地址:https://github.com/whuanle/Maomi.MQ

作者博客:

导读

Maomi.MQ 是一个消息通讯模型项目,目前只支持了 RabbitMQ。

Maomi.MQ.RabbitMQ 是一个用于专为 RabbitMQ 设计的发布者和消费者通讯模型,大大简化了发布和消息的代码,并提供一系列简便和实用的功能,开发者可以通过框架提供的消费模型实现高性能消费、事件编排,框架还支持发布者确认机制、自定义重试机制、补偿机制、死信队列、延迟队列、连接通道复用等一系列的便利功能。开发者可以把更多的精力放到业务逻辑中,通过 Maomi.MQ.RabbitMQ 框架简化跨进程消息通讯模式,使得跨进程消息传递更加简单和可靠。

此外,框架通过 runtime 内置的 api 支持了分布式可观测性,可以通过进一步使用 OpenTelemetry 等框架进一步收集可观测性信息,推送到基础设施平台中。

快速开始

本文将快速介绍 Maomi.MQ.RabbitMQ 的使用方法。

引入 Maomi.MQ.RabbitMQ 包,在 Web 配置中注入服务:

代码语言:javascript
复制
builder.Services.AddSwaggerGen();
builder.Services.AddLogging();

builder.Services.AddMaomiMQ((MqOptionsBuilder options) =>
{
	options.WorkId = 1;
	options.AppName = "myapp";
	options.Rabbit = (ConnectionFactory options) =>
	{
		options.HostName = "192.168.3.248";
		options.ClientProvidedName = Assembly.GetExecutingAssembly().GetName().Name;
	};
}, [typeof(Program).Assembly]);

var app = builder.Build();
  • WorkId: 指定用于生成分布式雪花 id 的节点 id,默认为 0。
  • AppName:用于标识消息的生产者,以及在日志和链路追踪中标识消息的生产者或消费者。
  • Rabbit:RabbitMQ 客户端配置,请参考 ConnectionFactory

如果是控制台项目,则需要引入 Microsoft.Extensions.Hosting 包。

代码语言:javascript
复制
var host = new HostBuilder()
	.ConfigureLogging(options =>
	{
		options.AddConsole();
		options.AddDebug();
	})
	.ConfigureServices(services =>
	{
		services.AddMaomiMQ(options =>
		{
			options.WorkId = 1;
			options.AppName = "myapp";
			options.Rabbit = (ConnectionFactory options) =>
			{
				options.HostName = "192.168.3.248";
				options.ClientProvidedName = Assembly.GetExecutingAssembly().GetName().Name;
			};
		}, new System.Reflection.Assembly[] { typeof(Program).Assembly });
		
		// Your services.
		services.AddHostedService<MyPublishAsync>();
	}).Build();

await host.RunAsync();

定义消息模型类,该模型类将会被序列化为二进制内容传递到 RabbitMQ 服务器中。

代码语言:javascript
复制
public class TestEvent
{
    public int Id { get; set; }

    public override string ToString()
    {
        return Id.ToString();
    }
}

定义消费者,消费者需要实现 IConsumer<TEvent> 接口,以及使用 [Consumer] 特性注解配置消费者属性。

代码语言:javascript
复制
[Consumer("test", Qos = 1, RetryFaildRequeue = true)]
public class MyConsumer : IConsumer<TestEvent>
{
    private static int _retryCount = 0;

    // 消费
    public async Task ExecuteAsync(EventBody<TestEvent> message)
    {
        Console.WriteLine($"事件 id: {message.Id} {DateTime.Now}");
        await Task.CompletedTask;
    }
    
    // 每次消费失败时执行
    public Task FaildAsync(Exception ex, int retryCount, EventBody<TestEvent>? message) => Task.CompletedTask;
    
    // 补偿
    public Task<bool> FallbackAsync(EventBody<TestEvent>? message) => Task.FromResult(true);
}

然后注入 IMessagePublisher 服务发布消息:

代码语言:javascript
复制
[ApiController]
[Route("[controller]")]
public class IndexController : ControllerBase
{
    private readonly IMessagePublisher _messagePublisher;

    public IndexController(IMessagePublisher messagePublisher)
    {
        _messagePublisher = messagePublisher;
    }

    [HttpGet("publish")]
    public async Task<string> Publisher()
    {
        // 发布消息
        await _messagePublisher.PublishAsync(queue: "test", message: new TestEvent
		{
        	Id = i
        });
        return "ok";
    }
}

消息发布者

消息发布者用于推送消息到 RabbitMQ 服务器中。

通过注入 IMessagePublisher 接口即可向 RabbitMQ 推送消息,示例项目请参考 PublisherWeb

定义一个事件模型类:

代码语言:javascript
复制
public class TestEvent
{
	public int Id { get; set; }

	public override string ToString()
	{
		return Id.ToString();
	}
}

注入 IMessagePublisher 服务后发布消息:

代码语言:javascript
复制
[ApiController]
[Route("[controller]")]
public class IndexController : ControllerBase
{
	private readonly IMessagePublisher _messagePublisher;

	public IndexController(IMessagePublisher messagePublisher)
	{
		_messagePublisher = messagePublisher;
	}

	[HttpGet("publish")]
	public async Task<string> Publisher()
	{
		for (var i = 0; i < 100; i++)
		{
			await _messagePublisher.PublishAsync(queue: "PublisherWeb", message: new TestEvent
			{
				Id = i
			});
		}

		return "ok";
	}
}

IMessagePublisher

IMessagePublisher 定义比较简单,只有三个方法和一个属性:

代码语言:javascript
复制
public ConnectionPool ConnectionPool { get; }

Task PublishAsync<TEvent>(string queue, TEvent message, Action<IBasicProperties>? properties = null)
where TEvent : class;

Task PublishAsync<TEvent>(string queue, TEvent message, IBasicProperties properties);

//  不建议直接使用该接口。
Task CustomPublishAsync<TEvent>(string queue, EventBody<TEvent> message, BasicProperties properties);

三个 PublishAsync 方法用于发布事件,ConnectionPool 属性用于获取 RabbitMQ.Client.IConnection 对象。

由于直接公开了 BasicProperties ,因此开发者完全自由配置 RabbitMQ 原生的消息属性,所以 Maomi.MQ.RabbitMQ 没必要过度设计,只提供了简单的功能接口。

例如,可以通过 BasicProperties 配置单条消息的过期时间:

代码语言:javascript
复制
await _messagePublisher.PublishAsync(queue: "RetryWeb", message: new TestEvent
{
	Id = i
}, (BasicProperties p) =>
{
	p.Expiration = "1000";
});

当发布一条消息时,实际上框架传递的是 EventBody<T> 类型,EventBody<T> 中包含了一些重要的附加消息属性,这些属性会给消息处理和故障诊断带来很大的方便。

代码语言:javascript
复制
public class EventBody<TEvent>
{
	// 事件唯一 id.
	public long Id { get; init; }

	// Queue.
	public string Queue { get; init; } = null!;

	// App name.
	public string Publisher { get; init; } = null!;

	// 事件创建时间.
	public DateTimeOffset CreationTime { get; init; }

	// 事件体.
	public TEvent Body { get; init; } = default!;
}

Maomi.MQ 通过 DefaultMessagePublisher 类型实现了 IMessagePublisher,DefaultMessagePublisher 默认生命周期是 Singleton:

代码语言:javascript
复制
services.AddSingleton<IMessagePublisher, DefaultMessagePublisher>();

生命周期不重要,如果需要修改默认的生命周期,可以手动修改替换。

代码语言:javascript
复制
services.AddScoped<IMessagePublisher, DefaultMessagePublisher>();

开发者也可以自行实现 IMessagePublisher 接口,具体示例请参考 DefaultMessagePublisher 类型。

连接池

为了复用 RabbitMQ.Client.IConnection ,Maomi.MQ.RabbitMQ 内部实现了 ConnectionPool 类型,通过对象池维护复用的 RabbitMQ.Client.IConnection 对象。

默认对象池中的 RabbitMQ.Client.IConnection 数量为 0,只有当连接被真正使用时才会从对象池委托中创建,连接对象会随着程序并发量而自动增加,但是,默认最大连接对象数量为 Environment.ProcessorCount * 2

除了 IMessagePublisher 接口提供的 PublishAsync 方法可以发布事件,开发者还可以从 ConnectionPool 获取连接对象,请务必在使用完毕后通过 ConnectionPool.Return() 方法将其归还到连接对象池。

通过连接池直接使用 IConnection 对象发布消息:

代码语言:javascript
复制
[HttpGet("publish")]
public async Task<string> Publisher()
{
	for (var i = 0; i < 100; i++)
	{
		var connectionPool = _messagePublisher.ConnectionPool;
		var connection = connectionPool.Get();

		try
		{
			connection.Channel.BasicPublishAsync(
			exchange: string.Empty,
			routingKey: "queue",
			basicProperties: properties,
			body: _jsonSerializer.Serializer(message),
			mandatory: true);
		}
		finally
		{
			connectionPool.Return(connection);
		}
	}

	return "ok";
}

你也可以绕开 IMessagePublisher ,直接注入 ConnectionPool 使用 RabbitMQ 连接对象,但是不建议这样使用。

代码语言:javascript
复制
private readonly ConnectionPool _connectionPool;

public DefaultMessagePublisher(ConnectionPool connectionPool)
{
	_connectionPool = connectionPool;
}

public async Task MyPublshAsync()
{
	var connection = _connectionPool.Get();
	try
	{
		await connection.Channel.BasicPublishAsync(...);
	}
	finally
	{
		_connectionPool.Return(connection);
	}
}

为了更加简便地管理连接对象,可以使用 CreateAutoReturn() 函数创建连接管理对象,该对象被释放时会自动将 IConnection 返还给连接池。

代码语言:javascript
复制
using var poolObject = _messagePublisher.ConnectionPool.CreateAutoReturn();
poolObject.Channel.BasicPublishAsync(
	exchange: string.Empty,
	routingKey: "queue",
	basicProperties: properties,
	body: _jsonSerializer.Serializer(message),
	mandatory: true);

如果你自行使用 ConnectionPool 推送消息到 RabbitMQ,请务必通过序列化 EventBody<TEvent> 事件对象,这样 Maomi.MQ.RabbitMQ 消费者才能正常工作。同时,Moami.MQ 对可观测性做了支持,如果自行使用 ConnectionPool 获取连接对象推送消息,可能会导致可观测性信息缺失。

正常情况下,RabbitMQ.Client 中包含了可观测性的功能,但是 Maomi.MQ.RabbitMQ 附加的可观测性信息有助于诊断故障问题。

请注意:

  • Maomi.MQ.RqbbitMQ 通过 EventBody<TEvent> 泛型对象发布和接收事件。
  • DefaultMessagePublisher 包含了链路追踪等可观测性代码。

消息过期

IMessagePublisher 对外开放了 BasicProperties 或 BasicProperties,可以自由配置消息属性。

例如为消息配置过期时间:

代码语言:javascript
复制
[HttpGet("publish")]
public async Task<string> Publisher()
{
	for (var i = 0; i < 1; i++)
	{
		await _messagePublisher.PublishAsync(queue: "test", message: new TestEvent
		{
			Id = i
		}, properties =>
		{
			properties.Expiration = "6000";
		});
	}

	return "ok";
}

如果此时为 test 绑定死信队列,那么该消息长时间没有被消费时,会被移动到另一个队列,请参考 死信队列。

还可以通过配置消息属性实现更多的功能,请参考 IBasicProperties

事务

RabbitMQ 支持事务,不过据 RabbitMQ 官方文档显示,事务会使吞吐量减少 250 倍。

RabbitMQ 事务使用上比较简单,可以保证发布的消息已经被推送到 RabbitMQ 服务器,只有当提交事务时,提交的消息才会被 RabbitMQ 存储并推送给消费者。

使用示例:

代码语言:javascript
复制
[HttpGet("publish_tran")]
public async Task<string> Publisher_Tran()
{
	using var tranPublisher = await _messagePublisher.TxSelectAsync();

	try
	{
		await tranPublisher.PublishAsync(queue: "publish_tran", message: new TestEvent
		{
			Id = 666
		});
		await tranPublisher.TxCommitAsync();
	}
	catch
	{
		await tranPublisher.TxRollbackAsync();
		throw;
	}

	return "ok";
}

或者手动开启事务:

代码语言:javascript
复制
[HttpGet("publish_tran")]
public async Task<string> Publisher_Tran()
{
	using var tranPublisher = _messagePublisher.CreateTransaction();

	try
	{
		await tranPublisher.TxSelectAsync();
		await tranPublisher.PublishAsync(queue: "publish_tran", message: new TestEvent
		{
			Id = 666
		});
		await tranPublisher.TxCommitAsync();
	}
	catch
	{
		await tranPublisher.TxRollbackAsync();
		throw;
	}

	return "ok";
}

注意,在该种模式之下,创建 TransactionPublisher 对象时,会从对象池中取出一个连接对象,因为开启事务模式可能会污染当前连接通道,因此 TransactionPublisher 不会向连接池归还连接对象,而是直接释放

发送方确认模式

虽然事务模式可以保证消息会被推送到 RabbitMQ 服务器中,但是由于事务模式会导致吞吐量降低 250 倍,因此不是一个好的选择。为了解决这个问题, RabbitMQ 引入了一种确认机制,这种机制就像滑动窗口,能够保证消息推送到服务器中,并且具备高性能的特性。

请参考 https://www.rabbitmq.com/docs/confirms

使用示例:

代码语言:javascript
复制
[HttpGet("publish_confirm")]
public async Task<string> Publisher_Confirm()
{
	using var confirmPublisher = await _messagePublisher.ConfirmSelectAsync();

	for (var i = 0; i < 5; i++)
	{
		await confirmPublisher.PublishAsync(queue: "publish_confirm1", message: new TestEvent
		{
			Id = 666
		});

		var result = await confirmPublisher.WaitForConfirmsAsync();

		// 如果在超时内没有接收到 nacks,则为 True,否则为 false。
		Console.WriteLine($"发布 {i},{result}");
	}

	return "ok";
}

WaitForConfirmsAsync 方法会返回一个值,如果正常被服务器确认了消息已经传达,则结果为 true,如果超时没有被服务器确认,则返回 false。

此外,还有一个 WaitForConfirmsOrDieAsync 方法,它会一直等待该频道上的所有已发布消息都得到确认,使用示例:

代码语言:javascript
复制
using var confirmPublisher = await _messagePublisher.ConfirmSelectAsync();

for (var i = 0; i < 5; i++)
{
	await confirmPublisher.PublishAsync(queue: "publish_confirm1", message: new TestEvent
	{
		Id = 666
	});

	Console.WriteLine($"发布 {i}");
}

await confirmPublisher.WaitForConfirmsOrDieAsync();

注意,在该种模式之下,创建 ConfirmPublisher 对象时,会从对象池中取出一个连接对象,因为开启事务模式可能会污染当前连接通道,因此 ConfirmPublisher 不会向连接池归还连接对象,而是直接释放

注意,同一个通道不能同时使用事务和发送方确认模式。

独占模式

默认情况下,每次使用 IMessagePublisher.PublishAsync() 发布消息时,都会从连接池中取出连接对象,然后使用该连接通道发布消息,发布完毕后就会归还连接对象给连接池。

如果需要在短时间内大批量发布消息,则需要每次都要重复获取和返还连接对象。

使用独占模式时可以在一段时间内独占一个连接对象,超出作用域后,连接对象会自动放回连接池。这种模式对于需要大量发布消息的场景提高吞吐量非常有帮助。为了能够将连接通道归还连接池,请务必使用 using 关键字修饰变量,或者手动调用 Dispose 函数。

使用示例:

代码语言:javascript
复制
// 创建独占模式
using var singlePublisher = _messagePublisher.CreateSingle();

for (var i = 0; i < 500; i++)
{
	await singlePublisher.PublishAsync(queue: "publish_single", message: new TestEvent
	{
		Id = 666
	});
}

消费者

Maomi.MQ.RabbitMQ 中,有两种消费模式,一种是消费者模式,一种是事件模式(事件总线模式)。

下面简单了解这两种模式的使用方法。

消费者模式

消费者服务需要实现 IConsumer<TEvent> 接口,并且配置 [Consumer("queue")] 特性绑定队列名称,通过消费者对象来控制消费行为。

消费者模式有具有失败通知和补偿能力,使用上也比较简单。

代码语言:javascript
复制
public class TestEvent
{
    public int Id { get; set; }
}

[Consumer("PublisherWeb", Qos = 1, RetryFaildRequeue = true)]
public class MyConsumer : IConsumer<TestEvent>
{
    private static int _retryCount = 0;

    // 消费或重试
    public async Task ExecuteAsync(EventBody<TestEvent> message)
    {
        _retryCount++;
        Console.WriteLine($"执行次数:{_retryCount} 事件 id: {message.Id} {DateTime.Now}");
        await Task.CompletedTask;
    }
    
    // 失败
    public Task FaildAsync(Exception ex, int retryCount, EventBody<TestEvent>? message) => Task.CompletedTask;
    
    // 补偿
    public Task<bool> FallbackAsync(EventBody<TestEvent>? message) => Task.FromResult(true);
}

事件模式

事件模式是通过事件总线的方式实现的,以事件模型为中心,通过事件来控制消费行为。

代码语言:javascript
复制
[EventTopic("web2", Qos = 1, RetryFaildRequeue = true)]
public class TestEvent
{
	public string Message { get; set; }
}

然后使用 [EventOrder] 特性编排事件执行顺序。

代码语言:javascript
复制
// 编排事件消费顺序
[EventOrder(0)]
public class My1EventEventHandler : IEventHandler<TestEvent>
{
	public async Task CancelAsync(EventBody<TestEvent> @event, CancellationToken cancellationToken)
	{
	}

	public async Task ExecuteAsync(EventBody<TestEvent> @event, CancellationToken cancellationToken)
	{
		Console.WriteLine($"{@event.Id},事件 1 已被执行");
	}
}

[EventOrder(1)]
public class My2EventEventHandler : IEventHandler<TestEvent>
{
	public async Task CancelAsync(EventBody<TestEvent> @event, CancellationToken cancellationToken)
	{
	}

	public async Task ExecuteAsync(EventBody<TestEvent> @event, CancellationToken cancellationToken)
	{
		Console.WriteLine($"{@event.Id},事件 2 已被执行");
	}
}

当然,事件模式也可以通过创建中间件增加补偿功能,通过中间件还可以将所有排序事件放到同一个事务中,一起成功或失败,避免事件执行时出现程序退出导致的一致性问题。

代码语言:javascript
复制
public class TestEventMiddleware : IEventMiddleware<TestEvent>
{
    private readonly BloggingContext _bloggingContext;

    public TestEventMiddleware(BloggingContext bloggingContext)
    {
        _bloggingContext = bloggingContext;
    }

    public async Task ExecuteAsync(EventBody<TestEvent> @event, EventHandlerDelegate<TestEvent> next)
    {
        using (var transaction = _bloggingContext.Database.BeginTransaction())
        {
            await next(@event, CancellationToken.None);
            await transaction.CommitAsync();
        }
    }

    public Task FaildAsync(Exception ex, int retryCount, EventBody<TestEvent>? message)
    {
        return Task.CompletedTask;
    }

    public Task<bool> FallbackAsync(EventBody<TestEvent>? message)
    {
        return Task.FromResult(true);
    }
}

分组

消费者模式和事件模式都可以设置分组,在特性上设置了 Group 属性,具有同一个分组的事件会被放到一个连接通道(RabbitMQ.Client.IConnection)中,对于消费频率不高的事件,复用连接通道可以有效较低资源消耗。

消费者模式分组示例:

代码语言:javascript
复制
[Consumer("ConsumerWeb_group_1", Qos = 1, Group = "group")]
public class Group_1_Consumer : IConsumer<GroupEvent>
{
    public Task ExecuteAsync(EventBody<GroupEvent> message) => Task.CompletedTask;

    public Task FaildAsync(Exception ex, int retryCount, EventBody<GroupEvent>? message) => Task.CompletedTask;

    public Task<bool> FallbackAsync(EventBody<GroupEvent>? message) => Task.FromResult(true);
}

[Consumer("ConsumerWeb_group_2", Qos = 1, Group = "group")]
public class Group_2_Consumer : IConsumer<GroupEvent>
{
    public Task ExecuteAsync(EventBody<GroupEvent> message) => Task.CompletedTask;

    public Task FaildAsync(Exception ex, int retryCount, EventBody<GroupEvent>? message) => Task.CompletedTask;

    public Task<bool> FallbackAsync(EventBody<GroupEvent>? message) => Task.FromResult(true);
}

事件总线模式分组示例:

代码语言:javascript
复制
[EventTopic("web1", Qos = 1, RetryFaildRequeue = true, Group = "group")]
public class Test1Event
{
	public string Message { get; set; }
}
[EventTopic("web2", Qos = 1, RetryFaildRequeue = true, Group = "group")]
public class Test2Event
{
	public string Message { get; set; }
}

消费者模式

消费者模式要求服务实现 IConsumer<TEvent> 接口,并添加 [Connsumer] 特性。

IConsumer<TEvent> 接口比较简单,其定义如下:

代码语言:javascript
复制
public interface IConsumer<TEvent>
    where TEvent : class
{
    // 消息处理.
    public Task ExecuteAsync(EventBody<TEvent> message);

    // ExecuteAsync 异常后立即执行此代码.
    public Task FaildAsync(Exception ex, int retryCount, EventBody<TEvent>? message);

    // 最后一次重试失败时执行,用于补偿.
    public Task<bool> FallbackAsync(EventBody<TEvent>? message);
}

使用消费者模式时,需要先定义一个模型类,用于发布者和消费者之间传递消息,事件模型类只要是类即可,能够正常序列化和反序列化,没有其它要求。

代码语言:javascript
复制
public class TestEvent
{
	public int Id { get; set; }

	public override string ToString()
	{
		return Id.ToString();
	}
}

然后继承 IConsumer<TEvent> 接口实现消费者功能:

代码语言:javascript
复制
[Consumer("web1", Qos = 1)]
public class MyConsumer : IConsumer<TestEvent>
{
	// 消费
	public async Task ExecuteAsync(EventBody<TestEvent> message)
	{
		Console.WriteLine(message.Body.Id);
	}

	// 每次失败时被执行
	public async Task FaildAsync(Exception ex, int retryCount, EventBody<TestEvent>? message)
	{
		Console.WriteLine($"重试 {message.Body.Id},次数 {retryCount}");
		await Task.CompletedTask;
	}

	// 最后一次失败时执行
	public async Task<bool> FallbackAsync(EventBody<TestEvent>? message)
	{
		Console.WriteLine($"最后一次 {message.Body.Id}");
        // 如果返回 true,说明补偿成功。
		return true;
	}
}

特性配置的说明请参考 消费者配置 。

消费、重试和补偿

消费者收到服务器推送的消息时,ExecuteAsync 方法会被自动执行。当 ExecuteAsync 执行异常时,FaildAsync 方法会马上被触发,开发者可以利用 FaildAsync 记录相关日志信息。

代码语言:javascript
复制
// 每次失败时被执行
public async Task FaildAsync(Exception ex, int retryCount, EventBody<TestEvent>? message)
{
	// 当 retryCount == -1 时,错误并非是 ExecuteAsync 方法导致的
	if (retryCount == -1)
	{
		_logger.LogError(ex, "Consumer error,event id: {Id}", message?.Id);

		// 可以在此处添加告警通知代码
		await Task.Delay(1000);
	}
	else
	{
		_logger.LogError(ex, "Consumer exception,event id: {Id},retry count: {retryCount}", message!.Id, retryCount);
	}
}

如果 FaildAsync 方法也出现异常时,不会影响整体流程,框架会等待到达间隔时间后继续重试 ExecuteAsync 方法。

建议 FaildAsync 使用 try{}cathc{} 套住代码,不要对外抛出异常,FaildAsync 的逻辑不要包含太多逻辑,并且 FaildAsync 只应记录日志或进行告警使用。

FaildAsync 被执行有一个额外情况,就是在消费消息之前就已经发生错误,例如一个事件模型类有构造函数导致不能被反序列化,这个时候 FaildAsync 会被立即执行,且 retryCount = -1

ExecuteAsync 方法执行异常时,框架会自动重试,默认会重试五次,如果五次都失败,则会执行 FallbackAsync 方法进行补偿。

重试间隔时间会逐渐增大,请参考 重试。

当重试五次之后,就会立即启动补偿机制。

代码语言:javascript
复制
// 最后一次失败时执行
public async Task<bool> FallbackAsync(EventBody<TestEvent>? message)
{
	return true;
}

FallbackAsync 方法需要返回 bool,如果返回 true ,表示虽然 ExecuteAsync 出现异常,但是 FallbackAsync 补偿后已经正常,该消息会被正常消费掉。如果返回 false,则说补偿失败,该消息按照消费失败处理。

只有 ExecuteAsync 异常时,才会触发 FaildAsyncFallbackAsync ,如果是在处理消息之前的异常,会直接失败。

retry
retry

消费失败

ExecuteAsync 失败次数达到阈值时,并且 FallbackAsync 返回 false,则该条消息消费失败,或者由于序列化等错误时直接失败。

[Consumer] 特性中有三个很重要的配置:

代码语言:javascript
复制
public class ConsumerAttribute : Attribute
{
    // 消费失败次数达到条件时,是否放回队列.
    public bool RetryFaildRequeue { get; set; }

    // 现异常时是否放回队列,例如序列化错误等原因导致的,而不是消费时发生异常导致的.
    public bool ExecptionRequeue { get; set; }  = true;
    
    // 绑定死信队列.
    public string? DeadQueue { get; set; }
}

ExecuteAsync 失败次数达到阈值时,并且 FallbackAsync 返回 false,则该条消息消费失败。

如果 RetryFaildRequeue == false,那么该条消息会被 RabbitMQ 丢弃。

如果绑定了死信队列,则会先推送到死信队列,接着再丢弃。

如果 RetryFaildRequeue == true,那么该条消息会被返回 RabbbitMQ 队列中,等待下一次消费。

由于消息失败后会被放回队列,因此绑定的死信队列不会收到该消息。

当序列化异常或者其它问题导致错误而不能进入 ExecuteAsync 方法时,FaildAsync 方法会首先被触发一次,此时 retryCount 参数值为 -1

出现此种问题时,一般是开发者 bug 导致的,不会进行补偿等操作,开发者可以在 FaildAsync 中处理该事件,记录相关日志信息。

代码语言:javascript
复制
// 每次失败时被执行,或者出现无法进入 ExecuteAsync 的异常
public async Task FaildAsync(Exception ex, int retryCount, EventBody<TestEvent>? message)
{
	// 当 retryCount == -1 时,错误并非是 ExecuteAsync 方法导致的
	if (retryCount == -1)
	{
		_logger.LogError(ex, "Consumer error,event id: {Id}", message?.Id);

		// 可以在此处添加告警通知代码
		await Task.Delay(1000);
	}
	else
	{
		_logger.LogError(ex, "Consumer exception,event id: {Id},retry count: {retryCount}", message!.Id, retryCount);
	}
}

由于这种情况不妥善处理,会导致消息丢失,因此框架默认将 ExecptionRequeue 设置为 true,也就是说出现这种异常时,消息会被放回队列。如果问题一致没有得到解决,则会出现循环:调用 FaildAsync 、放回队列、调用 FaildAsync 、放回队列... ...

所以应该在 FaildAsync 中添加代码通知开发者相关信息,并且设置间隔时间,避免重试太频繁。

自动创建队列

框架默认会自动创建队列,如果需要关闭自动创建功能,把 AutoQueueDeclare 设置为 false 即可。

代码语言:javascript
复制
builder.Services.AddMaomiMQ((MqOptionsBuilder options) =>
{
	options.WorkId = 1;
	options.AppName = "myapp";
	options.AutoQueueDeclare = false;
	options.Rabbit = (ConnectionFactory options) =>
	{
		options.HostName = "192.168.3.248";
		options.ClientProvidedName = Assembly.GetExecutingAssembly().GetName().Name;
	};
}, [typeof(Program).Assembly]);

当然还可以单独为消费者配置是否自动创建队列:

代码语言:javascript
复制
[Consumer("ConsumerWeb_create", AutoQueueDeclare = AutoQueueDeclare.Enable)]

默认情况下,关闭了全局自动创建,则不会自动创建队列。

如果关闭全局自动创建,但是消费者配置了 AutoQueueDeclare = AutoQueueDeclare.Enable,则还是会自动创建队列。

如果消费者配置了 AutoQueueDeclare = AutoQueueDeclare.Disable ,则会忽略全局配置,不会创建队列。

Qos

让程序需要严格根据顺序消费时,可以使用 Qos = 1,框架会严格保证逐条消费,如果程序不需要顺序消费,希望可以快速处理所有消息,则可以将 Qos 设置大一些。由于 Qos 和重试、补偿机制组合使用会有多种情况,因此请参考 重试。

Qos 是通过特性来配置的:

代码语言:javascript
复制
[Consumer("ConsumerWeb", Qos = 1)]

可以通过调高 Qos 值,让程序在可以并发消息,提高并发量。

延迟队列

延迟队列有两种,一种设置消息过期时间,一种是设置队列过期时间。

设置消息过期时间,那么该消息在一定时间没有被消费时,会被丢弃或移动到死信队列中,该配置只对单个消息有效,请参考 消息过期。

队列设置过期后,当消息在一定时间内没有被消费时,会被丢弃或移动到死信队列中,该配置只对所有消息有效。基于这一点,我们可以实现延迟队列。

首先创建消费者,继承 EmptyConsumer,那么该队列会在程序启动时被创建,但是不会创建 IConnection 进行消费。然后设置队列消息过期时间以及绑定死信队列,绑定的死信队列既可以使用消费者模式实现,也可以使用事件模式实现。

代码语言:javascript
复制
[Consumer("ConsumerWeb_dead_2", Expiration = 6000, DeadQueue = "ConsumerWeb_dead_queue_2")]
public class EmptyDeadConsumer : EmptyConsumer<DeadEvent>
{
}

// ConsumerWeb_dead 消费失败的消息会被此消费者消费。
[Consumer("ConsumerWeb_dead_queue_2", Qos = 1)]
public class Dead_2_QueueConsumer : IConsumer<DeadQueueEvent>
{
    // 消费
    public Task ExecuteAsync(EventBody<DeadQueueEvent> message)
    {
        Console.WriteLine($"死信队列,事件 id:{message.Id}");
        return Task.CompletedTask;
    }

    // 每次失败时被执行
    public Task FaildAsync(Exception ex, int retryCount, EventBody<DeadQueueEvent>? message) => Task.CompletedTask;

    // 最后一次失败时执行
    public Task<bool> FallbackAsync(EventBody<DeadQueueEvent>? message) => Task.FromResult(false);
}

空消费者

当识别到空消费者时,框架只会创建队列,而不会启动消费者消费消息。

可以结合延迟队列一起使用,该队列不会有任何消费者,当该队列的消息过期时,都由死信队列直接消费,示例如下:

代码语言:javascript
复制
[Consumer("ConsumerWeb_empty", Expiration = 6000, DeadQueue = "ConsumerWeb_empty_dead")]
public class MyEmptyConsumer : EmptyConsumer<TestEvent> { }

[Consumer("ConsumerWeb_empty_dead", Qos = 10)]
public class MyDeadConsumer : IConsumer<TestEvent>
{
	public Task ExecuteAsync(EventBody<TestEvent> message) => Task.CompletedTask;

	public Task FaildAsync(Exception ex, int retryCount, EventBody<TestEvent>? message) => Task.CompletedTask;

	public Task<bool> FallbackAsync(EventBody<TestEvent>? message) => Task.FromResult(true);
}

对于跨进程的队列,A 服务不消费只发布,B 服务负责消费,A 服务中可以加一个空消费者,保证 A 服务启动时该队列一定存在,另一方面,消费者服务不应该关注队列的定义,也不太应该创建队列。

分组

通过配置 Group 属性将多个消费者放到同一个连接通道中执行,对于那些并发量不高的队列,复用连接通道可以降低资源消耗。

示例:

代码语言:javascript
复制
[Consumer("ConsumerWeb_group_1", Qos = 1, Group = "group")]
public class Group_1_Consumer : IConsumer<GroupEvent>
{
}

[Consumer("ConsumerWeb_group_2", Qos = 1, Group = "group")]
public class Group_2_Consumer : IConsumer<GroupEvent>
{
}

事件总线模式

Maomi.MQ 内部设计了一个事件总线,可以帮助开发者实现事件编排、实现本地事务、正向执行和补偿。

首先定义一个事件类型,该事件绑定一个 topic 或队列,事件需要使用 [EventTopic] 标识,并设置该事件对于的队列名称。

[EventTopic] 特性拥有与 [Consumer] 相同的特性,可参考 [Consumer] 的使用配置事件,请参考 消费者配置。

代码语言:javascript
复制
[EventTopic("EventWeb")]
public class TestEvent
{
	public string Message { get; set; }

	public override string ToString()
	{
		return Message;
	}
}

然后编排事件执行器,每个执行器都需要继承 IEventHandler<T> 接口,然后使用 [EventOrder] 特性标记执行顺序。

代码语言:javascript
复制
[EventOrder(0)]
public class My1EventEventHandler : IEventHandler<TestEvent>
{
	public async Task CancelAsync(EventBody<TestEvent> @event, CancellationToken cancellationToken)
	{
	}

	public async Task ExecuteAsync(EventBody<TestEvent> @event, CancellationToken cancellationToken)
	{
		Console.WriteLine($"{@event.Id},事件 1 已被执行");
	}
}

[EventOrder(1)]
public class My2EventEventHandler : IEventHandler<TestEvent>
{
	public async Task CancelAsync(EventBody<TestEvent> @event, CancellationToken cancellationToken)
	{
	}

	public async Task ExecuteAsync(EventBody<TestEvent> @event, CancellationToken cancellationToken)
	{
		Console.WriteLine($"{@event.Id},事件 2 已被执行");
	}
}

每个事件执行器都必须实现 IEventHandler<T> 接口,并且设置 [EventOrder] 特性以便确认事件的执行顺序,框架会按顺序执行 IEventHandler<T>ExecuteAsync 方法,当 ExecuteAsync 出现异常时,则反向按顺序调用 CancelAsync

由于程序可能随时挂掉,因此通过 CancelAsync 实现补偿是不太可能的,CancelAsync 主要作为记录相关信息而使用。

中间件

中间件的作用是便于开发者拦截事件、记录信息、实现本地事务等,如果开发者不配置,则框架会自动创建 DefaultEventMiddleware<TEvent> 类型作为该事件的中间件服务。

自定义事件中间件示例代码:

代码语言:javascript
复制
public class TestEventMiddleware : IEventMiddleware<TestEvent>
{
	public async Task HandleAsync(EventBody<TestEvent> @event, EventHandlerDelegate<TestEvent> next)
	{
		await next(@event, CancellationToken.None);
	}
}

next 委托是框架构建的事件执行链路,在中间件中可以拦截事件、决定是否执行事件链路。

在中间件中调用 next() 委托时,框架开始按顺序执行事件,即前面提到的 My1EventEventHandlerMy2EventEventHandler

当一个事件有多个执行器时,由于程序可能会在任何时刻挂掉,因此本地事务必不可少。

例如,在中间件中注入数据库上下文,然后启动事务执行数据库操作,当其中一个 EventHandler 执行失败时,执行链路会回滚,同时不会提交事务。

可以参考 消费者模式 实现中间件的重试和补偿方法。

示例如下:

代码语言:javascript
复制
public class TestEventMiddleware : IEventMiddleware<TestEvent>
{
    private readonly BloggingContext _bloggingContext;

    public TestEventMiddleware(BloggingContext bloggingContext)
    {
        _bloggingContext = bloggingContext;
    }

    public async Task HandleAsync(EventBody<TestEvent> @event, EventHandlerDelegate<TestEvent> next)
    {
        using (var transaction = _bloggingContext.Database.BeginTransaction())
        {
            await next(@event, CancellationToken.None);
            await transaction.CommitAsync();
        }
    }

    public Task FaildAsync(Exception ex, int retryCount, EventBody<TestEvent>? message)
    {
        return Task.CompletedTask;
    }

    public Task<bool> FallbackAsync(EventBody<TestEvent>? message)
    {
        return Task.FromResult(true);
    }
}

[EventOrder(0)]
public class My1EventEventHandler : IEventHandler<TestEvent>
{
    private readonly BloggingContext _bloggingContext;

    public My1EventEventHandler(BloggingContext bloggingContext)
    {
        _bloggingContext = bloggingContext;
    }

    public async Task CancelAsync(EventBody<TestEvent> @event, CancellationToken cancellationToken)
    {
        Console.WriteLine($"{@event.Id} 被补偿,[1]");
    }

    public async Task HandlerAsync(EventBody<TestEvent> @event, CancellationToken cancellationToken)
    {
        await _bloggingContext.Posts.AddAsync(new Post
        {
            Title = "鲁滨逊漂流记",
            Content = "随便写写就对了"
        });
        await _bloggingContext.SaveChangesAsync();
    }
}

[EventOrder(1)]
public class My2EventEventHandler : IEventHandler<TestEvent>
{
    private readonly BloggingContext _bloggingContext;

    public My2EventEventHandler(BloggingContext bloggingContext)
    {
        _bloggingContext = bloggingContext;
    }
    public async Task CancelAsync(EventBody<TestEvent> @event, CancellationToken cancellationToken)
    {
        Console.WriteLine($"{@event.Id} 被补偿,[2]");
    }

    public async Task HandlerAsync(EventBody<TestEvent> @event, CancellationToken cancellationToken)
    {
        await _bloggingContext.Posts.AddAsync(new Post
        {
            Title = "红楼梦",
            Content = "贾宝玉初试云雨情"
        });
        await _bloggingContext.SaveChangesAsync();

        throw new OperationCanceledException("故意报错");
    }
}
image-20240525155639461
image-20240525155639461

事件执行时,如果出现异常,也是会被重试的,中间件 TestEventMiddleware 的 FaildAsync、FallbackAsync 会被依次执行。

你可以参考 消费者模式 或者 重试 。

分组消费

事件分组消费主要是利用同一个 IConnection 同时处理多个消息队列,提高通道利用率。

示例:

代码语言:javascript
复制
[EventTopic("EventGroup_1", Group = "aaa")]
public class Test1Event
{
	public string Message { get; set; }

	public override string ToString()
	{
		return Message;
	}
}

[EventTopic("EventGroup_2", Group = "aaa")]
public class Test2Event
{
	public string Message { get; set; }

	public override string ToString()
	{
		return Message;
	}
}

Maomi.MQ 的 IConsumer<T> 是一个消费者(一个队列)使用一个 IConnection,默认情况下事件总线也是。

对于哪些并发量不大或利用率较低的队列,可以通过事件分组将其合并到同一个 IConnection 中进行处理。

使用方法很简单,只需要在定义事件时,配置 [EventTopic] 特性的 Group 方法即可。

由于不同队列被放到一个 IConnection 中消费,如果事件都设置了 Qos,那么框架会默认计算平均值,例如:

代码语言:javascript
复制
[EventTopic("web3_1", Group = "aaa", Qos = 10)]
public class Test1Event

[EventTopic("web3_2", Group = "aaa", Qos = 6)]
public class Test2Event

此时框架会设置 Qos 为 8

配置

在引入 Maomi.MQ 框架时,可以配置相关属性,示例和说明如下:

代码语言:javascript
复制
// this.
builder.Services.AddMaomiMQ((MqOptionsBuilder options) =>
{
    // 当前程序节点,用于配置分布式雪花 id
	options.WorkId = 1;
    
    // 是否自动创建队列
	options.AutoQueueDeclare = true;
    
    // 当前应用名称,用于标识消息的发布者和消费者程序
	options.AppName = "myapp";
    
    // RabbitMQ 配置
	options.Rabbit = (ConnectionFactory options) =>
	{
		options.HostName = "192.168.3.248";
		options.ClientProvidedName = Assembly.GetExecutingAssembly().GetName().Name;
	};
}, [typeof(Program).Assembly]);  // 要被扫描的程序集

消费者配置

消费者模式 [Consumer] 和事件总线模式 [EventTopic] 具有相同的属性配置,其配置说明如下:

名称

类型

默认值

说明

Queue

string

队列名称

DeadQueue

string?

绑定死信队列名称

ExecptionRequeue

bool

true

出现异常时是否放回队列,例如序列化错误等原因导致的,而不是消费时发生异常导致的

Expiration

int

0

队列消息过期时间,单位毫秒

Qos

ushort

1

Qos

RetryFaildRequeue

bool

false

消费失败次数达到条件时,是否放回队列

Group

string?

分组名称

AutoQueueDeclare

AutoQueueDeclare

AutoQueueDeclare.None

是否自动创建队列

环境隔离

目前还在考虑要不要支持多租户模式。

在开发中,往往需要在本地调试,本地程序启动后会连接到开发服务器上,一个队列收到消息时,会向其中一个消费者推送消息。那么我本地调试时,发布一个消息后,可能本地程序收不到该消息,而是被开发环境中的程序消费掉了。

这个时候,我们希望可以将本地调试环境跟开发环境隔离开来,可以使用 RabbitMQ 提供的 VirtualHost 功能。

首先通过 put 请求创建一个新的 VirtualHost,请参考文档:https://www.rabbitmq.com/docs/vhosts#using-http-api

image-20240612193415867
image-20240612193415867

然后在代码中配置 VirtualHost:

代码语言:javascript
复制
builder.Services.AddMaomiMQ((MqOptionsBuilder options) =>
{
	options.WorkId = 1;
	options.AutoQueueDeclare = true;
	options.AppName = "myapp";
	options.Rabbit = (ConnectionFactory options) =>
	{
		options.HostName = "192.168.3.248";
#if DEBUG
		options.VirtualHost = "debug";
#endif
		options.ClientProvidedName = Assembly.GetExecutingAssembly().GetName().Name;
	};
}, [typeof(Program).Assembly]);

雪花 id 配置

Maomi.MQ.RabbitMQ 使用了 IdGenerator 生成雪花 id,使得每个事件在集群中都有一个唯一 id。

框架通过 IIdFactory 接口创建雪花 id,你可以通过替换 IIdFactory 接口配置雪花 id 生成规则。

代码语言:javascript
复制
services.AddSingleton<IIdFactory>(new DefaultIdFactory((ushort)optionsBuilder.WorkId));

示例:

代码语言:javascript
复制
public class DefaultIdFactory : IIdFactory
{
    /// <summary>
    /// Initializes a new instance of the <see cref="DefaultIdFactory"/> class.
    /// </summary>
    /// <param name="workId"></param>
    public DefaultIdFactory(ushort workId)
    {
        var options = new IdGeneratorOptions(workId) { SeqBitLength = 10 };
        YitIdHelper.SetIdGenerator(options);
    }

    /// <inheritdoc />
    public long NextId() => YitIdHelper.NextId();
}

IdGenerator 框架生成雪花 id 配置请参考:

https://github.com/yitter/IdGenerator/tree/master/C%23

Qos 并发和顺序

基于消费者模式和基于事件模式都是通过特性来配置消费属性,Qos 是其中一个重要的属性。

Qos 场景

对于消费者模式和事件总线模式,在没有使用 Group 属性配置消费行为时,每个队列都会独占一个 IConnection 以及 Host service。

对于消费频率很高但是不能并发的队列,最好不要设置 Group 属性,以及务必设置 Qos = 1。这样依赖,该消费者会独占资源进行消费,在保证顺序的情况下,独占资源有助于提高消费能力。

代码语言:javascript
复制
[Consumer("web1", Qos = 1)]
public class MyConsumer : IConsumer<TestEvent>
{
}

当需要需要提高消费吞吐量,而且不需要顺序消费时,可以将 Qos 设置高一些,RabbitMQ Client 框架会通过预取等方式提高吞吐量,并且多条消息可以并发消费。

如果判断一些消费者的消费频率不会很高时,可以将这些消费者放到一个分组中。

当多个消费者或事件配置共用一个分组时,那么这些事件的 Qos 应当一致,否则按照平均值来算。

示例:

代码语言:javascript
复制
[Consumer("web1", Qos = 10, Group = "group")]
public class My1Consumer : IConsumer<TestEvent>
{
}

[Consumer("web2", Qos = 6, Group = "group")]
public class My2Consumer : IConsumer<TestEvent>
{
}

由于两个消费者使用相同的分组,因此复用通道的 Qos 会被设置为 8。

如果消费频率不高,但是需要顺序消费时,可以将这些消费者放到同一个分组中,并且 Qos 设置为 1。

代码语言:javascript
复制
[Consumer("web1", Qos = 1, Group = "group1")]
public class My1Consumer : IConsumer<TestEvent>
{
}

[Consumer("web2", Qos = 1, Group = "group1")]
public class My2Consumer : IConsumer<TestEvent>
{
}

并发和异常处理

第一次情况,Qos 为 1 时,不设置 ExecptionRequeue 、RetryFaildRequeue。

第二种情况,Qos 为 1 时,设置 ExecptionRequeue 、RetryFaildRequeue。

Qos 为 1 时,会保证严格顺序消费,ExecptionRequeue 、RetryFaildRequeue 会影响失败的消息是否会被放回队列,如果放回队列,下一次消费会继续消费之前失败的消息。如果错误(如 bug)得不到解决,则会出现消费、失败、放回队列、重新消费这样的循环。

第三次情况,Qos > 1 时,不设置 ExecptionRequeue 、RetryFaildRequeue。

第四种情况,Qos > 1 时,设置 ExecptionRequeue 、RetryFaildRequeue。

当 Qos 大于 1 时,如果设置了 RetryFaildRequeue = true,那么消费失败的消息会被放回队列中,但是不一定下一次会立即重新消费该条消息。

重试

重试时间

当消费者 ExecuteAsync 方法异常时,框架会进行重试,默认会重试五次,按照 2 作为指数设置重试时间间隔。

第一次失败后,间隔 2 秒重试,第二次失败后,间隔 4 秒,接着分别是 8、16、32 秒。

Maomi.MQ.RabbitMQ 使用了 Polly 框架做重试策略管理器,默认通过 DefaultRetryPolicyFactory 服务生成重试间隔策略。

DefaultRetryPolicyFactory 代码示例如下:

代码语言:javascript
复制
/// <summary>
/// Default retry policy.<br />
/// 默认的策略提供器.
/// </summary>
public class DefaultRetryPolicyFactory : IRetryPolicyFactory
{
    /// <inheritdoc/>
    public virtual Task<AsyncRetryPolicy> CreatePolicy(string queue, long id)
    {
        // Create a retry policy.
        // 创建重试策略.
        var retryPolicy = Policy
            .Handle<Exception>()
            .WaitAndRetryAsync(
                retryCount: 5,
                sleepDurationProvider: retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)),
                onRetry: async (exception, timeSpan, retryCount, context) =>
                {
                    _logger.LogDebug("Retry execution event,queue [{Queue}],retry count [{RetryCount}],timespan [{TimeSpan}]", queue, retryCount, timeSpan);
                    await FaildAsync(queue, exception, timeSpan, retryCount, context);
                });

        return Task.FromResult(retryPolicy);
    }

    
    public virtual Task FaildAsync(string queue, Exception ex, TimeSpan timeSpan, int retryCount, Context context)
    {
        return Task.CompletedTask;
    }
}

你可以通过实现 IRetryPolicyFactory 接口,替换默认的重试策略服务服务。

代码语言:javascript
复制
services.AddSingleton<IRetryPolicyFactory, DefaultRetryPolicyFactory>();

重试机制

设定消费者代码如下:

代码语言:javascript
复制
    [Consumer("web1", Qos = 1 , RetryFaildRequeue = true)]
    public class MyConsumer : IConsumer<TestEvent>
    {
        private  int _retryCount = 0;
        // 消费
        public async Task ExecuteAsync(EventBody<TestEvent> message)
        {
            Console.WriteLine($"执行 {message.Body.Id} 第几次:{_retryCount} {DateTime.Now}");
            _retryCount++;
            throw new Exception("1");
        }

        // 每次失败时被执行
        public async Task FaildAsync(Exception ex, int retryCount, EventBody<TestEvent>? message)
        {
            Console.WriteLine($"重试 {message.Body.Id} 第几次:{retryCount} {DateTime.Now}");
            await Task.CompletedTask;
        }


        // 最后一次失败时执行
        public async Task<bool> FallbackAsync(EventBody<TestEvent>? message)
        {
            Console.WriteLine($"执行 {message.Body.Id} 补偿 {DateTime.Now}");
            return true;
        }
    }
}

首先会执行 IConsumer<TEvent>.ExecuteAsync()IEventMiddleware<TEvent>.ExecuteAsync() 消费消息,此时 ExecuteAsync() 执行失败,立即触发 FaildAsync() 函数。

然后等待一段时间间隔后,接着会重新执行 ExecuteAsync() 方法。

比如默认重试机制是重试五次,那么最终 IConsumer<TEvent>.ExecuteAsync()IEventMiddleware<TEvent>.ExecuteAsync() 都会被执行 6次,一次正常消费和五次重试消费。

FallbackAsync() 方法会在最后一次重试失败后被调用,该函数要返回一个 bool 类型。

当多次重试失败后,框架会调用 FallbackAsync 方法,如果该方法放回 true,那么框架会认为虽然 ExecuteAsync() 执行失败,但是通过 FallbackAsync() 已经补偿好了,该消息会被当做正常完成消费,框架会向 RabbitMQ 服务器发送 ACK,接着消费下一条消息。

如果 FallbackAsync() 返回 true,框架会认为该消息彻底失败,如果设置了 RetryFaildRequeue = true,那么该条消息会被放回消息队列,等待下一次消费。否则该条消息会被直接丢弃。

持久化剩余重试次数

当消费者处理消息失败时,默认消费者会重试 5 次,如果已经重试了 3 次,此时程序重启,那么下一次消费该消息时,依然是继续重试五次。

需要记忆重试次数,在程序重启时,能够按照剩余次数进行重试。

引入 Maomi.MQ.RedisRetry 包。

配置示例:

代码语言:javascript
复制
builder.Services.AddMaomiMQ((MqOptionsBuilder options) =>
{
	options.WorkId = 1;
	options.AutoQueueDeclare = true;
	options.AppName = "myapp";
	options.Rabbit = (ConnectionFactory options) =>
	{
		options.HostName = "192.168.3.248";
		options.ClientProvidedName = Assembly.GetExecutingAssembly().GetName().Name;
	};
}, [typeof(Program).Assembly]);

builder.Services.AddMaomiMQRedisRetry((s) =>
{
	ConnectionMultiplexer redis = ConnectionMultiplexer.Connect("192.168.3.248");
	IDatabase db = redis.GetDatabase();
	return db;
});

默认 key 只会保留 5 分钟。也就是说,如果五分钟之后程序才重新消费该消息,那么就会剩余重试次数就会重置。

死信队列

死信队列

可以给一个消费者或事件绑定死信队列,当该队列的消息失败后并且不会放回队列时,该消息会被推送到死信队列中,示例:

代码语言:javascript
复制
[Consumer("ConsumerWeb_dead", Qos = 1, DeadQueue = "ConsumerWeb_dead_queue", RetryFaildRequeue = false)]
public class DeadConsumer : IConsumer<DeadEvent>
{
	// 消费
	public Task ExecuteAsync(EventBody<DeadEvent> message)
	{
		Console.WriteLine($"事件 id:{message.Id}");
		throw new OperationCanceledException();
	}

	// 每次失败时被执行
	public Task FaildAsync(Exception ex, int retryCount, EventBody<DeadEvent>? message) => Task.CompletedTask;

	// 最后一次失败时执行
	public Task<bool> FallbackAsync(EventBody<DeadEvent>? message) => Task.FromResult(false);
}

// ConsumerWeb_dead 消费失败的消息会被此消费者消费。
[Consumer("ConsumerWeb_dead_queue", Qos = 1)]
public class DeadQueueConsumer : IConsumer<DeadQueueEvent>
{
	// 消费
	public Task ExecuteAsync(EventBody<DeadQueueEvent> message)
	{
		Console.WriteLine($"死信队列,事件 id:{message.Id}");
		return Task.CompletedTask;
	}

	// 每次失败时被执行
	public Task FaildAsync(Exception ex, int retryCount, EventBody<DeadQueueEvent>? message) => Task.CompletedTask;

	// 最后一次失败时执行
	public Task<bool> FallbackAsync(EventBody<DeadQueueEvent>? message) => Task.FromResult(false);
}

如果使用死信队列,则务必将 RetryFaildRequeue 设置为 false,那么消费者会在重试多次失败后,向 RabbitMQ 发送 nack 信号,RabbitMQ 就会将该消息转发到绑定的死信队列中。

延迟队列

创建一个消费者,继承 EmptyConsumer,那么该队列会在程序启动时被创建,但是不会创建 IConnection 进行消费。然后设置队列消息过期时间以及绑定死信队列,绑定的死信队列既可以使用消费者模式实现,也可以使用事件模式实现。

代码语言:javascript
复制
[Consumer("ConsumerWeb_dead_2", Expiration = 6000, DeadQueue = "ConsumerWeb_dead_queue_2")]
public class EmptyDeadConsumer : EmptyConsumer<DeadEvent>
{
}

// ConsumerWeb_dead 消费失败的消息会被此消费者消费。
[Consumer("ConsumerWeb_dead_queue_2", Qos = 1)]
public class Dead_2_QueueConsumer : IConsumer<DeadQueueEvent>
{
    // 消费
    public Task ExecuteAsync(EventBody<DeadQueueEvent> message)
    {
        Console.WriteLine($"事件 id:{message.Id} 已到期");
        return Task.CompletedTask;
    }

    // 每次失败时被执行
    public Task FaildAsync(Exception ex, int retryCount, EventBody<DeadQueueEvent>? message) => Task.CompletedTask;

    // 最后一次失败时执行
    public Task<bool> FallbackAsync(EventBody<DeadQueueEvent>? message) => Task.FromResult(false);
}

例如,用户下单之后,如果 15 分钟之内没有付款,那么消息到期时,自动取消订单。

可观测性

功能还在继续完善中。请参考 ActivitySourceApi 示例。

为了快速部署可观测性平台,可以使用 OpenTelemetry 官方提供的示例包快速部署相关的服务。

下载示例仓库源码:

代码语言:javascript
复制
git clone https://github.com/open-telemetry/opentelemetry-demo.git

由于示例中会包含大量的 demo 微服务,因此我们需要打开 docker-compose.yml 文件,将 services 节点的 Core Demo ServicesDependent Services 服务直接删除,只保留可观测性组件。或者直接点击下载笔者已经修改好的版本: docker-compose.yml

执行命令部署可观测性服务:

代码语言:javascript
复制
docker-compose up -d

opentelemetry-collector-contrib 用于收集链路追踪的可观测性信息,有 grpc 和 http 两种,监听端口如下:

Port

Protocol

Endpoint

Function

4317

gRPC

n/a

Accepts traces in OpenTelemetry OTLP format  (Protobuf).

4318

HTTP

/v1/traces

Accepts traces in OpenTelemetry OTLP format  (Protobuf and JSON).

经过容器端口映射后,对外端口可能不是 4317、4318 了。

引入 Maomi.MQ.Instrumentation 包,以及其它相关 OpenTelemetry 包。

代码语言:javascript
复制
<PackageReference Include="Maomi.MQ.Instrumentation " Version="1.1.0" />
<PackageReference Include="OpenTelemetry.Exporter.Console" Version="1.8.1" />
<PackageReference Include="OpenTelemetry.Exporter.OpenTelemetryProtocol" Version="1.8.1" />
<PackageReference Include="OpenTelemetry.Extensions.Hosting" Version="1.8.1" />
<PackageReference Include="OpenTelemetry.Instrumentation.AspNetCore" Version="1.8.1" />

然后注入服务:

代码语言:javascript
复制
const string serviceName = "myapp";

builder.Services.AddMaomiMQ((MqOptionsBuilder options) =>
{
	options.WorkId = 1;
	options.AutoQueueDeclare = true;
	options.AppName = serviceName;
	options.Rabbit = (ConnectionFactory options) =>
	{
		options.HostName = "192.168.3.248";
		options.ClientProvidedName = Assembly.GetExecutingAssembly().GetName().Name;
	};
}, [typeof(Program).Assembly]);

builder.Services.AddOpenTelemetry()
	  .ConfigureResource(resource => resource.AddService(serviceName))
	  .WithTracing(tracing =>
	  {
		  tracing.AddMaomiMQInstrumentation(options =>
		  {
              options.Sources.AddRange(MaomiMQDiagnostic.Sources);
			  options.RecordException = true;
		  })
		  .AddAspNetCoreInstrumentation()
		  .AddOtlpExporter(options =>
		  {
			  options.Endpoint = new Uri("http://127.0.0.1:32772/v1/traces");
			  options.Protocol = OtlpExportProtocol.HttpProtobuf;
		  });
	  });

启动服务后,进行发布、消费,链路追踪信息会被自动推送到 OpenTelemetry Collector 中,通过 Jaeger 、Skywalking 等组件可以读取出来。

由于 publish、consumer 属于兄弟 trace 而不是同一个 trace,因此需要通过 Tags 查询相关联的 trace,格式 event.id=xxx

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2024-06-13,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 文档说明
  • 导读
  • 快速开始
  • 消息发布者
    • IMessagePublisher
      • 连接池
        • 消息过期
          • 事务
            • 发送方确认模式
              • 独占模式
              • 消费者
                • 消费者模式
                  • 事件模式
                    • 分组
                    • 消费者模式
                      • 消费、重试和补偿
                        • 消费失败
                          • 自动创建队列
                            • Qos
                              • 延迟队列
                                • 空消费者
                                  • 分组
                                  • 事件总线模式
                                    • 中间件
                                      • 分组消费
                                      • 配置
                                        • 消费者配置
                                          • 环境隔离
                                            • 雪花 id 配置
                                            • Qos 并发和顺序
                                              • Qos 场景
                                                • 并发和异常处理
                                                • 重试
                                                  • 重试时间
                                                    • 重试机制
                                                      • 持久化剩余重试次数
                                                      • 死信队列
                                                        • 死信队列
                                                          • 延迟队列
                                                          • 可观测性
                                                          相关产品与服务
                                                          消息队列
                                                          腾讯云消息队列 TDMQ 是分布式架构中的重要组件,提供异步通信的基础能力,通过应用解耦降低系统复杂度,提升系统可用性和可扩展性。TDMQ 产品系列提供丰富的产品形态,包含 CKafka、RocketMQ、RabbitMQ、Pulsar、CMQ 五大产品,覆盖在线和离线场景,满足金融、互联网、教育、物流、能源等不同行业和场景的需求。
                                                          领券
                                                          问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档