又开始水RabbitMQ啦。虽然在DDD中闻说,对于服务间的解耦而言,采用消息中间件是最好的方式,但是苦于实际工作中没有遇到,这一块一直很生疏。
学习一下RabbitMQ确有必要,不啰嗦了,开始吧。
本笔记对应于该书的第一章 RabbitMQ简介
RabbitMQ 是目前非常热门的一款消息中间件,不管是互联网行业还是传统行业都在大量地使用。RabbitMQ凭借其高可靠、易扩展、高可用及丰富的功能特性受到越来越多企业的青睐。作为一个合格的开发者,有必要深入地了解RabbitMQ的相关知识,为自己的职业生涯添砖加瓦。
1.1 什么是消息中间件
- 消息(message)是指在应用间传送的数据。消息可以非常简单,比如只包含文本字符串、JSON等,也可以很复杂,比如内嵌对象。
- 消息队列中间件(Message Queue Middleware,简称MQ)是指利用高效可靠的消息传递机制进行与平台无关的数据交流,并基于数据通信来进行分布式系统的集成。通过提供消息传递和消息排队模型,它可以在分布式环境下扩展进程间的通信。
- 消息队列中间件,也可以称为消息队列或者消息中间件。它一般有两种传递模式:点对点(p2p,point-to-point)模式和发布/订阅(pub/sub)模式。点对点模式是基于队列的,消息生产者发送消息到队列,消息消费者从队列中接收消息,队列的存在使得消息的异步传输成为可能。发布订阅模式定义了如何想一个内容节点发布和订阅消息,这个内容节点称为主题(topic),主题可以认为是消息传递的中介,消息发布者将消息发布到某个主题,而消息订阅者则从主题中订阅消息。主题使得消息的订阅者与消息的发布者互相保持独立,不需要进行接触即可保证消息的传递,发布/订阅模式在消息的一对多广播时采用。
- 目前开源的消息中间件有很多,比较主流的有 RabbitMQ、Kafka、ActiveMQ、RocketMQ等。面向消息的中间件(简称为MOM,Message Oriented Middleware)提供了以松散耦合的灵活方式集成应用程序的一种机制。它们提供了基于存储和转发的应用程序之间的异步数据发送,即应用程序彼此不直接通信,而是与作为中介的消息中间件通信。消息中间件提供了保证的消息发送,应用程序开发人员无须了解远程过程调用(RPC)和网络通信协议的细节。
- 消息中间件适用于需要可靠的数据传送的分布式环境。采用消息中间件的系统中,不同对象之间通过传递消息来激活对方的时间,以完成相应的操作。发送者将消息发送给消息服务器,消息服务器将消息存放在若干队列中,在合适的时候在将消息转发给接受者。消息中间件能在不同平台之间通信,它常被用来屏蔽各种平台及协议之间的特性,实现应用程序之间的协同,其优点在于能够在客户和服务器之间提供同步和异步的连接,并且在任何时刻都可以将消息进行传送或者存储转发,这也是它比远程过程调用更进步的原因。
1.2 消息中间件的作用
消息中间见凭借其独到的特性,在不同的应用场景下可以展现不同的作用。总的来说,消息中间件的作用可以概括如下:
- 解耦 在项目启动之初来预测将来会碰到什么需求是及其困难的。消息中间件在处理过程中间插入了一个隐含的、基于数据的接口层,两边的处理过程都要实现这一接口,这允许你独立地扩展或修改两边的处理过程,只要确保他们遵守同样的接口约束即可。
- 冗余(存储) 有些情况下,处理数据的过程会失败。消息中间件可以把数据进行持久化直到他们被完全处理,通过这一方法规避了数据丢失风险。在把一个消息从消息中间件中删除之前,需要你的处理系统明确地指出该消息已经被处理完成,从而确保你的数据被安全的保存直到你使用完毕。
- 扩展性 因为消息中间件解耦了应用的处理过程,所以提高消息入队和处理的效率是很容易的,只要另外增加处理过程即可,不需要修改代码,也不需要调节参数。
- 削峰 在访问量剧增的情况下,应用仍然需要继续发挥作用,但是这样的突发流量并不常见。如果以能处理这类峰值为标准而投入资源,无疑是巨大的浪费。使用消息中间件能够使关键组件支撑突发访问压力,不会因为突发的超负荷请求而完全崩溃。
- 可恢复性 当系统一部分组件失效时,不会影响到整个系统。消息中间件降低了进程间的耦合度,所以即使一个处理消息的进程挂掉,加入消息中间件中的消息仍然可以在系统恢复后进行处理。
- 顺序保证 在大多数使用场景下,数据处理的顺序很重要,大部分消息中间件支持一定程度上的顺序性。
- 缓冲 在任何重要的系统中,都会存在需要不同处理事假的元素。消息中间件通过一个缓冲层来帮助任务最高效率地执行,写入消息中间件的处理会尽可能快速。该缓冲层有助于控制和优化数据流经过系统的速度。
- 异步通信 在很多时候应用不想也不需要立即处理消息。消息中间件提供了异步处理机制,允许应用吧一些消息放入消息中间件中,但并不立即处理它,在之后需要的时候在慢慢处理。
1.3 RabbitMQ的起源
RabbitMQ是采用Erlang 语言实现AMQP(advance message queuing protocal, 高级消息队列协议)的消息中间件,它最初起源于金融系统,用于在分布式系统中存储转发消息。
- 早期消息中间件的商业实现,微软 MSMQ(microsoft message queue)、IBM的websphere等。由于高昂的价格,一般只用于大型组织机构,它们需要可靠性、解耦及实时消息通信的功能。由于商业壁垒,商业MQ供应商想要解决应用互动的问题,而不是创建标准来实现不同的MQ产品间的互通,或者允许应用程序更改MQ平台。
- 为了打破这个壁垒,同时为了能够让消息在各个消息队列平台间互融互通,JMS(java message service)应运而生。JMS试图提供公共java api的方式,隐藏单独MQ产品供应商提供的实际接口,从而跨越了壁垒,以及解决了互通问题。从技术来讲,java应用程序只需要针对JMS api编程,选择合适的MQ驱动即可,JMS会打理好其他部分。ActiveMQ就是JMS的一种实现。不过尝试使用单独标准化接口来胶合众多不同的接口,最终会暴露出问题,使得应用程序变得更加脆弱,所以急需一种新的消息通信标准化方案。
- 2006年6月,Cisco、Redhat、iMatix等联合制定了AMQP的公开标准,由此AMQP登上了历史舞台。它是应用层协议的一个开放标准,以解决众多消息中间件的需求和拓扑结构问题。它为面向消息的中间件设计,基于此协议的客户端和消息中间件可以传递消息,并不受产品、开发语言等条件的限制。
- RabbitMQ最初版本实现了AMQP的一个关键特性:使用协议本身就可以对队列和交换器(exchange)这样的资源进行配置。对于商业MQ供应商来说,资源配置需要通过管理终端的特定工具才能完成。RabbitMQ的资源配置能力使其成为构建分布式应用最完美的通信总线,特别有助于充分利用基于云的资源和进行快速开发。
- RabbitMQ是由RabbitMQ technologies ltd 开发并且提供商业支持的。取Rabbit这样一个名字,是因为兔子行动非常迅速且繁殖起来非常疯狂,RabbitMQ的开创者认为以此命名这个分布式软件再合适不过了。RabbiteMQ ltd 在2010年4月被SpringSource(VMWare的一个部门)收购,在2013年5月并入Pivotal,其实VMWare、Pivotal和EMC本质上是一家。 不同的是VMWare是独立上市子公司,而Pivotal是整合了EMC的某些资源,现在没有上市。至今你也可以在rabbitemq的官网上的logo看到“by pivotal”的字样。
RabbitMQ发展到今天,被越来越多的人认可,这和它在易用性、扩展性、可靠性和高可用性等方面的卓著表现是分不开的。RabbitMQ的具体特点可以概括为以下几点。
- 可靠性 rabbitmq使用一些机制来保证可靠性,入持久化、传输确认以及发布确认
- 灵活的路由 在消息进入队列之前,通过交换机来路由消息。对于典型的路由功能,RabbitMQ已经提供了一些内置的交换器实现。针对更复杂的路由功能,可以将多个交换器绑定在一起,也可以通过插件机制来实现自己的交换器。
- 扩展性 多个RabbitMQ节点可以组成一个集群,也可以根据实际业务情况动态的扩展集群中节点。
- 高可用性 队列可以在集群中的机器上设置镜像,使得在部分节点出现问题的情况下队列仍然可用
- 多种协议 RabbitMQ除了原生支持AMQP协议,还支持STOMP、MQTT等多种消息中间件协议
- 多语言客户端 RabbitMQ几乎支持所有常用语言,比如java、python、ruby、php、C#、js等
- 管理界面 rabbitmq提供了一个易用的用户界面,使得用户可以监控和管理信息、集群中的节点等
- 插件机制 Rabbitmq提供了许多插件,以实现从多方面进行扩展,当然也可以编写自己的插件
1.4 RabbitMQ的安装及简单使用
1.4.1 1.4.2 1.4.3 安装与运行
这里没有按照书上,先安装erlang,再安装rabbitmq。我这里直接用 docker 安装了,更方便一些。
1 | $ sudo docker run -d --hostname my-rabbit -p 5672:5672 -p 15672:15672 rabbitmq:3.8.2-management |
1.4.4 生产和消费信息
先创建用户,并添加权限,设置root 用户为管理员角色。
1 | root@my-rabbit:/# rabbitmqctl add_user root root |
rabbitemq 的 hello world。慢着,我觉得这本书,我看不下去了。因为他的案例是用java。。。。
还是用官网的 python 版吧。
创建虚拟环境,我这里用pyenv
1
$pyenv virtualenv -p python3 rabbitmq_learn_env
安装rabbitmq的python 客户端,Pika
1
$python -m pip install pika --upgrade
发送 Sending
- 建立链接
1
2
3import pika
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
- 创建队列
1 | channel.queue_declare(queue='hello') |
- 发送消息(采用默认exchange)
1 | channel.basic_publish(exchange='', routing_key='hello', body='Hello World!') |
- 关闭链接
1 | connection.close() |
这个时候,进入 rabbitmq容器,可以查看已有的队列
1 | $ sudo docker exec -it wonderful_black bash |
- 接收 Receiving
- 声明队列,这个命令可以运行多次,但是只有第一次会创建
1
2channel = connection.channel()
channel.queue_declare(queue='hello')
- 编写回调函数
1 | def callback(ch, method, properties, body): |
- 设置消费者
1 | channel.basic_consume(queue="hello", auto_ack=True, on_message_callback=callback) |
- 消费
1 | channel.start_consuming() |
1.5 小结
本章首先针对消息中间件做了一个摘要性的介绍,包括什么是消息中间件、消息中间件的作用及消息中间件的特点。之后引入RabbitMQ,对其历史做一个简单的阐述,比如RabbitMQ具备哪些特点。本章后面的篇幅介绍了RabbitMQ的安装及简单使用,通过阐述生产者生产消息,以及消费者消费消息来给堵着一个对于RabbitMQ的最初印象,为后面的探索打下基础。