Open Source, Open Future!
  menu
107 文章
ღゝ◡╹)ノ❤️

RocketMQ--消息

创建工程:rocketmq-demo

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

    <modelVersion>4.0.0</modelVersion>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.0.5.RELEASE</version>
    </parent>
  
    <groupId>com.mncode</groupId>
    <artifactId>rocketmq-demo</artifactId>
    <version>1.0-SNAPSHOT</version>
    <packaging>jar</packaging>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <java.version>1.8</java.version>
        <rocketmq.version>4.4.0</rocketmq.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>${rocketmq.version}</version>
        </dependency>
    </dependencies>
</project>

普通消息

生产者
  1. 基本流程:
 1. 创建生产者
 2. 设置NameServer地址
 3. 启动生产者
 4. 创建消息
 5. 发送消息
 6. 关闭生产者
  1. 代码实现:
public class Producer {
    public static void main(String[] args) throws MQClientException, InterruptedException {
         // 1. 创建生产者
        DefaultMQProducer producer = new DefaultMQProducer("producer1");
         // 2. 设置nameServer地址
        producer.setNamesrvAddr("192.168.1.201:9876");
         // 3. 启动生产者
        producer.start();

        for (int i = 0; i < 5; i++) {
            try {
                // 4. 创建消息
                Message msg = new Message("TopicTest", "TagA", ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
                // 5. 发送消息
                SendResult sendResult = producer.send(msg);
                System.out.printf("%s%n", sendResult);
            } catch (Exception e) {
                e.printStackTrace();
                Thread.sleep(1000);
            }
        }
        // 6. 关闭生产者
        producer.shutdown();
    }
}
  1. 输出结果:
SendResult [sendStatus=SEND_OK, msgId=C0A801670C8C18B4AAC29990ABE60000, offsetMsgId=C0A801C900002A9F0000000000013972, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=0], queueOffset=2]
SendResult [sendStatus=SEND_OK, msgId=C0A801670C8C18B4AAC29990AC4A0001, offsetMsgId=C0A801C900002A9F0000000000013A24, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=1], queueOffset=1]
SendResult [sendStatus=SEND_OK, msgId=C0A801670C8C18B4AAC29990AC4A0002, offsetMsgId=C0A801C900002A9F0000000000013AD6, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=2], queueOffset=1]
SendResult [sendStatus=SEND_OK, msgId=C0A801670C8C18B4AAC29990AC540003, offsetMsgId=C0A801C900002A9F0000000000013B88, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=3], queueOffset=1]
SendResult [sendStatus=SEND_OK, msgId=C0A801670C8C18B4AAC29990AC540004, offsetMsgId=C0A801C900002A9F0000000000013C3A, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=0], queueOffset=3]
消费者
  1. 基本流程:
 1. 创建消费者
 2. 设置nameServer地址
 3. 设置要订阅的topic
 4. 创建消息监听器
 5. 启动消费者
  1. 代码实现:
public class Consumer {
    public static void main(String[] args) throws Exception {
         // 1. 创建消费者
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer1");
         // 2. 设置namesrv地址
        consumer.setNamesrvAddr("192.168.1.201:9876");
         // 3. 设置要订阅的topic
        consumer.subscribe("TopicTest", "*");
         // 4. 创建消息监听器
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                System.out.printf("%s 消费消息: %s %n", Thread.currentThread().getName(), msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        // 5. 启动消费者
        consumer.start();
    }
}
  1. 输出结果:
ConsumeMessageThread_1 消费消息: Hello RocketMQ 3 
ConsumeMessageThread_2 消费消息: Hello RocketMQ 0 
ConsumeMessageThread_3 消费消息: Hello RocketMQ 2 
ConsumeMessageThread_5 消费消息: Hello RocketMQ 1 
ConsumeMessageThread_4 消费消息: Hello RocketMQ 4 

顺序消息

* RocketMQ支持局部顺序消费,可以确保同一队列中的消息被顺序消费。
* 若要做到全局顺序,可以将topic配置成一个队列。
生产者
  1. 代码实现:
public class Producer {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("order_producer");
        producer.setNamesrvAddr("192.168.1.201:9876");
        producer.start();

        // 队列选择器
        MessageQueueSelector selector = new MessageQueueSelector() {
            @Override
            public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                Integer id = (Integer) arg;
                return mqs.get(id % mqs.size());
            }
        };

        // 订单流程
        String[] orderSteps = {"创建", "支付", "完成"};

        for (int i = 0; i < 3; i++) {
            String orderId = String.valueOf(System.currentTimeMillis()) + i;
            for (int j = 0; j < orderSteps.length; j++) {
                String orderMessage = "订单:" + orderId + "-" + orderSteps[j];
                Message message = new Message("order", "order", orderId, orderMessage.getBytes(RemotingHelper.DEFAULT_CHARSET));
                SendResult result = producer.send(message, selector, i);
                System.out.printf("%s ## 消息队列id:%s\n", orderMessage, result.getMessageQueue().getQueueId());
            }
        }

        producer.shutdown();
    }
}
  1. 输出结果:
订单:15882463524350-创建 ## 消息队列id:0
订单:15882463524350-支付 ## 消息队列id:0
订单:15882463524350-完成 ## 消息队列id:0
订单:15882463531071-创建 ## 消息队列id:1
订单:15882463531071-支付 ## 消息队列id:1
订单:15882463531071-完成 ## 消息队列id:1
订单:15882463531252-创建 ## 消息队列id:2
订单:15882463531252-支付 ## 消息队列id:2
订单:15882463531252-完成 ## 消息队列id:2
消费者
  1. 代码实现:
public class Consumer {

    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("order_consumer");
        consumer.setNamesrvAddr("192.168.1.201:9876");
        consumer.subscribe("order", "*");

        // 注册监听器
        consumer.registerMessageListener(new MessageListenerOrderly() {
            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                for (MessageExt msg : msgs) {
                    System.out.println(new String(msg.getBody()));
                }
                return ConsumeOrderlyStatus.SUCCESS;
            }
        });

        consumer.start();
    }

}
  1. 输出结果:
订单:15882463531071-创建
订单:15882463524350-创建
订单:15882463531252-创建
订单:15882463531071-支付
订单:15882463524350-支付
订单:15882463531071-完成
订单:15882463531252-支付
订单:15882463524350-完成
订单:15882463531252-完成

事务消息

4.3.0版本,加入了对事务的支持。
基于两阶段提交和定时事务状态回查来决定消息最终是提交还是回滚。

image.png

事务监听器
public class TransactionListenerImpl implements TransactionListener {
    /**
     * 执行本地事务
     * 若成功:提交事务
     * 若失败:回滚事务,消息会被MQ删除,不会被消费到
     * UNKNOW:未知状态,MQ需要回查来确认事务的执行状态
     *
     * @param msg Half(prepare) message
     * @param arg Custom business parameter
     * @return
     */
    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        String orderId = msg.getKeys();
        System.out.println("#####################");
        System.out.printf("订单%s开启本地事务\n", orderId);
        // 业务处理
        System.out.printf("订单%s处理中...\n", orderId);

        if ("order-001".equals(orderId)) {
            // 处理成功;提交事务
            System.out.printf("订单%s处理成功\n", orderId);
            Order.update(orderId, "success");
            return LocalTransactionState.COMMIT_MESSAGE;
        } else if ("order-002".equals(orderId)) {
            // 处理失败;回滚事务
            System.out.printf("订单%s处理失败\n", orderId);
            Order.update(orderId, "fail");
            return LocalTransactionState.ROLLBACK_MESSAGE;
        } else if ("order-003".equals(orderId)) {
            Order.update(orderId, "success");
            return LocalTransactionState.UNKNOW;
        } else if ("order-004".equals(orderId)) {
            Order.update(orderId, "fail");
            return LocalTransactionState.UNKNOW;
        }
        return LocalTransactionState.UNKNOW;
    }

    /**
     * MQ回查来确认事务是提交还是回滚
     *
     * @param msg Check message
     * @return
     */
    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        System.out.println("#####################");
        System.out.printf("检查订单:%s的本地事务\n", msg.getKeys());
        Order order = Order.get(msg.getKeys());
        String status = order.getStatus();
        if (status.equals("success")) {
            System.out.printf("订单:%s成功\n", msg.getKeys());
            return LocalTransactionState.COMMIT_MESSAGE;
        } else {
            System.out.printf("订单:%s失败\n", msg.getKeys());
            return LocalTransactionState.ROLLBACK_MESSAGE;
        }
    }
}

数据
public class Order implements Serializable {

    private String orderId;
    private String status;

    public Order() {
    }

    public Order(String orderId, String status) {
        this.orderId = orderId;
        this.status = status;
    }

    public static Map<String, Order> data = new HashMap<>();

    static {
        data.put("order-001", new Order("order-001", "init"));
        data.put("order-002", new Order("order-002", "init"));
        data.put("order-003", new Order("order-003", "init"));
        data.put("order-004", new Order("order-004", "init"));
    }

    public static void update(String orderId, String status) {
        Order order = data.get(orderId);
        order.setStatus(status);
    }

    public static Order get(String orderId) {
        return data.get(orderId);
    }
}
生产者
  1. 代码实现:
public class Producer {
    public static void main(String[] args) throws Exception {
        TransactionMQProducer producer = new TransactionMQProducer("transaction_producer");
        producer.setNamesrvAddr("192.168.1.201:9876");
        // 事务监听器
        TransactionListener listener = new TransactionListenerImpl();
        producer.setTransactionListener(listener);
        producer.start();

        for (Map.Entry<String, Order> data : Order.data.entrySet()) {
            String orderId = data.getKey();
            Message message = new Message("transaction", "transaction", orderId, orderId.getBytes(RemotingHelper.DEFAULT_CHARSET));
            producer.sendMessageInTransaction(message, null);
        }

        Thread.sleep(2000000);
        producer.shutdown();
    }
}
  1. 输出结果:
*************************
订单order-003开启本地事务
订单order-003处理中...
*************************
订单order-004开启本地事务
订单order-004处理中...
*************************
订单order-001开启本地事务
订单order-001处理中...
订单order-001处理成功
*************************
订单order-002开启本地事务
订单order-002处理中...
订单order-002处理失败
*************************
检查订单:order-004的本地事务
订单:order-004失败
*************************
检查订单:order-003的本地事务
订单:order-003成功
消费者
  1. 代码实现:
public class Consumer {

    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("transaction_consumer");
        consumer.setNamesrvAddr("192.168.1.201:9876");
        consumer.subscribe("transaction", "*");

        // 注册监听器
        consumer.registerMessageListener(new MessageListenerOrderly() {
            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                for (MessageExt msg : msgs) {
                    System.out.printf("订单:%s被消费\n", msg.getKeys());
                }
                return ConsumeOrderlyStatus.SUCCESS;
            }
        });

        consumer.start();
        System.out.println("消费者启动");
    }
}
  1. 输出结果:
消费者启动
订单:order-001被消费
订单:order-003被消费