Appearance
MQ 使用说明
MQ 是跨模块或跨系统的 event,主要目标是:
- 业务解耦
- 同步转异步
- 流量削锋等
comet-mq 目前支持 RocketMQ,后续将逐步支持其他消息中间件。
POM 依赖
- 生产者 pom 依赖
xml
<dependency>
<groupId>com.dcits</groupId>
<artifactId>comet-starter-mq-provider</artifactId>
</dependency>
启动类上添加注解 @EnableMqProvider
- 消费者 pom 依赖
xml
<dependency>
<groupId>com.dcits</groupId>
<artifactId>comet-starter-mq-consumer</artifactId>
</dependency>
启动类上添加注解 @EnableMqConsumer
如果工程中已经依赖了 comet-autoconfigure-common 模块,则不需要再依赖 comet-mq-producer。
生产者
生产者配置
在配置文件中添加如下信息:
yml
rocketmq:
#该应用是否启用消息生产者,如果为true,则须配置producer和scheduled下的属性;如果为false,则可以不用配置
isEnable: true
###producer
producer:
#发送同一类消息的设置为同一个group,保证唯一,默认不需要设置,rocketmq会使用ip@pid(pid代表jvm名字)作为唯一标示
groupName: ${spring.application.name}
#mq的nameserver地址
namesrvAddr: 127.0.0.1:9876
#消息最大长度 默认1024*4(4M)
maxMessageSize: 4096
#发送消息超时时间,默认3000
sendMsgTimeout: 3000
#发送消息失败重试次数,默认2
retryTimesWhenSendFailed: 2
scheduled:
# 消息更新为异常状态的时间间隔,单位为秒(s)
exceptionTimeout: 300
# 定时扫描mq_producer_msg表中状态为1的消息,如果startTime与当前时间超过五分钟则将状态更新为4。
corn1: 0/30 * * * * ?
# 定时扫描mq_producer_msg表中状态为2的消息,然后发送。
corn2: 0/20 * * * * ?
生产者发送消息
消息发送只需要在相关类中注入 MsgProducerImpl
对象,并调用 sendMessage()
方法即可。具体示例如下:
java
@Autowired
MsgProducerImple mqService;
public void mqTest() {
//发送方式1
mqService.sendMessage("dcits", "test", "发送一条消息");
//发送方式2
RocketMessage rocketMessage = new RocketMessage();
rocketMessage.setTopic("dcits");
rocketMessage.setTag("test");
rocketMessage.setMsgText("发送一条mq消息");
mqService.sendMessage(rocketMessage);
}
如果消息内容是一个对象,则需要转成 jsonStr。
消费者
消费者配置
在配置文件中添加如下信息:
yaml
###mq消费者配置
rocketmq:
consumer:
#消息组 需要和生产者的groupName配置一样
groupName: ensemble_mq_group
#mq的nameserver地址
namesrvAddr: 127.0.0.1:9876
#该消费者订阅的主题和tags("*"号表示订阅该主题下所有的tags),格式:topic~tag1||tag2||tag3;topic2~*;
topics: DemoTopic~*;
consumeThreadMin: 20
consumeThreadMax: 64
#设置一次消费消息的条数,默认为1条
consumeMessageBatchMaxSize: 1
消费者监听和消费消息
消费者类需要继承 AbstractMQConsumer
类,并重写 onMessage
方法,从该方法的入参 Message 对象中获取消息数据,并完成业务处理逻辑。除此之外需要在该类添加 @RocketMQConsumer
注解,该注解有两个参数,分别如下:
topic
,监听的消息主题;tag
,监听的消息 tag;
具体示例代码如下:
java
@Component
@RocketMQConsumer(topic = "dcits", tag = "test")
@Slf4j
public class MyConsumer extends AbstractMQConsumer {
@Override
public void onMessage(Message message) {
log.info("消息内容为 : " + message.getMsgText());
}
}
MQ 生产者最终一致性解决方案
为了保证消息的高可靠性和最终一致性,comet 将 mq 消息与 flow 流绑定。在业务操作中,调用 mq 消息接口发送消息,首先将消息存储在数据库中,flow 整个事务提交后,再将消息发送至mq service,如果 flow 事务回滚,则修改消息状态为 4,该条消息不会被发送,通过事务的方式来保证消息的一致性。消息发送过程中可能会失败,comet 采用 rocketMq 重试机制和定时扫描两种方式保证消息可靠性。