且构网

分享程序员开发的那些事...
且构网 - 分享程序员编程开发的那些事

RocketMQ-消息过滤

更新时间:2022-09-02 14:25:48

目录
  • 业务背景
  • 什么是消息过滤
  • 如何使用消息过滤
  • 浅析消息过滤
  • 后续


业务背景


在电商系统中,运营和产品需要通过大数据系统来查询订单数据和商品数据。

很多公司,一开始,技术架构不是特别完善。于是乎,系统架构图可能是这样子的:


RocketMQ-消息过滤


大数据系统,直接查询mysql数据库。当大数据系统发送一些大SQL查询语句去查询数据时,容易给mysql增加cpu和io压力。


这样子容易给电商系统的查询带来影响。


为了不对mysql数据库带来直接的影响,同时又允许大数据系统查询数据。一般架构比较完善的公司的技术架构如下:


RocketMQ-消息过滤

RocketMQ-消息过滤

一般都是需要通过数据交换平台,把mysql的数据同步到hive数据库中。


同时,上面的架构也用了RocketMQ。


其实,上面去掉RocketMQ后,也能正常把数据从mysql同步到hive数据库。


那问题来了:为什么要用RocketMQ呢?


我们用反证法来思考一下:如果没RocketMQ,那会如何?首先,我们可以肯定的是:数据肯定能从mysql数据库同步到hive数据库。


但是缺点是什么?


如果不用RocketMQ的情况下,这时,又来一个别的数据团队,也需要电商数据库的数据。这时又需要进行定制开发了。


如果引入RocketMQ后,别的数据团队需要数据,你只需要给他一个Topic即可。


其实引入RocketMQ,主要就是为了方便后续扩展,共享数据。


但上述其实还有一个问题:大数据系统,可能只需要订单数据库的数据。其他表的数据,比如商品数据,它是不需要的。


这个时候,就需要消息的过滤。


消息过滤


  • 在RocketMQ中,一个消息都有且仅有一个标签,生产者发送消息时,消息都有一个标签。在消费者消息信息时,可以消费一个或多个标签的消息。


如何使用?


接下来,需要对RocketMQ如何使用?


public class FilterProducer {
    public static void main(String[] args) throws Exception{

        //生产者组
        DefaultMQProducer producer = new DefaultMQProducer("filter_producer_group");

        //设置nameserver
        producer.setNamesrvAddr("localhost:9876");

        //启动生产者
        producer.start();

        for(int i=0;i<10;i++){
            //构建消息
            Message message = new Message("filterTopic",("helloWorld"+i).getBytes(RemotingHelper.DEFAULT_CHARSET));

            message.putUserProperty("a",String.valueOf(i));

            // 发送消息
            SendResult sendResult = producer.send(message);

            // 打印发送结果
            System.out.println("发送结果:"+sendResult);
        }

        // 关闭生产者
        producer.shutdown();
    }
}


  • 这里给每个消息都增加了key-value。即a属性。


接下来看消费者:


public class FilterConsumer {
    public static void main(String[] args) throws Exception {
        // 消费者组
        DefaultMQPushConsumer consumer =  new DefaultMQPushConsumer("filter_consumer_group");
        //注册nameserver
        consumer.setNamesrvAddr("localhost:9876");


        MessageSelector ms = MessageSelector.bySql("a > 5");
        // 订阅主题
        consumer.subscribe("filterTopic",ms);

        // 开启消费offset
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

        //监听器
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                for (int i = 0; i < list.size(); i++) {
                    MessageExt messageExt = list.get(i);
                    String msg = new String(messageExt.getBody());
                    System.out.println("消费消息:"+msg);
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        consumer.start();

        System.out.println("启动消费者");
    }
}
//运行结果如下:
启动消费者
消费消息:helloWorld8
消费消息:helloWorld7
消费消息:helloWorld9
消费消息:helloWorld6


消息过滤的语法

RocketMQ-消息过滤

RocketMQ-消息过滤


注意点


  • 在启动消费者时,如果你报错误:


CODE: 1  DESC: The broker does not support consumer to filter message by SQL92


那是因为你启动broker的时候,没开启消息过滤。


如何开启呢?


在broker.conf文件中加入:enablePropertyFilter = true


然后启动的时候,还要指定配置文件broker.conf


nohup sh bin/mqbroker -n localhost:9876 -c conf/broker.conf &


浅析消息过滤


RocketMQ-消息过滤


首先,我们根据代码点进去看看RocketMQ-消息过滤三个重载的方法subscribe:


RocketMQ-消息过滤


我们看第一种:


subscribe(String topic, String subExpression) 就是直接加标签的这种:


consumer.subscribe("TOPIC", "TAGA || TAGB || TAGC");


我们看第二种:


subscribe(String topic, String fullClassName, String filterClassSource)


这种,是自定义一个过滤器实现类的。


我大概写一下:实现MessageFilter接口的match方法


RocketMQ-消息过滤


fullClassName:是指类的全路径 filterClassSource:是指类的java文件路径


  • 1,Broker机器启动多个FilterServer过滤进程
  • 2,Consumer启动后,会想Broker传递一个Java类
  • 3,Consumer从FilterServer拉取消息,FilterServer从Broker拉取消息,按照上传的java类进行过滤,过滤后返回给Consumer


我们看第三种:


subscribe(String topic, MessageSelector messageSelector)


就是我们刚刚写的这种


这一节,就写到这了。


有问题,欢迎留言沟通


6、后续文章


  • RocketMQ-入门(已更新)
  • RocketMQ-架构和角色(已更新)
  • RocketMQ-消息发送(已更新)
  • RocketMQ-消费信息
  • RocketMQ-消费者的广播模式和集群模式(已更新)
  • RocketMQ-顺序消息(已更新)
  • RocketMQ-延迟消息(已更新)
  • RocketMQ-批量消息
  • RocketMQ-过滤消息(已更新)
  • RocketMQ-事务消息(已更新)
  • RocketMQ-消息存储
  • RocketMQ-高可用
  • RocketMQ-高性能
  • RocketMQ-主从复制
  • RocketMQ-刷盘机制
  • RocketMQ-幂等性
  • RocketMQ-消息重试
  • RocketMQ-死信队列

...


欢迎各位入(guan)股(zhu),后续文章干货多多。


—本文转载自「大头菜技术」公众号