《RabbitMQ实战指南》学习笔记(一)

又开始水RabbitMQ啦。虽然在DDD中闻说,对于服务间的解耦而言,采用消息中间件是最好的方式,但是苦于实际工作中没有遇到,这一块一直很生疏。
学习一下RabbitMQ确有必要,不啰嗦了,开始吧。

本笔记对应于该书的第一章 RabbitMQ简介
RabbitMQ 是目前非常热门的一款消息中间件,不管是互联网行业还是传统行业都在大量地使用。RabbitMQ凭借其高可靠、易扩展、高可用及丰富的功能特性受到越来越多企业的青睐。作为一个合格的开发者,有必要深入地了解RabbitMQ的相关知识,为自己的职业生涯添砖加瓦。

1.1 什么是消息中间件

  • 消息(message)是指在应用间传送的数据。消息可以非常简单,比如只包含文本字符串、JSON等,也可以很复杂,比如内嵌对象。
  • 消息队列中间件(Message Queue Middleware,简称MQ)是指利用高效可靠的消息传递机制进行与平台无关的数据交流,并基于数据通信来进行分布式系统的集成。通过提供消息传递和消息排队模型,它可以在分布式环境下扩展进程间的通信。
  • 消息队列中间件,也可以称为消息队列或者消息中间件。它一般有两种传递模式:点对点(p2p,point-to-point)模式和发布/订阅(pub/sub)模式。点对点模式是基于队列的,消息生产者发送消息到队列,消息消费者从队列中接收消息,队列的存在使得消息的异步传输成为可能。发布订阅模式定义了如何想一个内容节点发布和订阅消息,这个内容节点称为主题(topic),主题可以认为是消息传递的中介,消息发布者将消息发布到某个主题,而消息订阅者则从主题中订阅消息。主题使得消息的订阅者与消息的发布者互相保持独立,不需要进行接触即可保证消息的传递,发布/订阅模式在消息的一对多广播时采用。
  • 目前开源的消息中间件有很多,比较主流的有 RabbitMQ、Kafka、ActiveMQ、RocketMQ等。面向消息的中间件(简称为MOM,Message Oriented Middleware)提供了以松散耦合的灵活方式集成应用程序的一种机制。它们提供了基于存储和转发的应用程序之间的异步数据发送,即应用程序彼此不直接通信,而是与作为中介的消息中间件通信。消息中间件提供了保证的消息发送,应用程序开发人员无须了解远程过程调用(RPC)和网络通信协议的细节。
  • 消息中间件适用于需要可靠的数据传送的分布式环境。采用消息中间件的系统中,不同对象之间通过传递消息来激活对方的时间,以完成相应的操作。发送者将消息发送给消息服务器,消息服务器将消息存放在若干队列中,在合适的时候在将消息转发给接受者。消息中间件能在不同平台之间通信,它常被用来屏蔽各种平台及协议之间的特性,实现应用程序之间的协同,其优点在于能够在客户和服务器之间提供同步和异步的连接,并且在任何时刻都可以将消息进行传送或者存储转发,这也是它比远程过程调用更进步的原因。

1.2 消息中间件的作用

消息中间见凭借其独到的特性,在不同的应用场景下可以展现不同的作用。总的来说,消息中间件的作用可以概括如下:

  • 解耦 在项目启动之初来预测将来会碰到什么需求是及其困难的。消息中间件在处理过程中间插入了一个隐含的、基于数据的接口层,两边的处理过程都要实现这一接口,这允许你独立地扩展或修改两边的处理过程,只要确保他们遵守同样的接口约束即可。
  • 冗余(存储) 有些情况下,处理数据的过程会失败。消息中间件可以把数据进行持久化直到他们被完全处理,通过这一方法规避了数据丢失风险。在把一个消息从消息中间件中删除之前,需要你的处理系统明确地指出该消息已经被处理完成,从而确保你的数据被安全的保存直到你使用完毕。
  • 扩展性 因为消息中间件解耦了应用的处理过程,所以提高消息入队和处理的效率是很容易的,只要另外增加处理过程即可,不需要修改代码,也不需要调节参数。
  • 削峰 在访问量剧增的情况下,应用仍然需要继续发挥作用,但是这样的突发流量并不常见。如果以能处理这类峰值为标准而投入资源,无疑是巨大的浪费。使用消息中间件能够使关键组件支撑突发访问压力,不会因为突发的超负荷请求而完全崩溃。
  • 可恢复性 当系统一部分组件失效时,不会影响到整个系统。消息中间件降低了进程间的耦合度,所以即使一个处理消息的进程挂掉,加入消息中间件中的消息仍然可以在系统恢复后进行处理。
  • 顺序保证 在大多数使用场景下,数据处理的顺序很重要,大部分消息中间件支持一定程度上的顺序性。
  • 缓冲 在任何重要的系统中,都会存在需要不同处理事假的元素。消息中间件通过一个缓冲层来帮助任务最高效率地执行,写入消息中间件的处理会尽可能快速。该缓冲层有助于控制和优化数据流经过系统的速度。
  • 异步通信 在很多时候应用不想也不需要立即处理消息。消息中间件提供了异步处理机制,允许应用吧一些消息放入消息中间件中,但并不立即处理它,在之后需要的时候在慢慢处理。

1.3 RabbitMQ的起源

RabbitMQ是采用Erlang 语言实现AMQP(advance message queuing protocal, 高级消息队列协议)的消息中间件,它最初起源于金融系统,用于在分布式系统中存储转发消息。

  • 早期消息中间件的商业实现,微软 MSMQ(microsoft message queue)、IBM的websphere等。由于高昂的价格,一般只用于大型组织机构,它们需要可靠性、解耦及实时消息通信的功能。由于商业壁垒,商业MQ供应商想要解决应用互动的问题,而不是创建标准来实现不同的MQ产品间的互通,或者允许应用程序更改MQ平台。
  • 为了打破这个壁垒,同时为了能够让消息在各个消息队列平台间互融互通,JMS(java message service)应运而生。JMS试图提供公共java api的方式,隐藏单独MQ产品供应商提供的实际接口,从而跨越了壁垒,以及解决了互通问题。从技术来讲,java应用程序只需要针对JMS api编程,选择合适的MQ驱动即可,JMS会打理好其他部分。ActiveMQ就是JMS的一种实现。不过尝试使用单独标准化接口来胶合众多不同的接口,最终会暴露出问题,使得应用程序变得更加脆弱,所以急需一种新的消息通信标准化方案。
  • 2006年6月,Cisco、Redhat、iMatix等联合制定了AMQP的公开标准,由此AMQP登上了历史舞台。它是应用层协议的一个开放标准,以解决众多消息中间件的需求和拓扑结构问题。它为面向消息的中间件设计,基于此协议的客户端和消息中间件可以传递消息,并不受产品、开发语言等条件的限制。
  • RabbitMQ最初版本实现了AMQP的一个关键特性:使用协议本身就可以对队列和交换器(exchange)这样的资源进行配置。对于商业MQ供应商来说,资源配置需要通过管理终端的特定工具才能完成。RabbitMQ的资源配置能力使其成为构建分布式应用最完美的通信总线,特别有助于充分利用基于云的资源和进行快速开发。
  • RabbitMQ是由RabbitMQ technologies ltd 开发并且提供商业支持的。取Rabbit这样一个名字,是因为兔子行动非常迅速且繁殖起来非常疯狂,RabbitMQ的开创者认为以此命名这个分布式软件再合适不过了。RabbiteMQ ltd 在2010年4月被SpringSource(VMWare的一个部门)收购,在2013年5月并入Pivotal,其实VMWare、Pivotal和EMC本质上是一家。 不同的是VMWare是独立上市子公司,而Pivotal是整合了EMC的某些资源,现在没有上市。至今你也可以在rabbitemq的官网上的logo看到“by pivotal”的字样。

RabbitMQ发展到今天,被越来越多的人认可,这和它在易用性、扩展性、可靠性和高可用性等方面的卓著表现是分不开的。RabbitMQ的具体特点可以概括为以下几点。

  • 可靠性 rabbitmq使用一些机制来保证可靠性,入持久化、传输确认以及发布确认
  • 灵活的路由 在消息进入队列之前,通过交换机来路由消息。对于典型的路由功能,RabbitMQ已经提供了一些内置的交换器实现。针对更复杂的路由功能,可以将多个交换器绑定在一起,也可以通过插件机制来实现自己的交换器。
  • 扩展性 多个RabbitMQ节点可以组成一个集群,也可以根据实际业务情况动态的扩展集群中节点。
  • 高可用性 队列可以在集群中的机器上设置镜像,使得在部分节点出现问题的情况下队列仍然可用
  • 多种协议 RabbitMQ除了原生支持AMQP协议,还支持STOMP、MQTT等多种消息中间件协议
  • 多语言客户端 RabbitMQ几乎支持所有常用语言,比如java、python、ruby、php、C#、js等
  • 管理界面 rabbitmq提供了一个易用的用户界面,使得用户可以监控和管理信息、集群中的节点等
  • 插件机制 Rabbitmq提供了许多插件,以实现从多方面进行扩展,当然也可以编写自己的插件

1.4 RabbitMQ的安装及简单使用

1.4.1 1.4.2 1.4.3 安装与运行

这里没有按照书上,先安装erlang,再安装rabbitmq。我这里直接用 docker 安装了,更方便一些。

1
2
3
$ sudo docker run -d --hostname my-rabbit -p 5672:5672 -p 15672:15672 rabbitmq:3.8.2-management
$ sudo docker exec -it container_name bash
root@my-rabbit:/# rabbitmqctl status

1.4.4 生产和消费信息

先创建用户,并添加权限,设置root 用户为管理员角色。

1
2
3
root@my-rabbit:/# rabbitmqctl add_user root root
root@my-rabbit:/# rabbitmqctl set_permissions -p / root ".*" ".*" ".*"
root@my-rabbit:/# rabbitmqctl set_user_tags root administrator

rabbitemq 的 hello world。慢着,我觉得这本书,我看不下去了。因为他的案例是用java。。。。
还是用官网的 python 版吧。

  • 创建虚拟环境,我这里用pyenv

    1
    $pyenv virtualenv -p python3 rabbitmq_learn_env
  • 安装rabbitmq的python 客户端,Pika

    1
    $python -m pip install pika --upgrade
  • 发送 Sending

    • 建立链接
    1
    2
    3
    import pika
    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    channel = connection.channel()
  • 创建队列
1
channel.queue_declare(queue='hello')
  • 发送消息(采用默认exchange)
1
channel.basic_publish(exchange='', routing_key='hello', body='Hello World!')
  • 关闭链接
1
connection.close()

这个时候,进入 rabbitmq容器,可以查看已有的队列

1
2
3
4
5
6
$ sudo docker exec -it wonderful_black bash
root@my-rabbit:/# rabbitmqctl list_queues
Timeout: 60.0 seconds ...
Listing queues for vhost / ...
name messages
hello 1
  • 接收 Receiving
    • 声明队列,这个命令可以运行多次,但是只有第一次会创建
    1
    2
    channel = connection.channel()
    channel.queue_declare(queue='hello')
  • 编写回调函数
1
2
def callback(ch, method, properties, body):
print("[x] Received %r" % body)
  • 设置消费者
1
channel.basic_consume(queue="hello", auto_ack=True, on_message_callback=callback)
  • 消费
1
2
3
4
5
6
channel.start_consuming()
[x] Received b'second hello world!'
[x] Received b'a second hello world!'
[x] Received b'a second hello'
[x] Received b'a second'
[x] Received b'a secon'

1.5 小结

本章首先针对消息中间件做了一个摘要性的介绍,包括什么是消息中间件、消息中间件的作用及消息中间件的特点。之后引入RabbitMQ,对其历史做一个简单的阐述,比如RabbitMQ具备哪些特点。本章后面的篇幅介绍了RabbitMQ的安装及简单使用,通过阐述生产者生产消息,以及消费者消费消息来给堵着一个对于RabbitMQ的最初印象,为后面的探索打下基础。