IKunLibrary

A client library for RabbitMQ and Kafka

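1. Introduction

RabbitMQ is a reliable, mature messaging and streaming broker that is easy to deploy in cloud environments, on premises, and on a local machine. It is currently used by millions of people worldwide. Its main features are:

  • Reliability
  • Flexible routing
  • Clustering
  • Federation
  • Highly available queues
  • Multi-protocol support
  • Clients for many languages
  • Management UI
  • Plugins
  • Community
  • Scalability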

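2. Basic Concepts

  • Producer: the application that publishes messages

  • Consumer: the application that receives and processes messages

  • Queue: the buffer that stores messages until a consumer receives them

  • Exchange: receives messages from producers and routes them to queues

  • Binding: the rule that links a queue to an exchange

  • Routing key: the key a producer attaches to a message; the exchange uses it to select the target queues

  • Wildcard: in a topic exchange, * matches exactly one word and # matches zero or more words

  • Binding key: the key or pattern supplied when a queue is bound to an exchange

  • Durability (Durable): durable queues and exchanges survive a broker restart

  • Acknowledgement (Acknowledge): the consumer tells the broker that a message has been handled

    • Automatic acknowledgement: a message counts as acknowledged as soon as it is delivered
    • Manual acknowledgement: the consumer acknowledges explicitly after processing
  • Rejection (Reject): the consumer refuses a message, which is then requeued, discarded, or dead-lettered

  • Dead letter queue: the queue that collects dead-lettered messages, for example messages rejected without requeue or expired messages

  • Message TTL: how long a message may wait in a queue before it expires

  • Message priority: in a priority queue, higher-priority messages are delivered first

  • Message dispatch: how the broker distributes messages among the consumers of a queue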
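
The routing key, binding key, and wildcard concepts above meet in a topic exchange, where both keys are dot-separated words. The following is a minimal sketch of that matching rule, written only to illustrate the concept; `TopicMatcher` is a hypothetical helper and is not part of IKunLibrary or of the RabbitMQ client.

```csharp
using System;

// Hypothetical helper (not part of IKunLibrary): illustrates how a topic
// exchange compares a binding key pattern with a message's routing key.
public static class TopicMatcher
{
    // Returns true when routingKey matches bindingKey under topic rules:
    // '*' stands for exactly one word, '#' for zero or more words.
    public static bool IsMatch(string bindingKey, string routingKey)
    {
        string[] pattern = bindingKey.Split('.');
        string[] words = routingKey.Split('.');
        return Match(pattern, 0, words, 0);
    }

    private static bool Match(string[] pattern, int p, string[] words, int w)
    {
        // Pattern exhausted: it is a match only if all words were consumed too.
        if (p == pattern.Length) return w == words.Length;

        if (pattern[p] == "#")
        {
            // '#' either absorbs nothing, or absorbs one word and stays active.
            return Match(pattern, p + 1, words, w)
                || (w < words.Length && Match(pattern, p, words, w + 1));
        }

        // Any other pattern segment needs a word to compare against.
        if (w == words.Length) return false;

        return (pattern[p] == "*" || pattern[p] == words[w])
            && Match(pattern, p + 1, words, w + 1);
    }
}
```

With this sketch, `TopicMatcher.IsMatch("order.*", "order.created")` returns true, `TopicMatcher.IsMatch("order.*", "order.created.eu")` returns false, and `TopicMatcher.IsMatch("order.#", "order")` returns true. A direct exchange, which the usage section configures, skips all of this and requires the routing key to equal the binding key.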

3. Environment Setup

  • Install RabbitMQ with Docker

    docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 --restart=always --hostname my-rabbit -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin -e TZ=Asia/Shanghai rabbitmq:management
    
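    • -d: run the container in the background
    • --restart: restart policy
    • --name: container name
    • -p: port mapping (5672 for AMQP, 15672 for the management UI)
    • --hostname: host name
    • -e: environment variable
      • RABBITMQ_DEFAULT_USER: default user name
      • RABBITMQ_DEFAULT_PASS: default password
      • TZ: time zone
    • rabbitmq:management: image name
  • Install RabbitMQ with Docker Compose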

    version: "3.1"
    services:
        rabbitmq:
            restart: always
            image: rabbitmq:management
            container_name: rabbitmq
            hostname: my-rabbit
            ports:
                - 5672:5672
                - 15672:15672 # RabbitMQ management UI port
            environment:
                TZ: Asia/Shanghai
                RABBITMQ_DEFAULT_USER: admin
                RABBITMQ_DEFAULT_PASS: admin
    
    • restart: restart policy
    • image: image name (rabbitmq:management)
    • container_name: container name
    • hostname: host name
    • ports: port mappings
    • environment: environment variables
      • TZ: time zone
      • RABBITMQ_DEFAULT_USER: default user name
      • RABBITMQ_DEFAULT_PASS: default password
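
Once the container is up, it can be useful to confirm from .NET that the mapped ports accept connections before wiring in the library. The sketch below uses only `System.Net.Sockets`; `BrokerProbe` is a hypothetical helper, not part of IKunLibrary, and it checks TCP reachability only, not AMQP authentication.

```csharp
using System;
using System.Net.Sockets;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper (not part of IKunLibrary): checks that a TCP port
// such as 5672 (AMQP) or 15672 (management UI) accepts connections.
public static class BrokerProbe
{
    // Returns true if a TCP connection to host:port succeeds within the timeout.
    public static async Task<bool> IsPortOpenAsync(string host, int port, TimeSpan timeout)
    {
        using var client = new TcpClient();
        using var cts = new CancellationTokenSource(timeout);
        try
        {
            await client.ConnectAsync(host, port, cts.Token);
            return true;
        }
        catch (Exception ex) when (ex is SocketException or OperationCanceledException)
        {
            // Connection refused, host unreachable, or the timeout elapsed.
            return false;
        }
    }
}
```

For the setup above, `await BrokerProbe.IsPortOpenAsync("localhost", 5672, TimeSpan.FromSeconds(3))` would be expected to return true once RabbitMQ has finished starting, assuming the container runs on the local machine.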

4. Usage

  • Create a TestRequest class that implements the IRabbitMqRequest interface and defines the message body
public class TestRequest : IRabbitMqRequest
{
    /// <summary>
    /// Retry count
    /// </summary>
    public int RetryCount { get; set; }

    #region Custom fields

    /// <summary>
    /// Id
    /// </summary>
    public string Id { get; set; } = default!;

    /// <summary>
    /// Name
    /// </summary>
    public string Name { get; set; } = default!;

    /// <summary>
    /// Age
    /// </summary>
    public int Age { get; set; }

    #endregion
}
  • Create a TestRequestHanlder class that implements the IRequestProcessorHandler<TestRequest> interface to handle messages
public class TestRequestHanlder : IRequestProcessorHandler<TestRequest>
{
    private readonly ILogger<TestRequestHanlder> _logger;

    public TestRequestHanlder(ILogger<TestRequestHanlder> logger)
    {
        _logger = logger;
    }

    // Nothing to initialize in this sample.
    public Task StartAsync(CancellationToken cancellationToken)
    {
        return Task.CompletedTask;
    }

    // Nothing to release in this sample.
    public Task StopAsync(int milliseconds, CancellationToken cancellationToken = default)
    {
        return Task.CompletedTask;
    }

    public async Task HandleAsync(TestRequest request, CancellationToken cancellationToken = default)
    {
        _logger.LogInformation("Started processing message: {Id}", request.Id);

        // Simulate a time-consuming operation
        await Task.Delay(1000, cancellationToken);

        _logger.LogInformation("Finished processing message: {Id}", request.Id);
    }
}
  • Use IHostedService to host the consumer
public class SampleHostedService : IHostedService
{
    private readonly IConsumerProcessorManager<TestRequest> _consumerProcessorManager;
    private readonly IHostApplicationLifetime _applicationLifetime;
    private readonly ILogger<SampleHostedService> _logger;

    public SampleHostedService(
        IConsumerProcessorManager<TestRequest> consumerProcessorManager,
        IHostApplicationLifetime applicationLifetime,
        ILogger<SampleHostedService> logger)
    {
        _consumerProcessorManager = consumerProcessorManager;
        _applicationLifetime = applicationLifetime;
        _logger = logger;
    }

    public Task StartAsync(CancellationToken cancellationToken)
    {
        // Start consuming only after the host has fully started.
        _applicationLifetime.ApplicationStarted.Register(() =>
        {
            _logger.LogInformation("SampleHostedService is starting.");
            // The callback is synchronous, so the returned task is discarded explicitly.
            _ = _consumerProcessorManager.StartAsync(cancellationToken);
        });

        return Task.CompletedTask;
    }

    public async Task StopAsync(CancellationToken cancellationToken)
    {
        // Await the shutdown here so the host waits for the consumer to stop,
        // and use the shutdown token rather than the startup token.
        _logger.LogInformation("SampleHostedService is stopping.");
        await _consumerProcessorManager.StopAsync(3000, cancellationToken);
    }
}
  • Register and start the services
IHost host = Host.CreateDefaultBuilder(args)
    .ConfigureServices((context, services) =>
    {
        services.AddHostedService<SampleHostedService>();

        // Read settings from the host's configuration instead of building
        // a second service provider inside ConfigureServices.
        var configuration = context.Configuration;

        var hostName = configuration["RabbitMq:Host"] ?? throw new InvalidOperationException("RabbitMq:Host is not configured");
        var port = int.Parse(configuration["RabbitMq:Port"] ?? throw new InvalidOperationException("RabbitMq:Port is not configured"));
        var userName = configuration["RabbitMq:Username"] ?? throw new InvalidOperationException("RabbitMq:Username is not configured");
        var password = configuration["RabbitMq:Password"] ?? throw new InvalidOperationException("RabbitMq:Password is not configured");
        var queueName = configuration["RabbitMq:QueueName"] ?? throw new InvalidOperationException("RabbitMq:QueueName is not configured");

        services.AddRabbitMq<TestRequest, TestRequestHanlder>(options =>
        {
            // Connection settings
            options.UseSsl = false;
            options.HostName = hostName;
            options.Port = port;
            options.UserName = userName;
            options.Password = password;
            options.Durable = true;
            options.NetworkRecoveryInterval = 10000;

            // Main exchange, queue, and routing key
            options.ExchangeType = ExchangeType.Direct;
            options.QueueName = queueName;
            options.Exchange = $"{queueName}_SERVICE_EXCHANGE";
            options.RoutingKey = $"{queueName}_ROUTING_KEY";

            // Dead letter exchange, queue, and routing key
            options.DeadLetterExchange = $"{queueName}_SERVICE_EXCHANGE_DEAD";
            options.DeadLetterQueueName = $"{queueName}_DEAD";
            options.DeadLetterRoutingKey = $"{queueName}_ROUTING_KEY";
        });
    })
    .Build();

await host.RunAsync();
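
The registration code reads five keys from the `RabbitMq` configuration section. Since `Host.CreateDefaultBuilder` loads `appsettings.json` by default, a matching file could look like the sketch below. The key names come from the code above; the values are assumptions: they presume the Docker setup from section 3 is running on the local machine, and `TEST_QUEUE` is a placeholder queue name.

```json
{
  "RabbitMq": {
    "Host": "localhost",
    "Port": 5672,
    "Username": "admin",
    "Password": "admin",
    "QueueName": "TEST_QUEUE"
  }
}
```

With these values, the options above would resolve to the exchange `TEST_QUEUE_SERVICE_EXCHANGE`, the routing key `TEST_QUEUE_ROUTING_KEY`, and the dead letter queue `TEST_QUEUE_DEAD`. Outside local development, the credentials would normally come from environment variables or a secret store rather than from this file.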