且构网

分享程序员开发的那些事...
且构网 - 分享程序员编程开发的那些事

五分钟带你玩转rocketMQ(九)push与pull模式如何选择是个难题

更新时间:2021-09-25 10:51:39


rocketmq分为push与pull

MQ中Pull和Push的两种消费方式

对于任何一款消息中间件而言,消费者客户端一般有两种方式从消息中间件获取消息并消费。严格意义上来讲,RocketMQ并没有实现PUSH模式,而是对拉模式进行一层包装,名字虽然是 Push 开头,实际在实现时,使用 Pull 方式实现。通过 Pull 不断不断不断轮询 Broker 获取消息。当不存在新消息时,Broker 会挂起请求,直到有新消息产生,取消挂起,返回新消息。这样,基本和 Broker 主动 Push 做到接近的实时性(当然,还是有相应的实时性损失)。原理类似 长轮询( Long-Polling )

(1)Pull方式

由消费者客户端主动向消息中间件(MQ消息服务器代理)拉取消息;采用Pull方式,如何设置Pull消息的频率需要重点去考虑,举个例子来说,可能1分钟内连续来了1000条消息,然后2小时内没有新消息产生(概括起来说就是“消息延迟与忙等待”)。如果每次Pull的时间间隔比较久,会增加消息的延迟,即消息到达消费者的时间加长,MQ中消息的堆积量变大;若每次Pull的时间间隔较短,但是在一段时间内MQ中并没有任何消息可以消费,那么会产生很多无效的Pull请求的RPC开销,影响MQ整体的网络性能;

(2)Push方式

由消息中间件(MQ消息服务器代理)主动地将消息推送给消费者;采用Push方式,可以尽可能实时地将消息发送给消费者进行消费。但是,在消费者的处理消息的能力较弱的时候(比如,消费者端的业务系统处理一条消息的流程比较复杂,其中的调用链路比较多导致消费时间比较久。概括起来地说就是“慢消费问题”),而MQ不断地向消费者Push消息,消费者端的缓冲区可能会溢出,导致异常;

代码实现pull模式

调用方法

1. @RestController
2. @RequestMapping("/test")
3. public class TestControllor {
4. private static final Logger logger = LoggerFactory.getLogger(TestControllor.class);
5. 
6. /**
7.      * 使用RocketMq的生产者
8.      */
9. @Autowired
10. private DefaultMQProducer defaultMQProducer;
11. 
12. @RequestMapping("/send")
13. public void send() throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
14. Date day = new Date();
15. SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
16.         System.out.println(df.format(day));
17. //定义tags
18.         String[] tags = new String[]{"TagA", "TagB", "TagC", "TagD", "TagE"};
19. for (int i = 0; i < 100; i++) {
20. final int index = i;
21. String msg = "demo msg test" + ",这是第" + i + "条" + df.format(day);
22.             logger.info("开始发送消息:" + i);
23. //指定每条消息发送到某个tags
24. Message sendMsg = new Message("NewMessage", tags[i % tags.length], "KEY" + i + "时间为" + df.format(day),
25.                     ("现在排号到:" + i).getBytes());
26. //默认3秒超时
27. SendResult sendResult = defaultMQProducer.send(sendMsg);
28.             logger.info("消息发送响应信息:" + sendResult.toString() + "当前为第" + i + "次");
29.         }
30.     }

生产者

1. @SpringBootConfiguration
2. public class MQProducerConfiguration {
3. 
4. public static final Logger LOGGER = LoggerFactory.getLogger(MQProducerConfiguration.class);
5. /**
6.      * 发送同一类消息的设置为同一个group,保证唯一,默认不需要设置,rocketmq会使用ip@pid(pid代表jvm名字)作为唯一标示
7.      */
8. @Value("${rocketmq.producer.groupName}")
9. private String groupName;
10. @Value("${rocketmq.producer.namesrvAddr}")
11. private String namesrvAddr;
12. /**
13.      * 消息最大大小,默认4M
14.      */
15. @Value("${rocketmq.producer.maxMessageSize}")
16. private Integer maxMessageSize ;
17. /**
18.      * 消息发送超时时间,默认3秒
19.      */
20. @Value("${rocketmq.producer.sendMsgTimeout}")
21. private Integer sendMsgTimeout;
22. /**
23.      * 消息发送失败重试次数,默认2次
24.      */
25. @Value("${rocketmq.producer.retryTimesWhenSendFailed}")
26. private Integer retryTimesWhenSendFailed;
27. 
28. @Bean
29. public DefaultMQProducer getRocketMQProducer() throws Exception {
30. if (StringUtils.isEmpty(this.groupName)) {
31. throw new Exception("groupName is blank");
32.         }
33. if (StringUtils.isEmpty(this.namesrvAddr)) {
34. throw new Exception("nameServerAddr is blank");
35.         }
36.         DefaultMQProducer producer;
37.         producer = new DefaultMQProducer(this.groupName);
38.         producer.setNamesrvAddr(this.namesrvAddr);
39. //如果需要同一个jvm中不同的producer往不同的mq集群发送消息,需要设置不同的instanceName
40. //producer.setInstanceName(instanceName);
41.         producer.setVipChannelEnabled(false);
42. if(this.maxMessageSize!=null){
43.             producer.setMaxMessageSize(this.maxMessageSize);
44.         }
45. if(this.sendMsgTimeout!=null){
46.             producer.setSendMsgTimeout(this.sendMsgTimeout);
47.         }
48. //如果发送消息失败,设置重试次数,默认为2次
49. if(this.retryTimesWhenSendFailed!=null){
50.             producer.setRetryTimesWhenSendFailed(this.retryTimesWhenSendFailed);
51.         }
52. 
53. try {
54.             producer.start();
55.             LOGGER.info(String.format("producer is start ! groupName:[%s],namesrvAddr:[%s]"
56.                     , this.groupName, this.namesrvAddr));
57.         } catch (MQClientException e) {
58.             LOGGER.error(String.format("producer is error {}"
59.                     , e.getMessage(),e));
60. throw new Exception(e);
61.         }
62. return producer;
63.     }
64. }

消费者

以负载均衡的方式拉取消息 不能指定具体队列 --MQPullConsumerScheduleService 可以指定轮询时间

1. @SpringBootConfiguration
2. public class MQConsumerConfiguration {
3. 
4. public static final Logger LOGGER = LoggerFactory.getLogger(MQConsumerConfiguration.class);
5. @Value("${rocketmq.consumer.namesrvAddr}")
6. private String namesrvAddr;
7. @Value("${rocketmq.consumer.groupName}")
8. private String groupName;
9. @Value("${rocketmq.consumer.consumeThreadMin}")
10. private int consumeThreadMin;
11. @Value("${rocketmq.consumer.consumeThreadMax}")
12. private int consumeThreadMax;
13. @Value("${rocketmq.consumer.topics}")
14. private String topics;
15. @Value("${rocketmq.consumer.consumeMessageBatchMaxSize}")
16. private int consumeMessageBatchMaxSize;
17. @Autowired
18. private MQConsumeMsgListenerProcessor mqMessageListenerProcessor;
19. 
20. @Bean
21. public MQPullConsumerScheduleService testRocketMQConsumer() throws Exception {
22. // 1. 实例化对象
23. final MQPullConsumerScheduleService scheduleService = new MQPullConsumerScheduleService("rocketmq");
24. 
25. // 2. 设置NameServer
26.         scheduleService.getDefaultMQPullConsumer().setNamesrvAddr("127.0.0.1:9876");
27. // 3. 设置消费组为集群模式
28.         scheduleService.setMessageModel(MessageModel.CLUSTERING);
29. 
30. // 4. 注册拉取回调函数
31.         scheduleService.registerPullTaskCallback("NewMessage", new PullTaskCallback() {
32. @Override
33. public void doPullTask(MessageQueue mq, PullTaskContext context) {
34. // 5.从上下文中获取MQPullConsumer对象,此处其实就是DefaultMQPullConsumer。
35. MQPullConsumer consumer = context.getPullConsumer();
36. try {
37. // 6.获取该消费组的该队列的消费进度
38. long offset = consumer.fetchConsumeOffset(mq, false);
39. if (offset < 0) {
40.                         offset = 0;
41.                     }
42. // 7.拉取消息,pull()方法在DefaultMQPullConsumer有具体介绍
43. PullResult pullResult = consumer.pull(mq, "*", offset, 2);
44.                     System.out.printf("%s%n", offset + "\t" + mq + "\t" + pullResult);
45. switch (pullResult.getPullStatus()) {
46. case FOUND:
47. //打印消息
48.                             List<MessageExt> messageExtList = pullResult
49.                                     .getMsgFoundList();
50. for (MessageExt m : messageExtList) {
51.                                 System.out.println(m.toString());
52.                             }
53. break;
54. case NO_MATCHED_MSG:
55. break;
56. case NO_NEW_MSG:
57. case OFFSET_ILLEGAL:
58. break;
59. default:
60. break;
61.                     }
62. // 8.更新消费组该队列消费进度
63.                     consumer.updateConsumeOffset(mq, pullResult.getNextBeginOffset());
64. // 9.设置下次拉取消息时间间隔,单位毫秒
65.                     context.setPullNextDelayTimeMillis(10000);
66.                 } catch (Exception e) {
67.                     e.printStackTrace();
68.                 }
69.             }
70.         });
71. 
72.         scheduleService.start();
73. return scheduleService;
74.     }
75. }

其中返回的结果为

PullStatus.FOUND:成功拉取消息

PullStatus.NO_NEW_MSG:没有新的消息可被拉取

PullStatus.NO_MATCHED_MSG:过滤结果不匹配

PullStatus.OFFSET_ILLEGAL:offset非法

同时在我们的控制有pull的体现

五分钟带你玩转rocketMQ(九)push与pull模式如何选择是个难题

参考:https://www.cnblogs.com/zhyg/p/10451518.html

          https://blog.csdn.net/zhaohongfei_358/article/details/101457563