0%

生产者发送消息过程

  1. 生产者连接到 RabbitMQ Broker,建立一个连接(Connection),开启一个信道(Channel)

  2. 生产者声明一个交换器,并设置相关属性,比如交换器类型、是否持久化等

  3. 生产者声明一个队列并设置相关属性,比如是否排他、是否持久化、是否自动删除等

  4. 生产者通过路由键将交换器和队列绑定起来

  5. 生产者发送消息至 RabbitMQ Broker,其中包含路由键、交换器等信息

  6. 相应的交换器根据收到的路由键查找相匹配的队列

  7. 如果找到,则将从生产者发送过来的消息存入相应的队列中

  8. 如果没有找到,则根据生产者匹配的属性选择丢弃还是回退给生产者

  9. 关闭信道

  10. 关闭连接

消费者接收消息过程

  1. 消费者连接到 RabbitMQ Broker,建立一个连接(Connection),开启一个信道(Channel)

  2. 消费者向 RabbitMQ Broker 请求消费相应队列中的消息,可能会设置相应的回调函数,以及做一些准备工作

  3. 等待 RabbitMQ Broker 回应并投递相应队列中的消息,消费者接收消息

  4. 消费者确认(ack)接收到的消息

  5. RabbitMQ 从队列中删除相应已经被确认的消息

  6. 关闭信道

  7. 关闭连接

如下图所示,我们又引入了两个新的概念:Connection 和 Channel。我们知道无论是生产者和消费者,都需要和 RabbitMQ Broker 建立连接, 这个连接就是一条 TCP 连接,也就是 Connection。一旦 TCP 连接建立起来,客户端紧接着可以创建一个 AMQP 信道(Channel),每个信道 都会被指派一个唯一的 ID。信道是建立在 Connection 之上的虚拟连接,RabbitMQ 处理的每条 AMQP 指令都是通过信道完成的。

7

我们完全可以直接使用 Connection 就能完成信道的工作,为什么还要引入信道呢? 试想这样一个场景,一个应用程序中有很多个线程需要从 RabbitMQ 中消费消息,或者生产消息, 那么必然需要建立很多个 Connection,也就是许多个 TCP 连接。然而对于操作系统而言,建立和销毁 TCP 连接是非常昂贵的开销, 如果遇到使用高峰,性能瓶颈也随之显现。RabbitMQ 采用类型 NIO(Non-blocking I/O)的做法, 选择 TCP 连接复用,不仅可以减少性能开销,同时也便于管理。

每个线程把持一个信道,所以信道复用了 Connection 的 TCP 连接。 同时 RabbitMQ 可以确保每个线程的私密性,就像拥有独立的连接一样。 当每个信道的流量不是很大时,复用单一的 Connection 可以在产生性能瓶颈的情况下有效地节省 TCP 连接资源。 但是当信道本身的流量很大时,这时候多个信道复用一个 Connection 就会产生性能瓶颈,进而使整体的流量被限制了。 此时就需要开辟多个 Connection,将这些信道均摊到这些 Connection 中。

信道在 AMQP 中是一个很重要的概念,大多数操作都是在信道这个层面展开的。

RabbitMQ 整体上是一个生产者与消费者模型,主要负责接收、存储和转发消息。 可以把消息传递的过程想象成:当你将一个包裹送到邮局,邮局会暂存并最终将邮件通过邮递员送到收件人的手上, RabbitMQ 就好比邮局、邮箱和邮递员组成的一个系统。 从计算机术语层面来说,RabbitMQ 模型更像是一种交换机模型。

rabbitmq_structure

生产者和消费者

  • Producer:生产者,就是投递消息的一方。

生产者创建消息,然后发布到 RabbitMQ 中。消息一般可以包含 2 个部分:消息体和标签(Label)。 消息体也可以称之为 payload,在实际应用中,消息体一般是一个带有业务逻辑结构的数据,比如一个 JSON 字符串。 当然可以进一步对这个消息体进行序列化操作。消息的标签用来表述这条消息,比如一个交换器的名称和一个路由键。 生产者把消息交由 RabbitMQ,RabbitMQ 之后会根据标签把消息发送给感兴趣的消费者(Consumer)。

  • Consumer:消费者,就是接收消息的一方。

消费者连接到 RabbitMQ 服务器,并订阅到队列上。当消费者消费一条消息时,只是消费消息的消息体(payload)。 在消息路由的过程中,消息的标签会丢弃,存入队列中的消息只有消息体,消费者也只会消费到消息体,也就不知道 消息的生产者是谁,当然消费者也不需要知道。

  • Broker:消息中间件的服务节点。

对于 RabbitMQ 来说,一个 RabbitMQ Broker 可以简单地看作一个 RabbitMQ 服务节点,或者 RabbitMQ 服务实例。 大多数情况下也可以将一个 RabbitMQ Broker 看作一台 RabbitMQ 服务器。

下图展示了生产者将消息存入 RabbitMQ Broker,以及消费者从 Broker 中消费数据的整个流程:

rabbitmq1

首先生产者将业务方数据进行可能的包装,之后封装成消息,发送(AMQP 协议里这个动作对应的命令为 Basic.Publish)到 Broker 中。 消费者订阅并接收消息(AMQP 协议里这个动作对应的命令为 Basic.Consume 或者 Basic.Get),经过可能的解包处理得到原始的数据, 之后再进行业务处理逻辑。这个业务处理逻辑并不一定需要和接收消息的逻辑使用同一个线程。 消费者进程可以使用一个线程去接收消息,存入到内存中,比如使用 Java 中的 BlockingQueue。 业务处理逻辑使用另一个线程从内存中读取数据,这样可以将应用进一步解耦,提高整个应用的处理效率。

队列

Queue:队列,是 RabbitMQ 的内部对象,用于存储消息。

RabbitMQ 中消息都只能存储在队列中,这一点和 Kafka 这种消息中间件相反。Kafka 将消息存储在 topic(主题)这个逻辑层面,而相对应的逻辑只是 topic 实际存储文件中的位移标识。RabbitMQ 的生产者生产消息并最终投递到队列中,消费者可以从队列中获取消息并消费。

多个消费者可以订阅同一个队列,这时队列中的消息会被平均分摊(Round-Robin,即轮询)给多个消费者进行处理,而不是 每个消费者都收到所有的消息并处理。

rabbitmq2

RabbitMQ 不支持队列层面的广播消费,如果需要广播消费,需要在其上进行二次开发,处理逻辑会变得异常复杂,同时也不建议这么做。

交换器、路由键、绑定

Exchange: 交换器。在上图中我们可以暂时理解成生产者将消息投递到队列中,实际上这个在 RabbitMQ 中不会发生。 真实情况是,生产者将消息发送到 Exchange(交换器,通常也可以用大写的 "X" 来表示),由交换器将消息路由到一个或者多个队列中。 如果路由不到,或许会返回给生产者,或许会直接丢弃。这里可以将 RabbitMQ 中的交换器看作一个简单的实体。

3

RabbitMQ 中的交换器有四种类型,不同的类型有着不同的路由策略。

RoutingKey:路由键。生产者将消息发送给交换器的时候,一般会指定一个 RoutingKey,用来指定这个消息的路由规则, 而这个 Routing Key 需要与交换器类型和绑定键(BindingKey)联合使用才能最终生效。

在交换器类型和绑定键(BindingKey)固定的情况下,生产者可以在发送消息给交换器时,通过指定 RoutingKey 来决定消息流向哪里。

Binding:绑定。RabbitMQ 中通过绑定将交换器与队列关联起来,在绑定的时候一般会指定一个绑定键(BindingKey), 这样 RabbitMQ 就知道如何正确地将消息路由到队列了。

4

生产者将消息发送给交换器时,需要一个 RoutingKey,当 BindingKey 和 RoutingKey 相匹配时,消息会被路由到对应地队列中。 在绑定多个队列到同一个交换器的时候,这些绑定允许使用相同的 BindingKey。BindingKey 并不是在所有的情况下都生效, 它依赖于交换器类型,比如 fanout 类型的交换器就会无视 BindingKey,而是将消息路由到所有绑定到该交换器的队列中。

交换器相当于投递包裹的邮箱,RoutingKey 相当于写在包裹上的地址,BindingKey 相当于包裹的目的地,当填写在包裹上的地址和 实际想要投递的地址相匹配时,那么这个包裹就会被正确地投递到目的地,最后这个目的地地 “主人” -- 队列可以保留这个包裹。 如果填写的地址出错,邮递员不能正确地投递到目的地,包裹可能会回退给寄件人,也有可能被丢弃。

在某些情形下,RoutingKey 和 BindingKey 可以看作同一个东西。

RoutingKey 和 BindingKey

1
2
3
4
5
channel.exchangeDeclare(EXCHANGE_NAME, "direct", true, false, null);
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);
String message = "Hello World!";
channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());

以上代码声明了一个 direct 类型的交换器,然后将交换器和队列绑定起来。注意这里使用的字样是 "ROUTING_KEY",在本该使用 BindingKey 的 channel.queueBind 方法中却和 channel.basicPublish 方法同样使用了 RoutingKey,这样做的潜台词是: 这里的 RoutingKey 和 BindingKey 是同一个东西。在 direct 交换器类型下,RoutingKey 和 BindingKey 需要完全匹配才能使用, 所以上面的代码中采用了这种写法会显得更方便。

但是在 topic 交换器类型下,RoutingKey 和 BindingKey 之间需要做模糊匹配,两者并不是相同的。

BindingKey 其实也属于路由键中的一种,官方解释为:the routing key to use for the binding。 可以翻译为:在绑定的时候使用的路由键。大多数时候,包括官方文档和 RabbitMQ Java API 中都把 BindingKey 和 RoutingKey 看做 RoutingKey,为了避免混淆,我们可以这么理解:

  • 在使用绑定的时候,其中需要的路由键是 BindingKey。涉及的客户端方法如: channel.exchangeBind、channel.QueueBind,对应的 AMQP 命令为 Exchange.Bind、Queue.Bind

  • 在发送消息的时候,其中需要的路由键是 RoutingKey。涉及的客户端方法如 channel.basicPublish,对应的 AMQP 命令为 Basic.Publish。

由于某些历史的原因,大多数情况下习惯性地将 BindingKey 写成 RoutingKey,尤其是在使用 direct 类型地交换器地时候。

交换器类型

RabbitMQ 常用的交换器类型有 fanout、direct、topic、headers 这四种。

fanout

它会把所有发送到该交换器的消息路由到所有与该交换器绑定的队列中。

direct

direct 类型的交换器路由规则也很简单,它会把消息路由到那些 BindingKey 和 RoutingKey 完全匹配的队列中。

以下图为例,交换器的类型为 direct,如果我们发送一条消息,并在发送消息的时候设置路由键为 "warning", 则消息会路由到 Queue1 和 Queue2,对应的示例代码如下:

1
2
3
channel.basicPublish(EXCHANGE_NAME, "warning",
MessageProperties.PERSISTENT_TEXT_PLAIN,
message.getBytes())
5

如果在发送消息的时候设置路由键为 "info" 或者 "debug",消息只会路由到 Queue2, 如果以其他的路由键发送消息,则消息不会路由到这两个队列中。

topic

前面讲到的 direct 类型的交换器路由规则是完全匹配 BindingKey 和 RoutingKey,但是这种严格的匹配模式在很多情况下不能完全满足实际业务的需求。 topic 类型的交换器在匹配规则上进行了扩展,它与 direct 类型的交换器相似,也是将消息路由到 BindingKey 和 RoutingKey 相匹配的队列中, 但这里的匹配规则有些不同,它约定:

  • RoutingKey 为一个点号 "." 分隔的字符串(被点号 "." 分隔开的每一段独立的字符串称为一个单词), 如 "com.rabbitmq.client"、"java.util.concurrent"、"com.hidden.client"。

  • BindingKey 和 RoutingKey 一样也是点号 "." 分隔的字符串。

  • BindingKey 中可以存在两种特殊字符串 "" 和 "#",用于做模糊匹配,其中 "" 用于匹配一个单词,"#" 用于匹配零个或多个单词。

以下图为例子:

  • 路由键为 "com.rabbitmq.client" 的消息会同时路由到 Queue1 和 Queue2

  • 路由键为 "com.hidden.client" 的消息只会路由到 Queue2 中

  • 路由键为 "com.hidden.demo" 的消息只会路由到 Queue2 中

  • 路由键为 "java.rabbitmq.demo" 的消息只会路由到 Queue1 中

  • 路由键为 "java.util.concurrent" 的消息将会被丢弃或者返回给生产者(需要设置 mandatory 参数),因为它没有匹配任何路由键。

6

headers

headers 类型的交换器不依赖于路由键的匹配规则来路由消息,而是根据发送的消息内容中的 headers 属性进行匹配。 在绑定队列和交换器时指定一组键值对,当发送消息到交换器时,RabbitMQ 会获取到该消息的 headers(也是一个键值对的形式), 对比其中的键值对是否完全匹配队列和交换器绑定时的键值对,如果完全匹配则消息会路由到该队列, 否则不会路由到该队列。headers 类型的交换器性能会很差,而且也不实用,基本上不会看到它的存在。

什么是消息中间件

消息(Message)是指在应用间传送的数据。消息可以非常简单,比如只包含文本字符串、JSON 等,也可以很复杂,比如内嵌对象。

消息队列中间件(Message Queue Middleware,简称为 MQ)是指利用高效可靠的消息传递机制进行与平台无关的数据交流, 并基于数据通信来进行分布式系统集成。通过提供消息传递和消息排队模型,它可以在分布式环境下扩展进程间的通信。

消息队列中间件,也可以成为消息队列或者消息中间件。它一般有两种传递模式:点对点模式和发布/订阅模式。 点对点模式是基于队列的,消息生产者发送消息到队列,消息消费者从队列中接收消息,队列的存在使得消息的异步传输成为可能。 发布订阅模式定义了如何向一个内容节点发布和订阅消息,这个内容节点称为主题(topic),主题可以认为是消息传递的中介, 消息发布者将消息发布到某个主题,而消息订阅者则从主题中订阅消息。主题使得消息的订阅者与消息的发布者互相保持独立, 不需要进行接触即可保证消息的传递,发布/订阅模式在消息的一对多广播时采用。

面向消息的中间件(简称为 MOM,Message Oriented Middleware)提供了以松散耦合的灵活方式集成应用程序的一种机制。 它们提供了基于存储和转发的应用程序之间的异步数据传送,即应用程序彼此不直接通信,而是与作为中介的消息中间件通信。 消息中间件提供了有保证的消息发送,应用程序开发人员无需了解远程过程调用(RPC)和网络通信协议的细节。

消息中间件适用于需要可靠的数据传送的分布式环境。采用消息中间件的系统中,不同的对象之间通过传递消息来激活对方的事件,以完成相应的操作。 发送者将消息发送给消息服务器,消息服务器将消息放在若干队列中,在合适的时候再将消息转发给接收者。 消息中间件能在不同平台之间通信,它常被用来屏蔽各种平台及协议之间的特性,实现应用程序之间的协同, 其优点在于能够在客户和服务器之间提供同步和异步的连接,并且在任何时刻都可以将消息进行传送或者存储转发, 这也是它远比远程过程调用更进步的原因。

消息中间件的作用

消息中间件凭借其独到的特性,在不同的应用场景下可以展现不同的作用。总的来说,消息中间件的作用可以概括如下:

  • 解耦:在项目启动之初来预测将来会碰到什么需求是及其困难的。消息中间件在处理过程中间插入了一个隐含的、基于数据的接口层, 两边的处理过程都要实现这一接口,这允许你独立地扩展或修改两边的处理过程,只要确保它们遵守同样的接口约束即可。

  • 冗余(存储):有些情况下,处理数据的过程会失败。消息中间件可以把数据进行持久化直到它们已经被完全处理, 通过这一方式规避了数据丢失风险。在把一个消息从消息中间件删除之前,需要你的处理系统明确地指出该消息已经被处理完成, 从而确保你的数据被安全地保存直到你使用完毕。

  • 扩展性:因为消息中间件解耦了应用地处理过程,所以提高消息入队和处理地效率是很容易的, 只要另外增加处理过程即可,不需要改变代码,也不需要调节参数。

  • 削峰:在访问量剧增的情况下,应用仍然需要继续发挥作用,但是这样的突发流量并不常见。如果以能处理这类峰值为标准而投入资源, 无疑是巨大的浪费。使用消息中间件能够使关键组件支撑突发访问压力,不会因为突发的超负荷请求而完全崩溃。

  • 可恢复性:当系统一部分组件失效时,不会影响整个系统。消息中间件降低了进程间的耦合度,所以即使一个处理消息的进程挂掉, 加入消息中间件中的消息仍然可以在系统恢复后进行处理。

  • 顺序保证:在大多数使用场景下,数据处理的顺序是很重要的,大部分消息中间件支持一定程度上的顺序性。

  • 缓冲:在任何重要的系统中,都会存在需要不同处理时间的元素。消息中间件通过一个缓冲层来帮助任务最高效率地执行,写入消息中间件 地处理会尽可能快速。该缓冲层有助于控制和优化数据流经过系统的速度。

  • 异步通信:在很多时候应用不想也不需要立即处理消息。消息中间件提供了异步处理机制,允许应用把一些消息放入消息中间件中, 但并不立即处理它,在之后需要的时候再慢慢处理。

RabbitMQ 的特点

  • 可靠性:RabbitMQ 使用一些机制来保证可靠性,如持久化、传输确认及发布确认等。

  • 灵活的路由:在消息进入队列之前,通过交换器路由消息。对于典型的路由功能,RabbitMQ 已经提供了一些内置的交换器来实现。 针对更复杂的路由功能,可以将多个交换器绑定到一起,也可以通过插件机制来实现自己的交换器。

  • 扩展性:多个 RabbitMQ 节点可以组成一个集群,也可以根据实际业务情况动态地扩展集群中节点。

  • 高可用性:队列可以在集群地机器上设置镜像,也可以根据实际业务情况动态地扩展集群中节点。

  • 多种协议:RabbitMQ 除了原生支持 AMQP 协议,还支持 STOMP、MQTT 等多种消息中间件协议。

  • 多种语言客户端:RabbitMQ 几乎支持所有常用语言,比如 Java、Python、Ruby、PHP、C#、JavaScript 等。

  • 管理界面:RabbitMQ 提供了一个易用的用户界面,使得用户可以监控和管理消息、集群中的节点等。

  • 插件机制:RabbitMQ 提供了许多插件,以实现从多方面进行扩展,当然也可以编写自己的插件。

参考链接 : https://medium.com/@hariomvashisth/cors-on-nginx-be38dd0e19df

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
server {
listen 80;
server_name api.test.com;


location / {

# Simple requests
if ($request_method ~* "(GET|POST)") {
add_header "Access-Control-Allow-Origin" *;
}

# Preflighted requests
if ($request_method = OPTIONS ) {
add_header "Access-Control-Allow-Origin" *;
add_header "Access-Control-Allow-Methods" "GET, POST, OPTIONS, HEAD";
add_header "Access-Control-Allow-Headers" "Authorization, Origin, X-Requested-With, Content-Type, Accept";
return 200;
}

....
# Handle request
....
}
}

方法: 使用 docker [container] exec,container 可以省略,可以直接使用 docker exec

在使用 docker 容器的时候,我们总会想看看容器内部长什么样子: 我们可以使用 docker exec 命令。

现在,我们想通过 nginx -s reload 来重启 nginx 容器,可以使用下面的命令:

script
1
docker exec nginx nginx -s reload

上面第一个 nginx 是容器名称,nginx -s reload 是进入到容器后运行的命令。

如果我们想以交互的方式进入容器(类似 docker run):

script
1
docker exec -it centos bash