Skip to content

MQ 使用说明

MQ 是跨模块或跨系统的 event,主要目标是:

  • 业务解耦
  • 同步转异步
  • 流量削锋等

comet-mq 目前支持 RocketMQ,后续将逐步支持其他消息中间件。

POM 依赖

  • 生产者 pom 依赖
xml
<dependency>
    <groupId>com.dcits</groupId>
    <artifactId>comet-starter-mq-provider</artifactId>
</dependency>

启动类上添加注解 @EnableMqProvider

  • 消费者 pom 依赖
xml
<dependency>
    <groupId>com.dcits</groupId>
    <artifactId>comet-starter-mq-consumer</artifactId>
</dependency>

启动类上添加注解 @EnableMqConsumer

如果工程中已经依赖了 comet-autoconfigure-common 模块,则不需要再依赖 comet-mq-producer。

生产者

生产者配置

在配置文件中添加如下信息:

yml
rocketmq:
  #该应用是否启用消息生产者,如果为true,则须配置producer和scheduled下的属性;如果为false,则可以不用配置
  isEnable: true
  ###producer
  producer:
    #发送同一类消息的设置为同一个group,保证唯一,默认不需要设置,rocketmq会使用ip@pid(pid代表jvm名字)作为唯一标示
    groupName: ${spring.application.name}
    #mq的nameserver地址
    namesrvAddr: 127.0.0.1:9876
    #消息最大长度 默认1024*4(4M)
    maxMessageSize: 4096
    #发送消息超时时间,默认3000
    sendMsgTimeout: 3000
    #发送消息失败重试次数,默认2
    retryTimesWhenSendFailed: 2
  scheduled:
    # 消息更新为异常状态的时间间隔,单位为秒(s)
    exceptionTimeout: 300
    # 定时扫描mq_producer_msg表中状态为1的消息,如果startTime与当前时间超过五分钟则将状态更新为4。
    corn1: 0/30 * * * * ?
    # 定时扫描mq_producer_msg表中状态为2的消息,然后发送。
    corn2: 0/20 * * * * ?

生产者发送消息

消息发送只需要在相关类中注入 MsgProducerImpl 对象,并调用 sendMessage() 方法即可。具体示例如下:

java
@Autowired
MsgProducerImple mqService;

public void mqTest() {
    //发送方式1
    mqService.sendMessage("dcits", "test", "发送一条消息");
    //发送方式2
    RocketMessage rocketMessage = new RocketMessage();
    rocketMessage.setTopic("dcits");
    rocketMessage.setTag("test");
    rocketMessage.setMsgText("发送一条mq消息");
    mqService.sendMessage(rocketMessage);
}

如果消息内容是一个对象,则需要转成 jsonStr。

消费者

消费者配置

在配置文件中添加如下信息:

yaml
###mq消费者配置
rocketmq:
  consumer:
    #消息组 需要和生产者的groupName配置一样
    groupName: ensemble_mq_group
    #mq的nameserver地址
    namesrvAddr: 127.0.0.1:9876
    #该消费者订阅的主题和tags("*"号表示订阅该主题下所有的tags),格式:topic~tag1||tag2||tag3;topic2~*;
    topics: DemoTopic~*;
    consumeThreadMin: 20
    consumeThreadMax: 64
    #设置一次消费消息的条数,默认为1条
    consumeMessageBatchMaxSize: 1

消费者监听和消费消息

消费者类需要继承 AbstractMQConsumer 类,并重写 onMessage 方法,从该方法的入参 Message 对象中获取消息数据,并完成业务处理逻辑。除此之外需要在该类添加 @RocketMQConsumer 注解,该注解有两个参数,分别如下:

  • topic,监听的消息主题;

  • tag,监听的消息 tag;

具体示例代码如下:

java
@Component
@RocketMQConsumer(topic = "dcits", tag = "test")
@Slf4j
public class MyConsumer extends AbstractMQConsumer {
    @Override
    public void onMessage(Message message) {
        log.info("消息内容为 : " + message.getMsgText());
    }
}

MQ 生产者最终一致性解决方案

为了保证消息的高可靠性和最终一致性,comet 将 mq 消息与 flow 流绑定。在业务操作中,调用 mq 消息接口发送消息,首先将消息存储在数据库中,flow 整个事务提交后,再将消息发送至mq service,如果 flow 事务回滚,则修改消息状态为 4,该条消息不会被发送,通过事务的方式来保证消息的一致性。消息发送过程中可能会失败,comet 采用 rocketMq 重试机制和定时扫描两种方式保证消息可靠性。