总结几点使用MQ的姿势。
0. Common
0.1 trace
为了便于在不同系统之间追查消息的流动情况,建议在消息体中添加trace信息。
具体使用方式类似于RPC调用中添加的traceid和spanid信息。
0.2 switch
有的时候,需要动态开启/关闭对消息的处理,
1. Producer
1.1 确保发送成功
实现事务消息
- 主动
- 生产者将事务发送确认和业务逻辑实现在一起,例如通过数据库事务一起保存
- 对于可以异步发送消息的业务,可以通过定时任务找出那些没有发送成功的消息,重新发送
- 被动
- 在mq注册有消息要发
- 执行业务逻辑,发送消息
- mq找出注册了但是没有发送消息的,回调业务应用
1.2 唯一性
需要在消息体中添加唯一性字段,以便消费者过滤重复消息。
2. Consumer
2.1 并发唯一性
- 通过redis做锁保证只有一个服务在消费消息,但这里不能百分百保证唯一性
- 在通过业务数据唯一性(例如数据表唯一索引),保证只有一个服务完成数据存储
2.2 错误处理
- 对于小规模的消息消费,可以通过mq自身重试完成错误处理
- 对于大规模的消息消费,可以单独建立一个topic,将发生错误的消息发到这个topic,然后再慢慢处理错误的消息
- 这里可以添加错误消息重试次数,超过一定阈值后,抛弃该条消息
2.3 过滤
有的时候,需要对某些类型的消息做过滤,这是可以做一个全局开关,解析消息后,遇到此类消息则跳过处理。