[TOC]
RabbitMQ 是一个可靠且成熟的消息传递和流代理,它很容易部署在云环境、内部部署和本地机器上。它目前被全世界数百万人使用。
生产者(Producer)
消费者(Consumer)
队列(Queue)
交换机(Exchange)
绑定(Binding)
路由键(Routing Key)
通配符(Wildcard)
绑定键(Binding Key)
持久化(Durable)
确认机制(Acknowledge)
拒绝机制(Reject)
死信队列(Dead Letter Queue)
消息过期(Message TTL)
消息优先级(Message Priority)
消息分发
Docker 安装 RabbitMQ
docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 --restart=always --hostname my-rabbit -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin -e TZ=Asia/Shanghai rabbitmq:management
-d
:后台运行--restart
:重启策略--name
:容器名称-p
:端口映射--hostname
:主机名-e
:环境变量
RABBITMQ_DEFAULT_USER
:默认用户名RABBITMQ_DEFAULT_PASS
:默认密码TZ
:时区rabbitmq:management
:镜像名称Docker Compose 安装 RabbitMQ
version: "3.1"
services:
rabbitmq:
restart: always
image: rabbitmq:management
container_name: rabbitmq
hostname: my-rabbit
ports:
- 5672:5672
- 15672:15672 # RabbitMQ管理界面端口
environment:
TZ: Asia/Shanghai
RABBITMQ_DEFAULT_USER: admin
RABBITMQ_DEFAULT_PASS: admin
restart
:重启策略image
:镜像名称container_name
:容器名称hostname
:主机名ports
:端口映射environment
:环境变量
TZ
:时区RABBITMQ_DEFAULT_USER
:默认用户名RABBITMQ_DEFAULT_PASS
:默认密码rabbitmq:management
:镜像名称TestRequest
类,实现 IRabbitMqRequest
接口,定义消息体public class TestRequest : IRabbitMqRequest
{
/// <summary>
/// 重试次数
/// </summary>
public int RetryCount { get; set; }
#region 自定义字段
/// <summary>
/// id
/// </summary>
public string Id { get; set; } = default!;
/// <summary>
/// 名称
/// </summary>
public string Name { get; set; } = default!;
/// <summary>
/// 年龄
/// </summary>
public int Age { get; set; }
#endregion
}
TestRequestHandler
类,实现IRabbitMqRequestHandler<TestRequest>
接口,处理消息public class TestRequestHanlder : IRequestProcessorHandler<TestRequest>
{
private readonly ILogger<TestRequestHanlder> _logger;
public TestRequestHanlder(ILogger<TestRequestHanlder> logger)
{
_logger = logger;
}
public Task StartAsync(CancellationToken cancellationToken)
{
return Task.CompletedTask;
}
public Task StopAsync(int milliseconds, CancellationToken cancellationToken = default)
{
return Task.CompletedTask;
}
public async Task HandleAsync(TestRequest request, CancellationToken cancellationToken = default)
{
_logger.LogInformation($"开始处理消息: {request.Id}");
//模拟处理消息耗时操作
await Task.Delay(1000, cancellationToken);
_logger.LogInformation($"消息处理完成: {request.Id}");
}
}
IHostedService
来托管服务public class SampleHostedService : IHostedService
{
private readonly IConsumerProcessorManager<TestRequest> _consumerProcessorManager;
private readonly IHostApplicationLifetime _applicationLifetime;
private readonly ILogger<SampleHostedService> _logger;
public SampleHostedService(
IConsumerProcessorManager<TestRequest> consumerProcessorManager,
IHostApplicationLifetime applicationLifetime,
ILogger<SampleHostedService> logger)
{
_consumerProcessorManager = consumerProcessorManager;
_applicationLifetime = applicationLifetime;
_logger = logger;
}
public async Task StartAsync(CancellationToken cancellationToken)
{
_applicationLifetime.ApplicationStarted.Register(() =>
{
_logger.LogInformation("SampleHostedService is starting.");
_consumerProcessorManager.StartAsync(cancellationToken);
});
_applicationLifetime.ApplicationStopping.Register(() =>
{
_logger.LogInformation("SampleHostedService is stopping.");
_consumerProcessorManager.StopAsync(3000, cancellationToken);
});
await Task.CompletedTask;
}
public async Task StopAsync(CancellationToken cancellationToken)
{
await Task.CompletedTask;
}
}
IHost host = Host.CreateDefaultBuilder(args)
.ConfigureServices(services =>
{
services.AddHostedService<SampleHostedService>();
var configuration = services.BuildServiceProvider().GetRequiredService<IConfiguration>();
var hostName = configuration["RabbitMq:Host"] ?? throw new Exception("HostName is not configured");
var port = int.Parse(configuration["RabbitMq:Port"] ?? throw new Exception("Port is not configured"));
var userName = configuration["RabbitMq:Username"] ?? throw new Exception("Username is not configured");
var password = configuration["RabbitMq:Password"] ?? throw new Exception("Password is not configured");
var queueName = configuration["RabbitMq:QueueName"] ?? throw new Exception("QueueName is not configured");
services.AddRabbitMq<TestRequest, TestRequestHanlder>(options =>
{
options.UseSsl = false;
options.HostName = hostName;
options.Port = port;
options.UserName = userName;
options.Password = password;
options.Durable = true;
options.NetworkRecoveryInterval = 10000;
options.ExchangeType = ExchangeType.Direct;
options.QueueName = queueName;
options.Exchange = $"{queueName}_SERVICE_EXCHANGE";
options.RoutingKey = $"{queueName}_ROUTING_KEY";
options.DeadLetterExchange = $"{queueName}_SERVICE_EXCHANGE_DEAD";
options.DeadLetterQueueName = $"{queueName}_DEAD";
options.DeadLetterRoutingKey = $"{queueName}_ROUTING_KEY";
});
})
.Build();
await host.RunAsync();