我们对顺序消息,延时消息,批量消息,过滤消息进行一个demo,方便以后cv。
顺序消息的生产与消费
顺序消息是指当消息消费的时候,会按照消息发送的顺序进行消费。RocketMQ可以保证消息严格的有序,这里分为全局有序和分区有序。 全局有序是针对所有消息的,每一个消息都按照绝对的先入先出原则。 分区有序,是指将需要有序的消息控制发送到同一个Queue上,消费的时候从这个Queue上拉取。即相对每个Queue是有序的。
全局有序
控制Topic指定只有一个队列,并且保证生产者与消费者只有一个就可以了
分区有序
要完成分区有序性,在生产者环节使用自定义的消息队列选择策略,我们以下单为例,我们需要确保订单号尾数相同的消息会被先后发送到同一个队列中,然后再消费端开启负载均衡模式,每个消费者只消费一个队列。最终确保一个消费者拿到的消息对于一个订单来说是有序的。
也就是说,需要自定义规则使得同一个订单的或者同一个业务逻辑的消息发送到同一个消息队列,再让一个消费者去消费的时候,就是顺序消费了。
生产者
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("luke_group_order");
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start();
List orderList = buildOrder();
for (int i = 0; i < orderList.size(); i++) {
String body = orderList.get(i).toString();
Message msg = new Message("luke_topic_order", null, "KEY" + i, body.getBytes());
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
@Override
public MessageQueue select(List mqs, Message msg, Object arg) {
Long id = (Long)arg;
long index = id % mqs.size();
return mqs.get((int) index);
}
}, orderList.get(i).getId());
}
producer.shutdown();
}
Order类
@Data
@NoArgsConstructor
@ToString
class Order{
Long id;
String info;
}
模拟消息
模拟一个列表,我把中间的2和3删了,要不太长了。这里是按照提交,付款,打包发货的顺序发送消息的。在消费消息的时候也需要按照这个顺序。
private static List buildOrder(){
List orderList = new ArrayList<>();
Order order = new Order();
order.setId(1L);
order.setInfo("提交");
orderList.add(order);
order = new Order();
order.setId(1L);
order.setInfo("付款");
orderList.add(order);
order = new Order();
order.setId(1L);
order.setInfo("打包发货");
orderList.add(order);
...
order = new Order();
order.setId(4L);
order.setInfo("提交");
orderList.add(order);
order = new Order();
order.setId(4L);
order.setInfo("付款");
orderList.add(order);
order = new Order();
order.setId(4L);
order.setInfo("打包发货");
orderList.add(order);
return orderList;
}
消费者
public static void main(String[] args) throws Exception {
//实例化消息消费者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("luke_group_order");
//指定nameserver地址
consumer.setNamesrvAddr("127.0.0.1:9876");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
//订阅topic
consumer.subscribe("luke_topic_order","*");
// 注册回调实现类来处理从broker拉取回来的消息
consumer.registerMessageListener(new MessageListenerOrderly() {
@SneakyThrows
@Override
public ConsumeOrderlyStatus consumeMessage(List msgs, ConsumeOrderlyContext context) {
context.setAutoCommit(true);
for (MessageExt msg : msgs) {
System.out.println(Thread.currentThread().getName()+"-"+new String(msg.getBody()));
}
try {
//模拟业务逻辑处理
Thread.sleep(300);
} catch (InterruptedException e) {
e.printStackTrace();
//报错之后,等一会再处理这批消息,而不是放入重试队列。
return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
}
// 标记该消息已经被成功消费
return ConsumeOrderlyStatus.SUCCESS;
}
});
// 启动消费者实例
consumer.start();
System.out.printf("Consumer Started.%n");
}
结果
小结
可以看到,每个线程处理了一个订单的三个消息。
使用顺序消息:首先要保证消息是有序进入 MQ 的,消息放入 MQ 之前,对 id 等关键字进行取模,放入指定 messageQueue
特别注意的是,当consume 消费消息失败时,不能返回 reconsume_later
,这样会导致乱序,所以应该返回 SUSPEND_CURRENT_QUEUE_A_MOMENT
,意思是先等一会,一会儿再处理这批消息,而不是放到重试队列里。
延时消息的生产与消费
延时消息:我们有时候需要将将消息发送到消息队列 RocketMQ 服务端,但并不期望这条消息立马投递(被消费者消费),因为特殊的业务场景需要延迟一定时间后才投递到 Consumer 进行消费,这种消息就是延时消息。
延时消息比较简单就是普通消息设置了延时等级DelayTimeLevel。
生产者
public static void main(String[] args) throws Exception {
//实例化消息生产者对象
DefaultMQProducer producer = new DefaultMQProducer("group_luke");
//设置NameSever地址
producer.setNamesrvAddr("127.0.0.1:9876");
//启动Producer实例
producer.start();
for (int i = 0; i < 10; i++) {
Message msg = new Message("topic_luke", "tag", ("这是第"+i+"条消息。").getBytes(StandardCharsets.UTF_8));
//设置延时等级
msg.setDelayTimeLevel(4);
//同步发送方式
SendResult send = producer.send(msg);
//确认返回
System.out.println(send);
}
//关闭producer
producer.shutdown();
}
批量消息的生产与消费
在高并发场景中,为了减少网络和IO的开销,也会使用批量发送消息,这能显著提高传递消息发送时的性能。使用批量消息时需要确保这些消息有相同的 topic,相同的 waitStoreMsgOK(集群中使用),且不能是延时消息。 步骤:
- 构建消息对象集合
- 调用send(Collection msg)系列的方法
批量消息的限制为4M,需要计算集合中的消息是否超过限制,若是超过限制可以分割消息,再进行多次发送。
一般批量发送(不分割消息)
public static void main(String[] args) throws Exception {
//实例化消息生产者对象
DefaultMQProducer producer = new DefaultMQProducer("group_luke");
//设置NameSever地址
producer.setNamesrvAddr("127.0.0.1:9876");
//启动Producer实例
producer.start();
String topic = "topic_luke";
//需要确保消息的大小在4M以内
List messageList = new ArrayList<>();
messageList.add(new Message(topic,"tag","keys01","消息体01".getBytes(StandardCharsets.UTF_8)));
messageList.add(new Message(topic,"tag","keys02","消息体02".getBytes(StandardCharsets.UTF_8)));
messageList.add(new Message(topic,"tag","keys03","消息体03".getBytes(StandardCharsets.UTF_8)));
producer.send(messageList);
//关闭producer
producer.shutdown();
}
批量切分发送
如果消息的总长度可能大于 4MB 时,这时候最好把消息进行分割,我们举例按照1M的大小进行分割。
这里发送10万条消息,并对这些消息进行分割,每次1M左右的消息。
生产者
public static void main(String[] args) throws Exception {
//实例化消息生产者对象
DefaultMQProducer producer = new DefaultMQProducer("group_luke");
//设置NameSever地址
producer.setNamesrvAddr("127.0.0.1:9876");
//启动Producer实例
producer.start();
String topic = "topic_luke";
List messageList = new ArrayList<>();
for (int i = 0; i < 1000 * 100; i++) {
messageList.add(new Message(topic,"tag","keys"+i,("消息体"+i).getBytes(StandardCharsets.UTF_8)));
}
//分割
ListSplitter splitter = new ListSplitter(messageList);
while (splitter.hasNext()){
List listItem = splitter.next();
producer.send(listItem);
}
//关闭producer
producer.shutdown();
}
ListSplitter类
public class ListSplitter implements Iterator> {
private final int SIZE_LIMIT = 1000 * 1000;
private final List messages;
private int currIndex;
public ListSplitter(List messages) {
this.messages = messages;
}
@Override public boolean hasNext() {
return currIndex < messages.size();
}
@Override public List next() {
int nextIndex = currIndex;
int totalSize = 0;
for (; nextIndex < messages.size(); nextIndex++) {
Message message = messages.get(nextIndex);
//计算topic和Body的大小
int tmpSize = message.getTopic().length() + message.getBody().length;
//计算properties的大小
Map properties = message.getProperties();
for (Map.Entry entry : properties.entrySet()) {
tmpSize += entry.getKey().length() + entry.getValue().length();
}
tmpSize = tmpSize + 20; //给日志的开销留20字节
//单个消息的大小已经超过了1M
if (tmpSize > SIZE_LIMIT) {
//若是下个子列表没有元素,则添加这个子列表
if (nextIndex - currIndex == 0) {
nextIndex++;
}
//退出循环
break;
}
if (tmpSize + totalSize > SIZE_LIMIT) {
break;
} else {
totalSize += tmpSize;
}
}
List subList = messages.subList(currIndex, nextIndex);
currIndex = nextIndex;
return subList;
}
}