且构网

分享程序员开发的那些事...
且构网 - 分享程序员编程开发的那些事

五分钟带你玩转rocketMQ(十)实战分布式事务

更新时间:2022-08-22 13:45:25


什么是事务性消息?

它可以看作是两阶段提交消息实现,以确保分布式系统中的最终一致性。事务消息确保本地事务的执行和消息的发送可以原子化地执行。

使用限制

(1)事务性消息没有调度和批处理支持。

(2)为了避免单个消息被检查过多而导致半队列消息累积,我们将单个消息的检查次数默认限制为15次,但是如果一条消息被检查过,用户可以通过更改代理配置中的“transactionCheckMax”参数来更改此限制“transactionCheckMax”次,默认情况下,broker将丢弃此消息并同时打印错误日志。用户可以通过重写“AbstractTransactionCheckListener”类来更改此行为。

(3)事务消息将在代理配置中的参数“transactionTimeout”确定的一段时间后进行检查。用户也可以在发送事务性消息时通过设置用户属性“CHECK_IMMUNITY_TIME_IN_SECONDS”来更改此限制,此参数优先于“transactionMsgTimeout”参数。

(4)事务性消息可能被多次检查或使用。

(5)提交给用户目标主题的不可信消息可能失败。目前,这取决于日志记录。RocketMQ本身的高可用机制保证了高可用性。如果要确保事务消息不会丢失,并且保证事务完整性,建议使用同步双写。机制。

(6)事务性消息的生产者ID不能与其他类型消息的生产者ID共享。与其他类型的消息不同,事务性消息允许向后查询。MQ服务器按其生产者id查询客户机。

应用程序

1、交易状态

事务性消息有三种状态:

(1)TransactionStatus.CommitTransaction:commit transaction,表示允许消费者使用此消息。

(2)TransactionStatus.rollback transaction:回滚事务,表示消息将被删除,不允许使用。

(3)TransactionStatus.Unknown:中间状态,表示需要MQ进行回查以确定状态。

原理图

五分钟带你玩转rocketMQ(十)实战分布式事务

假设有A,B两服务,A服务为优惠券服务,B服务为结算服务。

1.A服务发给B服务一个半消息(B服务消费不到)

2.当B服务返回接收成功后 A服务开展优惠券逻辑

3.优惠券数据落到数据库 此时有3个结果

    1.A服务成功 发送B服务commit 修改B服务的消息状态 B服务可以执行结算

    2.A服务失败发送rollback 修改B服务状态为失败 删除消息 

    3.网络原因 需要进行事务回查

4.B服务执行逻辑(如果B服务失败 需要手动判断 rocketMq的官方文档提出不提出解决方案)

代码处理

(1)创建事务生产者

使用TransactionMQProducer类创建producer客户机,并指定唯一的producerGroup,然后可以设置自定义线程池来处理检查请求。在执行本地事务后,需要根据执行结果回复MQ,回复状态在上一节描述。

1. @SpringBootConfiguration
2. public class MQProducerConfiguration {
3. 
4. public static final Logger LOGGER = LoggerFactory.getLogger(MQProducerConfiguration.class);
5. /**
6.      * 发送同一类消息的设置为同一个group,保证唯一,默认不需要设置,rocketmq会使用ip@pid(pid代表jvm名字)作为唯一标示
7.      */
8. @Value("${rocketmq.producer.groupName}")
9. private String groupName;
10. @Value("${rocketmq.producer.namesrvAddr}")
11. private String namesrvAddr;
12. /**
13.      * 消息最大大小,默认4M
14.      */
15. @Value("${rocketmq.producer.maxMessageSize}")
16. private Integer maxMessageSize;
17. /**
18.      * 消息发送超时时间,默认3秒
19.      */
20. @Value("${rocketmq.producer.sendMsgTimeout}")
21. private Integer sendMsgTimeout;
22. /**
23.      * 消息发送失败重试次数,默认2次
24.      */
25. @Value("${rocketmq.producer.retryTimesWhenSendFailed}")
26. private Integer retryTimesWhenSendFailed;
27. 
28. @Bean(name = "customRocketMQProducer")
29. public DefaultMQProducer getRocketMQProducer() throws Exception {
30. if (StringUtils.isEmpty(this.groupName)) {
31. throw new Exception("groupName is blank");
32.         }
33. if (StringUtils.isEmpty(this.namesrvAddr)) {
34. throw new Exception("nameServerAddr is blank");
35.         }
36. 
37. //事务消息需要
38.         TransactionListener transactionListener = new TransactionListenerImpl();
39.         TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name");
40.         ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
41. @Override
42. public Thread newThread(Runnable r) {
43.                 Thread thread = new Thread(r);
44.                 thread.setName("client-transaction-msg-check-thread");
45. return thread;
46.             }
47.         });
48. //事务消息需要
49.         producer.setExecutorService(executorService);
50.         producer.setTransactionListener(transactionListener);
51. 
52.         producer.setNamesrvAddr(this.namesrvAddr);
53. //如果需要同一个jvm中不同的producer往不同的mq集群发送消息,需要设置不同的instanceName
54. //producer.setInstanceName(instanceName);
55. if (this.maxMessageSize != null) {
56.             producer.setMaxMessageSize(this.maxMessageSize);
57.         }
58. if (this.sendMsgTimeout != null) {
59.             producer.setSendMsgTimeout(this.sendMsgTimeout);
60.         }
61. //如果发送消息失败,设置重试次数,默认为2次
62. if (this.retryTimesWhenSendFailed != null) {
63.             producer.setRetryTimesWhenSendFailed(this.retryTimesWhenSendFailed);
64.         }
65. try {
66.             producer.start();
67.             LOGGER.info(String.format("producer is start ! groupName:[%s],namesrvAddr:[%s]"
68.                     , this.groupName, this.namesrvAddr));
69.         } catch (MQClientException e) {
70.             LOGGER.error(String.format("producer is error {}"
71.                     , e.getMessage(), e));
72. throw new Exception(e);
73.         }
74. return producer;
75.     }
76. }

(2)实现TransactionListener接口

“executeLocalTransaction”方法用于在发送半消息成功时执行本地事务。它返回上一节中提到的三种事务状态之一。

“checkLocalTransaction”方法用于检查本地事务状态并响应MQ检查请求。它还返回上一节中提到的三种事务状态之一。

1. package cn.baocl.rocketmq.controllor;
2. 
3. import org.apache.rocketmq.client.producer.LocalTransactionState;
4. import org.apache.rocketmq.client.producer.TransactionListener;
5. import org.apache.rocketmq.common.message.Message;
6. import org.apache.rocketmq.common.message.MessageExt;
7. 
8. import java.util.concurrent.ConcurrentHashMap;
9. import java.util.concurrent.atomic.AtomicInteger;
10. 
11. public class TransactionListenerImpl implements TransactionListener {
12. 
13. private AtomicInteger transactionIndex = new AtomicInteger(0);
14. 
15. private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();
16. 
17. //执行本地事务(一般都是写一张日志表,放入本次交易的TransactionId,判断本次交易是否成功)
18. @Override
19. public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
20. //执行业务
21. //do thing
22. if(true){
23. //业务执行成功
24. //插入日志表
25. //返回成功
26. return LocalTransactionState.COMMIT_MESSAGE;
27.         }else{
28. return LocalTransactionState.ROLLBACK_MESSAGE;
29.         }
30.     }
31. 
32. //回查事务 当发送给broken 本地事务的状态状态丢失了 broken会再次根据id验证本地事务(查询日志表是否有日志)
33. @Override
34. public LocalTransactionState checkLocalTransaction(MessageExt msg) {
35. String transactionId = msg.getTransactionId();
36. //根据transactionId查询日志表条数 如果条数大于0 就表示已经执行过以上逻辑了
37. Integer countStatus = Integer.valueOf(transactionId);
38. if (countStatus > 0) {
39. return LocalTransactionState.COMMIT_MESSAGE;
40.         } else {
41. return LocalTransactionState.ROLLBACK_MESSAGE;
42.         }
43.     }
44. }

(3)调用

1. package cn.baocl.rocketmq.controllor;
2. 
3. import org.apache.rocketmq.client.exception.MQBrokerException;
4. import org.apache.rocketmq.client.exception.MQClientException;
5. import org.apache.rocketmq.client.producer.DefaultMQProducer;
6. import org.apache.rocketmq.client.producer.SendResult;
7. import org.apache.rocketmq.common.message.Message;
8. import org.apache.rocketmq.remoting.common.RemotingHelper;
9. import org.apache.rocketmq.remoting.exception.RemotingException;
10. import org.slf4j.Logger;
11. import org.slf4j.LoggerFactory;
12. import org.springframework.web.bind.annotation.RequestMapping;
13. import org.springframework.web.bind.annotation.RestController;
14. 
15. import javax.annotation.Resource;
16. import java.io.UnsupportedEncodingException;
17. 
18. 
19. @RestController
20. @RequestMapping("/test")
21. public class TestControllor {
22. private static final Logger logger = LoggerFactory.getLogger(TestControllor.class);
23. 
24. /**
25.      * 使用RocketMq的生产者
26.      */
27. @Resource(name = "customRocketMQProducer")
28. private DefaultMQProducer defaultMQProducer;
29. 
30. @RequestMapping("/send")
31. public void send() throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
32.         String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
33. 
34. for (int i = 0; i < 10; i++) {
35. try {
36. Message msg =
37. new Message("DemoTopic", tags[i % tags.length], "KEY" + i,
38.                                 ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
39. //sendMessageInTransaction :1).检查transactionListener是否存在
40. //                          2).调用父类执行事务消息发送
41. SendResult sendResult = defaultMQProducer.sendMessageInTransaction(msg, null);
42.                 System.out.println("发送消息为");
43.                 System.out.printf("%s%n", sendResult);
44.                 Thread.sleep(10);
45.             } catch (MQClientException | UnsupportedEncodingException e) {
46.                 e.printStackTrace();
47.             }
48.         }
49. 
50. for (int i = 0; i < 100000; i++) {
51.             Thread.sleep(1000);
52.         }
53.     }
54. }

阿里忽略了消费者报错了的情况,可以根据业务自定义

参考:https://my.oschina.net/u/3768341/blog/1616193

        https://segmentfault.com/a/1190000019755235?utm_source=tag-newest

        https://blog.csdn.net/hosaos/article/details/90050276

        https://www.jianshu.com/p/cc5c10221aa1