消息队列学习笔记

......

Posted by 呆贝斯 on May 10, 2021

MQ的概念

MQ全称 Message Queue(消息队列),是在消息的传输过程中保存消息的容器,用于分布式系统之间的通讯。

AMQP,即Advanced Message Queuing Protocol(高级消息队列协议),是一个网络协议,是应用层协议的开放标准, 为面向消息的中间件设计。基于此协议的客户端,消息中间件可传递消息,并不受客户端、中间件不同产品, 不同开发语言的限制,2006年,AMQP规范发布,类比HTTP。

MQ的优劣势

优势

  1. 应用解耦 消息队列RabbitMQ版可用于单体应用被拆解为微服务后不同微服务间的通信。应用解耦的好处是不同应用的迭代不再相互依赖。 系统的耦合性越高,容错性就越低,可维护性也越低。使用MQ使得应用解耦,提高容错性和可维护性。 rabbitmq_1 rabbitmq_2
  2. 异步提速 异步通信的好处是数据不再需要被立即处理。异步解耦能有效缩短数据链路长度,提高数据处理效率。 rabbitmq_3
  3. 削峰填谷 大型活动带来较高流量脉冲时,没有做好相应保护容易导致系统超负荷甚至崩溃,限制太过则会导致请求大量失败而影响用户体验。 消息队列RabbitMQ版能做到削峰填谷。其高性能的消息处理能力可以承接流量脉冲而不被击垮,在确保系统可用性的同时, 通过快速有效的请求响应技术提升用户体验。其海量消息堆积能力确保下游业务在安全水位内平滑稳定的运行,避免流量高峰的冲击。 rabbitmq_4
  4. 分布式缓存同步 大量并发访问数据库会导致页面响应时间长。通过MQ构建分布式缓存,支持实时通知数据变化,有效降低页面响应时间,满足对变更的大量访问需求。

劣势

  1. 系统可用性降低 系统引入的外部依赖越多,系统的可用性越差,一旦MQ宕机就会影响业务。
  2. 一致性问题 A系统处理完业务,通过 MQ 给B、C、D三个系统发消息,如果B系统、C系统处理成功,D系统处理失败。如何保证消息数据处理的一致性?
  3. 系统复杂性提高 MQ的加入大大增加了系统的复杂度,以前的系统间同步是远程调用,现在是通过MQ进行异步调用。

MQ的选型

  RabbitMQ RocketMQ Kafka
公司/社区 Rabbit 阿里 Apache
开发语言 Erlang Java Scala&Java
协议支持 AMQP、XMPP、SMTP、STOMP 自定义 自定义协议,社区封装了http协议支持
单机吞吐量 万级(其次) 十万级(最好) 十万级(次之)
消息延迟 微秒级 毫秒级 毫秒级
功能特性 并发能力强,性能极其好,社区活跃、管理界面丰富 MQ功能比较完备,扩展性佳 只支持主要的MQ功能,毕竟是为大数据领域准备
消息丢失 可能性很低 参数优化后可以零丢失 参数优化后可以零丢失
消费模式 推拉 推拉 拉取
主题数量对吞吐量影响 \ 几百上千个主题会对吞吐量有一个小影响 几十上百个主题会极大影响吞吐量
可用性 高(主从) 很高(主从) 很高(分布式)

RabbitMQ

RabbitMQ开始是用在电信业务的可靠通信的,也是少有的几款支持AMQP协议的产品之一。

  • 优点:
    1. 轻量级,快速,部署使用方便。
    2. 支持灵活的路由配置。RabbitMQ中,在生产者和队列之间有一个交换器模块。根据配置的路由规则,生产者发送的消息可以发送到不同的队列中。路由规则很灵活,还可以自己实现。
    3. RabbitMQ的客户端支持大多数的编程语言,支持AMQP协议。
  • 缺点:
    1. 如果有大量消息堆积在队列中,性能会急剧下降。
    2. 每秒处理几万到几十万的消息。如果应用要求高的性能,不要选择RabbitMQ。
    3. RabbitMQ是Erlang开发的,功能扩展和二次开发代价很高。

中小型公司,技术挑战不是特别高,用 RabbitMQ (开源、社区活跃)是不错的选择;大型公司,基础架构研发实力较强,用 RocketMQ(Java二次开发) 是很好的选择。如果是大数据领域的实时计算、日志采集等场景,用 Kafka 是业内标准的,绝对没问题,社区活跃度很高,绝对不会黄,何况几乎是全世界这个领域的事实性规范。

RabbitMQ简介

RabbitMQ官方地址:www.rabbitmq.com/ 2007年,Rabbit技术公司基于AMQP标准开发的RabbitMQ 1.0 发布。RabbitMQ采用Erlang语言开发。Erlang 语言专门为开发高并发和分布式系统的一种语言,在电信领域使用广泛。 rabbitmq_6

消息在消息队列RabbitMQ版的组件之间流动的过程如下: ① 生产者向Exchange发送消息。 ② Exchange根据消息属性将消息路由到Queue进行存储。 ③ 消费者从Queue拉取消息进行消费。 rabbitmq_7

RabbitMQ相关术语

  • Broker:接受和分发消息的应用,RabbitMQ Server就是Message Brocker。
  • Vitual Host:出于多用户和安全因素设计,把AMQP的基本组件划分到一个虚拟分组中,类似于网络中namespace的概念。当多个不同的用户使用一个RabbitMQ Server提供服务时,可以划出多个vhost,每个用户在自己的vhost中创建exchange、queue等。
  • Connection:消费者或生产者跟broker之间的连接。
  • Channel:如果每一次访问RabbitMQ都建立一个Connection,在消息量大的时候建立TCP连接的开销是巨大的,通常每个线程创立单独的Channel进行通讯,AMQP的method包含了channel id,帮助客户端和Message Broker识别channel,所以channel之间是全隔离的,channel作为轻量级的Connection极大地减少了操作系统创立TCP连接的开销。
  • Exchange:message到达broker的第一站,根据分发规则,匹配查询表中的routing key,分发消息到queue中,常用的类型有:direct(point-to-point)、topic and fanout。
  • Queue:消息最终被推送到这里等待消费者取走。
  • Binding:exchange和queue之间的虚拟连接,binding中可以包含routing key。Binding信息被保存到exchange中的查询表中,用于message的分发依据。

RabbitMQ的安装

最简安装

docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:management

集群模式

RabbitMQ是用Erlang开发的,集群非常方便,因为Erlang天生就是一门分布式语言,但其本身并不支持负载均衡。

RabbitMQ集群中节点包括内存节点(RAM),磁盘节点(Disk,消息持久化),集群中至少有一个Disk节点。

  1. 普通模式(默认) 对于普通模式,集群中各节点有相同的队列结构,但信息只会存在于集群中的一个接待你。对于消费者来说,若消息进入A节点的队列中,当从B节点拉取时,RabbitMQ会将消息从A中取出,并经过B发送给消费者。

应用场景:该模式适合于消息无需持久化的场合,如日志队列。当队列非持久化,且创建该队列的节点宕机,客户端才可以重连集群其他节点,并重新创建队列。若为持久化,只能等故障节点恢复。 rabbitmq_8

  1. 镜像模式(高可用) 与普通模式不同之处是消息实体会主动在镜像节点间同步,而不是取数据时临时拉取,高可用。该模式下,mirror queue有一套选举算法,即一个master,n个slaver,生产者、消费者的请求都会转至master。

应用场景:可靠性要求较高场合,如下单、库存队列。

缺点:若镜像队列过多,且消息体量大,集群内部网络带宽将会被此种同步通讯所消耗。

镜像集群也是基于普通集群,即只有先搭建普通集群,然后才能设置镜像队列。若消费过程中,master挂掉、则选举新的master,若未来得及确认,则可能会重复消费。 rabbitmq_9

RabbitMQ的工作模式

  1. Work/Queue 任务队列的主要思想是避免立即执行资源密集型任务而不得不等待它完成。使用任务队列的优点之一是能够轻松并行工作。如果我们正在积压工作,我们可以添加更多的工作人员,这样就可以轻松扩展。默认情况下,RabbitMQ 会按顺序将每条消息发送给下一个消费者。平均而言,每个消费者都会收到相同数量的消息。这种分发消息的方式称为循环。 rabbitmq_10
  2. Publish/Subscribe 将收到的消息发送到绑定该交换机的所有队列。 rabbitmq_11
  3. Routing 一个队列可以使用多个路由键,相同的路由键可以绑定到多个队列。路由键一般都是有一个或多个单词组成,多个单词之间以“.”分隔,例如item.insert rabbitmq_12
  4. Topic Topic类型与Direct相比,都是根据路由键把消息发送到不同的队列,只是Topic类型Exchange可是让队列绑定路由键的时候使用通配符。 通配符规则:#匹配一个或多个词,之匹配一个词,例如:item.#能够匹配item.insert.abc或者item.insert,item.只能匹配item.insert rabbitmq_13
  5. RPC
    1. 当客户端启动时,它会创建一个匿名独占回调队列。
    2. 对于RPC请求,客户端会发送具有两个属性的消息(reply_to:设置回调队列,correlation_id:设置每一个请求的唯一值)
    3. 请求发送到rpc_queue
    4. RPC工作者(服务器)正在等待该队列上的请求,当一个请求出现时,它会完成工作并使用来之reply_to字段的队列发送结果消息给客户端。
    5. 客户端等待回调队列中的数据。当出现一条消息时,它会检查correlation_id属性。如果它与请求中的值匹配,则将响应返回给应用程序。 rabbitmq_14

RabbitMQ的高级特性

延迟队列

优先级

RabbitMQ常见问题解决方案

消息有序

针对保证消息的有序性问题,解决办法就是保证生产者入队的顺序是有序的,出队后的顺序消费则交给消费者去保证。

方法一:拆分queue,使得一个queue只对应一个消费者。由于MQ一般都能保证内部队列是先进先出的,所以把需要保持先后顺序的一组消息使用某种算法都分配到同一个消息队列中。然后只用一个消费者单线程去消费该队列,这样就能保证消费者是按照顺序进行消费的了。但是消费者的吞吐量会出现瓶颈。如果多个消费者同时消费一个队列,还是可能会出现顺序错乱的情况,这就相当于是多线程消费了。

方法二:对于多线程的消费同一个队列的情况,可以使用重试机制:比如有一个微博业务场景的操作,发微博、写评论、删除微博,这三个异步操作。如果一个消费者先执行了写评论的操作,但是这时微博都还没发,写评论一定是失败的,等一段时间。等另一个消费者,先执行发微博的操作后,再执行,就可以成功。 rabbitmq_15

消息丢失

出现原因:① 消息发出后,中途网络故障,服务器没收到。② 消息发出后,服务器收到了,还没持久化,服务器就宕机了。 ③ 消费发出后,服务器收到了,消费方还未处理业务逻辑,消费者挂掉了,而消息也自动签收了,等于没收到。

解决方案:① 发送确认 ② 消息持久化 ③ 手动签收 当 RabbitMQ 退出或崩溃时,它会忘记队列和消息,除非你告诉它不要这样做。确保消息不会丢失需要做两件事:我们需要将队列和消息都标记为持久的。首先,我们需要确保队列能够在 RabbitMQ 节点重启后继续存在。为此,我们需要将其声明为耐用的:

channel.queue_declare(真真正能= ‘你好’持久,==) 尽管此命令本身是正确的,但它在我们的设置中不起作用。那是因为我们已经定义了一个名为hello的队列 ,它不是持久的。RabbitMQ 不允许您使用不同的参数重新定义现有队列,并且会向任何尝试这样做的程序返回错误。但是有一个快速的解决方法 - 让我们声明一个具有不同名称的队列,例如task_queue:

channel.queue_declare(queue= ‘task_queue’,durable= True ) 此queue_declare更改需要同时应用于生产者和消费者代码。

到那时,我们可以确定即使 RabbitMQ 重新启动,task_queue队列也不会丢失。现在我们需要将我们的消息标记为持久的 - 通过提供具有pika.spec.PERSISTENT_DELIVERY_MODE值的delivery_mode属性

完成一项任务可能需要几秒钟。您可能想知道如果其中一个消费者开始一项长期任务并且只完成了部分任务而死去会发生什么。使用我们当前的代码, 一旦 RabbitMQ 将消息传递给消费者,它就会立即将其标记为删除。在这种情况下,如果你杀死一个工人, 我们将丢失它刚刚处理的消息。我们还将丢失所有发送给该特定工作人员但尚未处理的消息。但是我们不想丢失任何任务。 如果一个工人死亡,我们希望将任务交付给另一个工人。为了确保消息永远不会丢失,RabbitMQ 支持 消息确认。 一个 ack(nowledgement) 由消费者发回,告诉 RabbitMQ 一个特定的消息已经被接收、处理并且 RabbitMQ 可以自由地删除它。 如果消费者在没有发送 ack 的情况下死亡(其通道关闭、连接关闭或 TCP 连接丢失),RabbitMQ 将理解消息未完全处理并将重新排队。 如果同时有其他消费者在线,它会迅速将其重新发送给另一个消费者。这样,即使工人偶尔死亡,您也可以确保不会丢失任何消息。 对消费者交付确认强制执行超时(默认为 30 分钟)。这有助于检测从不确认交付的错误(卡住)消费者。 您可以按照Delivery Acknowledgement Timeout中所述增加此超时。默认情况下,手动消息确认是打开的,一旦我们完成了一项任务, 就该删除这个标志并从工作人员那里发送一个适当的确认,也可以通过auto_ack=True 标志明确地关闭了它们。

消息重复

出现原因:消费者发出的确认并没有传到消费队列,导致消息队列不知道消息已被消费,就再将消息发送给其他消费者。

解决方案:任何消息队列产品不保证消息不重复,如果你的业务需要保证严格的不重复消息,需要你⾃⼰在业务端去重。保障消息的唯一性,不让消息的多次消费带来影响,也就是保证消息的幂等性(幂等性指一个操作执行任意多次所产生的影响与一次执行的影响相同)

消息积压

出现原因:消费者的消费速度比生产者的生产速度慢,消息消费失败反复重试,消费者消费过程中hang住或死掉。

解决方案:1.消费者的消费速度比生产者的生产速度慢:增加消费者数量、临时扩容、优化消费逻辑(单条单条处理优化批处理)2.消息消费失败反复重试:消费失败的消息执行一定次重试后存入数据库等待人工处理 3.消费者消费过程中hang住或死掉:修复消费者

死信

TTL是什么?

TTL的意思就是time to live 消息的存活时间。如果消息一直在队列当中没有被消费并且的话存在的时间已经超过了消息的存活时间,消息就会标称了“死信”(后续会讲到),后续就无法被消费了。设置TTL的话有两种方式:第一种单独消息来进行配置ttl,第二种是整个队列来进行配置ttl,第二种使用居多。TTL介绍完了,咱们来介绍一下死信队列。

什么是死信队列?

可以从概念上来搞清楚,“死信”顾名思义就是无法被消费到的消息。一般来说的话,producer将消息投递到了broker或者是queue里面了,再接着consumer从queue取出消息后进行消费。这个没问题吧,但是在某一些时候由于特殊的原因会导致到queue中的某一些消息没有办法被消费到,这样的消息如果没有经过后续的处理的话,就会变成了死信,有了死信也就有了死信队列。

RabbitMQ产生死信的原因

  • 消息被拒绝了,并且requeue为false
  • 消息的存活时间过期
  • 队列达到的最大的长度(队列已经满了,没有办法在添加数据到mq里)

RabbitMQ死信交换机

当消息成为死信之后,为了防止消息的丢失,会将这样无法处理到的消息发送到一个叫死信交换机里,后面再通过死信交换机绑定的路由键发送到相对应的死信队列。当消息在一个队列当中变成死信,如果也配置了死信队列,那么就会重新的publish到死信交换机中,私信交换机在把这些死信投递到队列上,就形成了死信队列。

cookie_session

生产环境使用需要注意问题

  1. 虚拟主机、用户、权限
  2. 监控和资源限制(监控、内存空间、磁盘空间、打开文件句柄限制、日志收集)
  3. 节点间和CLI工具身份认证
  4. 防火墙配置
  5. TLS
  6. 网络配置
  7. 聚类注意事项(簇的大小、节点时间同步)
  8. 应用注意事项
    • 连接管理
    • 连接丢失
    • 从连接失败中恢复
    • 过度使用频道
    • 轮询消费者
    • 错误处理
    • 并发
  9. 度量收集