=================
== Time Stream ==
=================
一个小学生

RocketMQ中的消息发送

RocketMQ

RocketMQ的消息发送过程学习。

RocketMQ发送消息有三种方式:同步发送、异步发送、单向发送。

  • 同步发送,Producer发送消息,同步等待,直到服务器返回发送结果
  • 异步发送,Producer发送消息,指定回调方法,发送消息后立刻返回。回调任务会在新线程中执行。
  • 单向发送,Producer发送消息,直接返回,不等待返回结果也不注册回调方法。

消息组成

  • topic,消息所属的Topic
  • flag,消息的Flag
  • properties,消息扩展属性
  • 消息体

消息扩展属性

消息扩展属性包括:

  • tag,消息tag,用于过滤消息
  • keys,消息索引键,多个用空格隔开,可以使用这些key快速检索到消息
  • waitStoreMsgOk,消息发送时是否等待消息存储完成后再返回
  • delayTimeLevel,消息延迟级别,用于定时消息或消息重试

消息发送流程

  • 验证消息
  • 查找路由
  • 消息发送
  • 异常处理

验证消息

  • 确保生产者处于运行状态
  • 验证消息符合规范,topic名称、消息体不能为空、消息长度不能等于0,且不能超过4M

查找路由

消息发送前会先根据Topic获取路由信息,找到要发送的Broker信息。会先从缓存中查找路由信息,如果缓存中不存在,则从NameServer中查询Topic对应的路由信息。如果最后还是找不到,抛异常。

根据Topic获取到路由信息后,会从路由信息中选择一个要发送的消息队列,选择消息队列分两种情况:未开启故障规避机制和开启了故障规避机制

选择发送的消息队列时采用的算法就是轮询。未开启故障规避的话,如果选择的队列所在Broker不可用,这时候轮询到下一个队列,可能还会在同一个Broker上,有可能队列还是不能用。如果开启故障规避的话,如果选择的当前队列所在Broker不可用,在轮询找下个对列的时候,会把这个Broker直接规避掉,从其他的Broker上的队列选取一个。

消息发送

  • 选择好队列后,从队列中获取到Broker的地址
  • 为消息分配全局唯一ID
  • 入股消息体超过4K,会采用zip压缩,并将消息系统标记记位COMPRESSED_FLAG
  • 如果是事务Prepared消息,将消息系统标记记为TRANSACTION_PREPARED_TYPE
  • 如果注册了消息发送钩子方法,消息发送前会先执行
  • 构建消息发送请求包,包括:生产者组、topic名称、默认创建topic的key、topic在单个broker默认队列数、队列ID、消息系统标记、发送时间、消息标记、扩展属性、消息重试次数、是否批量消息等
  • 根据发送方式,同步、异步、单向等进行网络传输
  • 如果注册了消息发送钩子方法,执行发送后的逻辑