QMQ的设计与实现

去哪儿网自己造的轮子有什么特殊之处

Posted by ALID on January 9, 2020

完全可靠消息, 大量组订阅通知, 方便纵向扩容, 支持任意时间的延迟消息, 但并不能保证消息的顺序性

序章-QMQ的诞生

为啥造轮子?

在遥远的8年前, RocketMQ还叫MetaQ; kafka还很不稳定; RabbitMQ是使用erlang开发的, 不好定制化. 所以当时的大佬们就决定自己搞一个简单易用并且符合业务要求的轮子.

怎么实现一个MQ呢?

想要实现一个最简单的MQ功能需要以下两点

  1. 需要在服务间传递消息
  2. 需要在服务端保存消息

当时实现方式也非常简单, 消息的发送就是简单对Dubbo进行了一个封装; 而保存消息就直接保存在了数据库上. 下图就很好的展示了最初级的QMQ设计.

img

还需要什么?

首先去哪儿网提供了旅游产品在线预订服务,那么就涉及电商交易,在电商交易中我们认为数据的一致性是非常关键的要素。那么我们的 MQ 必须提供一致性保证。

这样就让QMQ有了独属于自己的特性, 完全消息可靠.

随着 MQ 在各系统中的大量应用,就不仅限于交易场景了,大家都期望所有场景中只使用一套MQ,所以后来消息量迅速增长,迫使我们对存储模型进行了重新设计。

进而设计出了QMQ独特的存储模型

另外旅游产品预订的特征,大部分预订都是未来某个时间点的,这个时间可长可短,短的话可能是几个小时,长的话可能是半年以上,那么我们对延时消息的需求也很强烈,这种延时时间不固定的方式也对服务端设计提出了挑战。

所以QMQ就支持了任意时间的延迟消息

QMQ的实现

img

内部QMQ版本是依赖ZK的, 开源版本设计了Meta server来作为注册中心, 流程如下:

  1. delay server 向meta server注册
  2. 实时server 向meta server注册
  3. producer在发送消息前需要询问meta server获取server list
  4. meta server返回server list给producer(根据producer请求的消息类型返回不同的server list)
  5. producer发送延时/定时消息
  6. 延时时间已到,delay server将消息投递给实时server
  7. producer发送实时消息
  8. consumer需要拉取消息,在拉取之前向meta server获取server list(只会获取实时server的list)
  9. meta server返回server list给consumer
  10. consumer向实时server发起pull请求
  11. 实时server将消息返回给consumer

消息机制

QMQ设计了不同于主流框架的消息机制

img

QMQ并没有partition的设计, 可以理解为一个topic只有一个队列 consume log, 其中存储了 message log 中的偏移量. 这里的设计好像和RocketMQ差不多, 之后就不一样了. 这里为了让一个 consume log 可以被多个consumer消费, 设计了 pull log 的概念, 消息被consumer拉取后, 就会记录在其中. 实际记录的其实是 consume log 的偏移量. 这样在RocketMQ中必须等到ACK才能向前移动的offset, 在QMQ中只要消息被拉取就可以向前移动. 之后ACK校验已经重试遍历 pull log 即可. 当然这种这样也无法保证消息的顺序性.

而且使用partition和consumer的对应关系如果单partition积压消息, 很难通过纵向扩容的方法来消费. 但是QMQ的设计就可以让新机器立刻消费挤压请求. 因为没有partition和consumer的对应关系, 谁都可以在consume log上消费. pull log 可以标记其消费记录.

还有多个group消费的时候, 每个组的consumer数量不同, 如果依赖partition和consumer的对应关系就会很难去维护的很好. 而这样的设计大家都用一份 consume log 每个组都可以随意加减机器. 可以让一个topic被大量组订阅. 比如我们酒店的订单变更消息就有接近70个不同的消费组订阅(可以将消费组理解为不同的应用),整个交易流程都是靠消息来驱动,那么如果使用partition和consumer的对应关系,要在70个不同应用之间协调partition和Consumer的均衡几乎是不可能的。

延迟消息

QMQ提供任意时间的延时/定时消息,你可以指定消息在未来两年内(可配置)任意时间内投递。比起RocketMQ提供的多个不同延时level的延时消息,QMQ的延时消息更加灵活。比如在OTA场景中,客人经常是预订未来某个时刻的酒店或者机票,这个时间是不固定的,我们无法使用几个固定的延时level来实现这个场景。

img

  1. 这里的实现方式, 首先还是使用了3个log. 其中message log收到消息就推送给schedule log. 这里因为仅仅起了一个中转的作用, 所以历史的记录都可以删除. 这里可以使用低容量高性能的SSD来实现.
  2. schedule log, 是延迟实现的第一步. 其每个小时一个文件, 过期的文件也可以删除. 该文件对应了第一个hash wheel, 这个hash wheel和文件粒度一样, 一个小时一个刻度. 当消息投递时间即将到来的时候, 会将其加载到内存中.
  3. 在内存中有第二个hash wheel, 也就是延迟实现的第二步. 该hash wheel的刻度是500ms.
  4. 最后当消息投递成功后, 会在第三个log也就是dispatch log中写入记录, 但不包含消息内容. 这里是为了在延时server中途重启时,我们需要判断出当前这个刻度(比如一个小时)里的消息有哪些已经投递了则不重复投递。

完全可靠

常有的MQ其实也基本实现了消息的可靠性. 但是在producer其实并不是完全可靠的, 因为如果producer端发出一个消息. 恰巧这个消息也丢失了, 正常情况下producer端未接收到这个消息的ACK就会重新发送. 但是如果producer端发出这个消息后就挂掉了, 这个消息就会丢失了.

producer

其实就是依赖DB来保证可靠性. 在生产者发消息的时候, 其实是先在本地数据库存一条记录, 成功后才会发送消息. 如果之后受到ack就把该记录删除, 并且后台有一个任务定时扫描该记录, 补偿重新发送消息.

1
2
3
4
5
6
@Transactional
public void pay(Order order){
    PayTransaction t = buildPayTransaction(order);
    payDao.append(t);
    producer.sendMessage(buildMessage(t));
}

这是一个发送消息的请求, 其真实实现的伪代码就是下面的样子

1
2
3
4
5
6
7
8
9
10
11
12
13
@Transactional
public void pay(Order order){
    PayTransaction t = buildPayTransaction(order);
    payDao.append(t);
    //producer.sendMessage(buildMessage(t));
    final Message message = buildMessage(t);
    messageDao.insert(message);
    //在事务提交后执行
    triggerAfterTransactionCommit(()->{
        messageClient.send(message);
        messageDao.delete(message);
    });
}

这样设计就保证了在交易系统里强一致性的要求.

broker

首先也采用了集群的设计, 每个集群分为master和slave, 共同组成了一个group. 但是和大多数集群不同的是, 这里只有master负责消息的发送和消费, slave只负责保证消息的可用性.

当消息发送给master后,slave会从master同步消息,只有消息同步到slave后master才会返回成功的响应给producer,这就保证了master和slave上都有一致的消息。当master和slave之间的延迟增大时,会标记该group为readonly状态,这个时候将不再接收消息,只提供消息消费服务。

目前当master离线后,不提供自动切换功能,需要人工启动master。当slave离线后,该group不再提供接收消息服务,只提供消息消费服务。当master出现故障,导致消息丢失时,可以将其切换为slave,原来的slave切换为master,slave将从master同步数据,同步完成后提供服务。

其实这里的设计有点类似于一个备份和后备的策略, slave只用来作为备份和后备. 并不起到提供服务的功能, 类似于备份出问题的时候不在接受新的数据.

Consumer

消息拉取 这里采用的是通知机制, 在接到通知后会唤醒consumer的线程, 使用netty去broker拉取消息内容.

对于ACK其实是批量返回的, 可以在等到一定消息量再返回, 也可以得到超时后返回ACKList.

这里使用拉模式, 是因为推模型在消息堆积时会增加消费者的压力。

异常重试 为了保证消息的可靠性, 如果消费方法抛出异常, 那默认5s之后会再次尝试调用. 如果是正常的异常捕获就不会再次尝试调用了. 我们也可以用这个功能做限流, 暂时不能接受的请求只要原地抛异常就行, 之后qmq会自动重试.

广播请求 广播请求, 比如在我们希望该系统的所有机器上更新缓存, 就可以通过QMQ广播发送通知.

幂等请求 很多时候我们其实希望, 相同请求无论发送几次消费者只消费一次的请求. 但因为对于可靠性的保证, 很多时候都可能重新发送请求. 就会导致接受请求的重复.

  • 实际已经消费消息, 但生产者没有接收到Response
  • 实际已经消费消息, 但ACK丢失.

对于只消费一次的实现, 其实还是可能会发送多次请求. 这里还是使用了DB记录已接受请求, 之后每次接受到请求对比判断是否已经结束过即可.

并且也可以自己实现方法的幂等性, 或者采用加锁的方式来拒绝相同的请求.

消费线程 在被消费之前其实是有一个默认的线程池的, 可以通过配置修改参数.

终章-QMQ的缺陷

QMQ的设计其实就是在取舍中前行.

所有消费者消费一个 message log, 解决了单点故障引起的消息积压, 可以让大量消费组订阅, 提高了横向扩展的能力. 但也无法保证消息的顺序性.

通过DB实现了消息的完全可靠, 保证了交易场景下的消息可靠性. 但也极大的减少了吞吐量.

通过时间轮实现任意时间的延迟消息, 保证了各种预定场景的功能. 但需要保存将近2w个文件来存储未来的消息.