一个简单的Java消息队列,具有高效、可靠、分布式等特点 (A simple implemation of message queue in Java)
出于对消息队列的兴趣,以及为了更好的学习rocketmq和kafka,我选择对mq的核心功能进行了实现。基于这些开源mq的架构和设计思想,我尝试用自己的方式进行了实现,该项目也主要是出于个人学习目的
目前已实现如下功能:
基于发布-订阅模型实现消息队列,通过生产者组和消费者组统一管理
底层基于Netty构建高效通信框架,使用多种自定义协议完成通讯
支持普通消息的发送和接受,同时基于shardingkey和分布式锁和本地锁实现顺序消息,消费者支持pull与push两种消费模式,并提供多种生产者队列选择的负载均衡策略
实现多种通信调用方式,并提供失败重试和超时控制机制
所有消息全部持久化在磁盘,保证可靠性,同时基于mmap和pagecahe机制实现高效存储和读写分离,建立消息索引文件加速读取,不依赖其它数据库
通过不同模式的消费者位移持久化管理和确认机制,保证消息至少被消费一次,并且可以消费历史消息
通过心跳机制定期摘除不活跃连接
实现消费者组和服务端变化时的rebalance机制,实现消息重分配,内置多种分配策略
内部实现时间轮来进行高效的延时任务调度,支持生产者生产任意时间延时消息,并持久化延迟任务,引入提交机制进行崩溃恢复
借助延时队列实现消息失败定时重试,并将多次失败消息加入死信队列,支持死信队列的查看
使用线程池提升效率,通过事件监听和请求队列,生产者和消费者模式进行解耦,并通过各种机制保证并发安全
支持灰度消息分流,保证灰度发布前与发布后订阅关系与消费进度一致性
实现注册中心,提供队列发现能力,方便进行集群扩展,支持更换默认注册中心为zookeeper和nacos
支持水平扩展,目前支持多主集群
与Spring和SpringBoot框架进行集成,使用注解和starter简化配置,方便调用
- broker : MQ服务端实现,负责消息存储、通信、客户端管理等
- client : MQ客户端实现,包括消费者和生产者
- common : 通用属性
- example : 使用示例
- example-frameless : 无框架使用
- example-spring : 结合spring使用
- extension : 扩展
- cranemq-spring-boot-starter : springboot-starter
- nacos-extension : nacos注册中心集成
- zookeeper-extension : zookeeper注册中心集成
- registry : 轻量注册中心
- test
broker端采用SpringBoot编写,下载源码后,使用maven对项目进行编译,产生jar包运行
无其它依赖,需要指定配置文件位置,在broker/resources
下有示例配置文件,配置优先级:
application.yaml
中配置路径默认情况下broker需要使用两个端口:
6086
7654
http://localhost:7654/
查看队列信息自带配置中心可以直接启动,默认端口11111
,可以使用zookeeper
或nacos
无框架使用引入
<dependency>
<groupId>com.github.xjtuwsn</groupId>
<artifactId>client</artifactId>
<version>0.0.1</version>
</dependency>
SpringBoot
引入
<dependency>
<groupId>com.github.xjtuwsn</groupId>
<artifactId>cranemq-spring-boot-starter</artifactId>
<version>0.0.1</version>
</dependency>
具体使用示例参考example
模块
因为个人时间和水平(比较关键)问题,所以项目还有着不少的问题,和许多可以完善的点,包括但不仅限于以下几点:
功能方面:
[email protected]