RocketMQ普通消息实战演练详解 目录 引言 普通消息同步发送 普通消息异步发送 普通消息单向发送 集群消费模式 广播消费模式 引言 之前研究了RocketMQ的源码,在这里将各种消息发送与消费的demo进行举例,方便以后使用的时候CV. 相关的配置,安装和启
目录
- 引言
- 普通消息同步发送
- 普通消息异步发送
- 普通消息单向发送
- 集群消费模式
- 广播消费模式
引言
之前研究了RocketMQ的源码,在这里将各种消息发送与消费的demo进行举例,方便以后使用的时候CV。
相关的配置,安装和启动在这篇文章有相关讲解 https://www.mobange.com/article/260237.htm
普通消息同步发送
同步消息是指发送出消息后,同步等待,直到接收到Broker发送成功的响应才会继续发送下一个消息。这个方式可以确保消息发送到Broker成功,一些重要的消息可以使用此方式,比如重要的通知。
public static void main(String[] args) throws Exception {
//实例化消息生产者对象
DefaultMQProducer producer = new DefaultMQProducer("group_luke");
//设置NameSever地址
producer.setNamesrvAddr("127.0.0.1:9876");
//启动Producer实例
producer.start();
for (int i = 0; i < 10; i++) {
Message msg = new Message("topic_luke", "tag", ("这是第"+i+"条消息。").getBytes(StandardCharsets.UTF_8));
//同步发送方式
SendResult send = producer.send(msg);
//确认返回
System.out.println(send);
}
//关闭producer
producer.shutdown();
}
普通消息异步发送
异步消息发送方在发送了一条消息后,不等接收方发回响应,接着进行第二条消息发送。发送方通过回调接口的方式接收服务器响应,并对响应结果进行处理。
public static void main(String[] args) throws Exception {
//实例化消息生产者对象
DefaultMQProducer producer = new DefaultMQProducer("group_luke");
//设置NameSever地址
producer.setNamesrvAddr("127.0.0.1:9876");
//启动Producer实例
producer.start();
for (int i = 0; i < 10; i++) {
Message msg = new Message("topic_luke", "tag", ("这是第"+i+"条消息。").getBytes(StandardCharsets.UTF_8));
//SendCallback会接收异步返回结果的回调
producer.send(msg, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.println(sendResult);
}
@Override
public void onException(Throwable throwable) {
throwable.printStackTrace();
}
});
}
//若是过早关闭producer,会抛出The producer service state not OK, SHUTDOWN_ALREADY的错
Thread.sleep(10000);
//关闭producer
producer.shutdown();
}
普通消息单向发送
单项发送不关心发送的结果,只发送请求不等待应答。发送消息耗时极短。
public static void main(String[] args) throws Exception {
//实例化消息生产者对象
DefaultMQProducer producer = new DefaultMQProducer("group_luke");
//设置NameSever地址
producer.setNamesrvAddr("127.0.0.1:9876");
//启动Producer实例
producer.start();
for (int i = 0; i < 10; i++) {
Message msg = new Message("topic_luke", "tag", ("这是第"+i+"条消息。").getBytes(StandardCharsets.UTF_8));
//同步发送方式
producer.sendOneway(msg);
}
//关闭producer
producer.shutdown();
}
集群消费模式
消费者采用负载均衡的方式消费消息,同一个Group下的多个Consumer共同消费Queue里的Message,每个Consumer处理的消息不同。
一个Consumer Group中的各个Consumer实例分共同消费消息,即一条消息只会投递到一个Group下面的一个实例,并且只消费一遍。
例如某个Topic有3个队列,其中一个Consumer Group 有 3 个实例,那么每个实例只消费其中的1个队列。集群消费模式是消费者默认的消费方式。
public static void main(String[] args) throws Exception {
//实例化消息消费者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group_luke");
//指定nameserver地址
consumer.setNamesrvAddr("127.0.0.1:9876");
//订阅topic,"*"表示所有tag
consumer.subscribe("topic_luke","*");
consumer.setMessageModel(MessageModel.CLUSTERING);
// 注册回调实现类来处理从broker拉取回来的消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
@SneakyThrows
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
System.out.println(new String(msg.getBody()));
}
// 标记该消息已经被成功消费
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 启动消费者实例
consumer.start();
System.out.printf("Consumer Started.%n");
}
广播消费模式
广播消费模式中把消息对一个Group下的各个Consumer实例都投递一遍。也就是说消息也会被 Group 中的每个Consumer都消费一次。
实际上,是一个消费组下的每个消费者实例都获取到了topic下面的每个Message Queue去拉取消费。所以消息会投递到每个消费者实例。
public static void main(String[] args) throws Exception {
//实例化消息消费者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group_luke");
//指定nameserver地址
consumer.setNamesrvAddr("127.0.0.1:9876");
//订阅topic,"*"表示所有tag
consumer.subscribe("topic_luke","*");
consumer.setMessageModel(MessageModel.BROADCASTING);
// 注册回调实现类来处理从broker拉取回来的消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
@SneakyThrows
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
System.out.println(new String(msg.getBody()));
}
// 标记该消息已经被成功消费
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 启动消费者实例
consumer.start();
System.out.printf("Consumer Started.%n");
}
以上就是RocketMQ普通消息实战演练详解的详细内容,更多关于RocketMQ普通消息的资料请关注我们其它相关文章!
本文标题为:RocketMQ普通消息实战演练详解
基础教程推荐
- Java实现线程插队的示例代码 2022-09-03
- ConditionalOnProperty配置swagger不生效问题及解决 2023-01-02
- Java数据结构之对象比较详解 2023-03-07
- java实现多人聊天系统 2023-05-19
- springboot自定义starter方法及注解实例 2023-03-31
- JDK数组阻塞队列源码深入分析总结 2023-04-18
- Java文件管理操作的知识点整理 2023-05-19
- Java实现查找文件和替换文件内容 2023-04-06
- Java并发编程进阶之线程控制篇 2023-03-07
- java基础知识之FileInputStream流的使用 2023-08-11