0%

面向对象各概念:

  • 类:用来描述具有相同的属性和方法的对象的集合。它定义了该集合中每个对象所共有的属性和方法。对象是类的实例。
  • 方法:类中定义的函数。
  • 类常量:类常量在整个实例化的对象中是公用的。类常量定义在类中且在函数体之外。类常量通常不作为实例变量使用。
  • 数据成员:类常量或者实例变量用于处理类及其实例对象的相关的数据。
  • 方法重写:如果从父类继承的方法不能满足子类的需求,可以对其进行改写,这个过程叫方法的覆盖(override),也称为方法的重写。
  • 局部变量:在类的声明中,属性是用变量来表示的,这种变量就称为实例变量,实例变量就是一个用 self 修饰的变量。
  • 继承:即一个派生类(derived class)继承基类(base class)的字段和方法。继承也允许把一个派生类的对象作为一个基类对象对待。 例如,有这样一个设计:一个 Dog 类型的对象派生自 Animal 类,这里模拟 "是一个(is-a)" 关系(例图,Dog 是一个 Animal)
  • 实例化:创建一个类的实例,类的具体对象。
  • 对象:通过类定义的数据结构实例。对象包括两个数据成员(类变量和实例变量)和方法。

类的继承机制允许多个基类,派生类可以覆盖基类中的任何方法,方法中可以调用基类中的同名方法。 对象可以包含任意数量和类型的数据。

类定义

1
2
3
4
5
6
class ClassName:
<statement-1>
.
.
.
<statement-N>

类实例化后,可以使用其属性,实际上,创建一个类之后,可以通过类名访问其属性。

类对象

类对象支持两种操作:属性引用和实例化。(这里的类对象是指类本身,而不是类实例化的对象)

属性引用使用和 Python 中所有的属性引用一样的标准语法:obj.name

类对象创建后,类命名空间中所有的命名都是有效属性名。所以如果类定义是这样的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
#!/usr/bin/python3

class MyClass:
"""一个简单的类实例"""
i = 12345
def f(self):
return 'hello world'

# 实例化类
x = MyClass()

# 访问类的属性和方法
print("MyClass 类的属性 i 为:", x.i)
print("MyClass 类的方法 f 输出为:", x.f())

以上创建了一个新的类实例并将该对象赋给局部变量 x,x 为空的对象。

执行以上程序输出结果为:

1
2
MyClass 类的属性 i 为: 12345
MyClass 类的方法 f 输出为: hello world

类有一个 __init__() 方法,类的实例化操作会自动调用 __init__() 方法。如下实例化类 MyClass,对应的 __init__() 方法就会被调用:

1
x = MyClass()

当然,__init__() 方法可以有参数,参数通过 __init__() 传递到类的实例化操作上。例如:

1
2
3
4
5
6
7
8
9
#!/usr/bin/python3

class Complex:
def __init__(self, realpart, imagpart):
self.r = realpart
self.i = imagpart

x = Complex(3.0, -4.5)
print(x.r, x.i) # 输出结果:3.0 -4.5

self 代表类的实例,而非类

类的方法与普通的函数只有一个特别的区别 -- 它们必须有一个额外的第一个参数名称,按照惯例它的名称是 self

1
2
3
4
5
6
7
class Test:
def prt(self):
print(self)
pring(self.__class__)

t = Test()
t.prt()

以上实例执行结果为:

1
2
<__main__.Test object at 0x1053bbb00>
<class '__main__.Test'>

从执行结果可以很明显的看出,self 代表的是类的实例,代表当前对象的地址,而 self.class 则指向类。 self 不是 python 关键字,我们把他换成其他变量名也可以:

1
2
3
4
5
6
7
class Test:
def prt(runoob):
print(runoob)
print(runoob.__class__)

t = Test()
t.prt()

以上实例执行结果为:

1
2
<__main__.Test object at 0x100a32588>
<class '__main__.Test'>

类的方法

在类的内部,使用 def 关键字来定义一个方法,与一般函数定义不同,类方法必须包含参数 self,且为第一个参数,self 代表的是类的实例。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
#!/usr/bin/python

# 类定义
class People:
# 定义类基本属性
name = ''
age = 0
# 定义私有属性,私有属性在外部无法直接进行访问
__weight = 0
# 定义构造方法
def __init__(self, n, a, w):
self.name = n
self.age = a
self.__weight = w
def speak(self):
print("{} 说: 我 {} 岁。".format(self.name, self.age))

# 实例化类
p = People('runoob', 10, 30)
p.speak()

执行以上程序输出结果为:

1
runoob 说: 我 10 岁。

继承

1
2
class DerivedClassName(BaseClassName):
pass

需要注意圆括号中基类的顺序,若是基类中有相同的方法名,而在子类使用时未指定,python 从左至右搜索,即方法在子类中未找到时,从左到右查找基类中是否包含方法。

BaseClassName(基类名) 必须与派生类定义在同一个作用域内。除了类,还可以用表达式,基类定义在另一个模块中时这一点非常有用:

1
class DerivedClassName(modname.BaseClassName):
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
#!/usr/bin/python3

# 类定义
class People:
# 定义基本属性
name = ''
age = 0
# 定义私有属性,私有属性在类外部无法直接进行访问
__weight = 0
# 定义构造方法
def __init__(self, n, a, w):
self.name = n
self.age = a
self.__weight = w
def speak(self):
print("{} 说:我 {} 岁。".format(self.name, self.age))

# 单继承示例
class Student(People):
grade = ''
def __init__(self, n, a, w, g):
# 调用父类的构造函数
People.__init__(self, n, a, w)
self.grade = g
# 覆写父类的方法
def speak(self):
print("{} 说:我 {} 岁了,我在读 {} 年级。".format(self.name, self.age, self.grade))

s = Student('ken', 10, 60, 3)
s.speak()

执行以上程序输出结果为:

1
ken 说:我 10 岁了,我在读 3 年级。

多继承

Python 同样支持多继承。多继承的类定义形式如下例:

1
2
class DrivedClassName(Base1, Base2, Base3):
pass

需要注意圆括号中父类的顺序,若是父类中有相同的方法名,而在子类使用时未指定,python 从左至右搜索,即方法在子类中未找到时,从左到右查找父类中是否包含方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
#!/usr/bin/python3

# 类定义
class people:
# 定义基本属性
name = ''
age = 0
# 定义私有属性,私有属性在类外部无法直接进行访问
__weight = 0
# 定义构造方法
def __init__(self, n, a, w):
self.name = n
self.age = a
self.__weight = w
def speak(self):
print("{} 说:我 {} 岁。".format(self.name, self.age))

# 单继承示例
class student(people):
grade = ''
def __init__(self, n, a, w, g):
# 调用父类的构造函数
people.__init__(self, n, a, w)
self.grade = g
# 覆写父类的方法
def speak(self):
print("{} 说: 我 %d 岁了,我在读 {} 年级。".format(self.name, self.age, self.grade))

# 另一个类,多重继承之前的准备
class speaker():
topic = ''
name = ''
def __init__(self, n, t)
self.name = n
self.topic = t
def speak(self):
print("我叫 {},我是一个演说家,我演讲的主题是 {}".format(self.name, self.topic))

# 多重继承
class sample(speaker, student):
a = ''
def __init__(self, n, a, w, g, t):
student.__init__(self, n, a, w, g)
speaker.__init__(self, n, t)

test = sample('Tim', 25, 80, 4, "Python")
test.speak() # 方法同名,默认调用的是在括号中排前的父类的方法

执行以上程序输出结果为:

1
我叫 Tim,我是一个演说家,我演讲的主题是 Python

方法重写

如果你的父类方法的功能不能满足你的需求,你可以在子类重写你父类的方法。如:

1
2
3
4
5
6
7
8
9
10
11
class Parent:
def my_method(self):
print('调用父类方法')

class Child(Parent):
def my_method(self):
print('调用子类方法')

c = Child() # 子类实例
c.my_method() # 子类调用重写方法
super(Child, c).my_method() # 用子类对象调用父类已被覆盖的方法

super 函数是用于调用父类的一个方法。

执行以上程序输出结果为:

1
2
调用子类方法
调用父类方法

类属性与方法

类的私有属性

__private_attrs: 两个下划线开头,声明该属性为私有,不能在类的外部被使用或者直接访问。在类内部的方法中使用时 self.__private_attrs。

类的方法

在类的内部,使用 def 关键字来定义一个方法,与一般函数定义不同,类方法必须包含参数 self,且为第一个参数,self 代表的是类的实例。 self 的名字并不是规定死的,也可以使用 this,但是最好还是按照约定使用 self。

类的私有方法:

__private_method: 两个下划线开头,声明该方法为私有方法,只能在类的内部调用,不能在类的外部调用。self.__private_method。

实例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
class JustCounter:
__secret_count = 0 # 私有变量
public_count = 0 # 公开变量

def count(self):
self.__secret_count += 1
self.public_count += 1
print(self.__secret_count)

counter = JustCounter()
counter.count()
counter.count()
print(counter.public_count)
print(counter.__secret_count) # 报错

以上实例输出结果为:

1
2
3
4
5
6
7
1
Traceback (most recent call last):
2
File "/Users/ruby/Code/DevAdmin/python-deployer/tests/oop/__init__.py", line 17, in <module>
print(counter.__secret_count) # 报错
AttributeError: 'JustCounter' object has no attribute '__secret_count'
2

类的私有方法实例如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
class Site:
def __init__(self, name, url):
self.name = name # 公开变量
self.__url = url # 私有变量

def who(self):
print('name:', self.name)
print('url:', self.__url)

def __foo(self): # 私有方法
print('这是私有方法')

def foo(self): # 公开方法
print('这是公共方法')
self.__foo()

x = Site('百度', 'https://www.baidu.com')
x.who() # 正常输出
x.foo() # 正常输出
x.__foo() # 报错

以上实例执行结果:

1
2
3
4
5
6
7
8
Traceback (most recent call last):
File "/Users/ruby/Code/DevAdmin/python-deployer/tests/oop/__init__.py", line 22, in <module>
x.__foo() # 报错
AttributeError: 'Site' object has no attribute '__foo'
name: 百度
url: https://www.baidu.com
这是公共方法
这是私有方法

类的专有方法

  • init: 构造函数,在生成对象时调用

  • del: 析构函数,释放对象时使用

  • repr: 打印,转换

  • setitem: 按照索引赋值

  • getitem: 按照索引获取值

  • len: 获得长度

  • cmp: 比较运算

  • call: 函数调用

  • add: 加运算

  • sub: 减运算

  • mul: 乘运算

  • truediv: 除运算

  • mod: 求余运算

  • pow: 乘方

运算符重载

Python 同样支持运算符重载

1
2
3
4
5
6
7
8
9
10
11
12
13
14
class Vector:
def __init__(self, a, b):
self.a = a
self.b = b

def __str__(self):
return 'Vector ({}, {})'.format(self.a, self.b)

def __add__(self, other):
return Vector(self.a + other.a, self.b + other.b)

v1 = Vector(2, 10)
v2 = Vector(5, -2)
print(v1 + v2)

以上代码执行结果如下:

1
Vector (7, 8)

消息何去何从

mandatoryimmediatechannel.basicPublish 方法中的两个参数,它们都有当消息传递过程中不可达目的地时将消息返回给生产者的功能。 RabbitMQ 提供的备份交换器(Alternate Exchange)可以将未能被交换器路由的消息(没有绑定队列或者没有匹配的绑定)存储起来,而不用返回给客户端。

mandatory 参数

mandatory 参数设为 true 时,交换器无法根据自身的类型和路由键找到一个符合条件的队列,那么 RabbitMQ 会调用 Basic.Return 命令将消息 返回给生产者。当 mandatory 参数设置为 false 时,出现上述情形,则消息直接被丢弃。

那么生产者如何获取到没有被正确路由到合适队列的消息呢?这时候可以通过调用 channel.addReturnListener 来添加 ReturnListener 监听器实现。

使用 mandatory 参数的关键代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
channel.basicPublish(EXCHANGE_NAME, "", true,
MessageProperties.PERSISTENT_TEXT_PLAIN,
"mandatory test".getBytes());
channel.addReturnListener(new ReturnListener() {
public void handleReturn(int replyCode, String, replyText,
String exchange, String routingKey,
AMQP.BasicProperties basicProperties,
byte[] body) throws IOException {
String message = new String(body);
System.out.println("Basic.Return 返回的结果是:" + message);
}
});

测试的时候并没有如期输出。可能姿势不对

上面代码中生产者没有成功地将消息路由到队列,此时 RabbitMQ 会通过 Basic.Return 返回 "mandatory test" 这条消息, 之后生产者客户端通过 ReturnListener 监听到了这个事件,上面代码的最后输出应该是 "Basic.Return 返回的结果是: mandatory test"。

从 AMQP 协议层面来说,其对应的流转过程如下图所示:

12

immediate 参数

immediate 参数设为 true 时,如果交换器在将消息路由到队列时发现队列上并不存在任何消费者,那么这条消息将不会存入队列中。 当与路由键匹配的所有队列都没有消费者时,该消息会通过 Basic.Return 返回至生产者。

概括来说,mandatory 参数告诉服务器至少将该消息路由到一个队列中,否则将消息返回给生产者。 immediate 参数告诉服务器,如果该消息关联的队列上有消费者,则立刻投递; 如果所有匹配的队列上都没有消费者,则直接将消息返还给生产者,不用将消息存入队列等待消费者了。

RabbitMQ 3.0 版本开始去掉了对 immediate 参数的支持,对此 RabbitMQ 官方解释是: immediate 参数会影响镜像队列的性能,增加代码复杂性,建议采用 TTL 和 DLX 的方法替代。

备份交换器

备份交换器,英文名称为 Alternate Exchange,简称 AE,或者更直白地称之为 "备胎交换器"。 生产者在发送消息的时候如果不设置 mandatory 参数,那么消息在未被路由的情况下将会丢失; 如果设置了 mandatory 参数,那么需要添加 ReturnListener 的编程逻辑,生产者的代码将变得复杂。 如果既不想复杂化生产者的编程逻辑,又不想消息丢失,那么可以使用备份交换器,这样可以将未被路由的消息存储在 RabbitMQ 中,再在需要的时候去处理这些消息。

可以通过在声明交换器(调用 channel.exchangeDeclare 方法)的时候添加 alternate-exchange 参数来实现,也可以通过策略的方式实现。 如果两者同时使用,则前者的优先级更高,会覆盖掉 Policy 的设置。

使用备份交换器的关键代码如下:

1
2
3
4
5
6
7
8
Map<String, Object> args = new HashMap<String, Object>();
args.put("alternate-exchange", "myAe");
channel.exchangeDeclare("normalExchange", "direct", true, false, args);
channel.exchangeDeclare("myAe", "fanout", true, false, null);
channel.queueDeclare("normalQueue", true, false, false, null);
channel.queueBind("normalQueue", "normalExchange", "normalKey");
channel.queueDeclare("unroutedQueue", true, false, false, null);
channel.queueBind("unroutedQueue", "myAe", "");

上面的代码中声明了两个交换器 normalExchangemyAe,分别绑定了 normalQueueunroutedQueue 这两个队列, 同时将 myAe 设置为 normalExchange 的备份交换器。注意 myAe 的交换器类型为 fanout

参考下图,如果此时发送一条消息到 normalExchange 上,当路由键等于 normalKey 的时候,消息能正确路由到 normalQueue 这个队列中。 如果路由键设置为其他值,此时就会发送给 myAe,进而发送到 unroutedQueue 这个队列。

13

备份交换器其实和普通的交换器没有太大的区别,为了方便使用,建议设置为 fanout 类型,如若读者想设置为 direct 或者 topic 的类型也没有什么不妥。 需要注意的是,消息被重新发送到备份交换器时的路由键和从生产者发出的路由键是一样的。

考虑这样一种情况,如果备份交换器的类型是 direct,并且有一个与其绑定的队列,假设绑定的路由键是 key1,当某条携带路由键为 key2 的消息被 转发到这个备份交换器的时候,备份交换器没有匹配到合适的队列,则消息丢失。如果消息携带的路由键为 key1,则可以存储到队列中。

对于备份交换器,总结以下几种特殊情况:

  • 如果设置的备份交换器不存在,客户端和 RabbitMQ 服务端都不会有异常出现,此时消息丢失。

  • 如果备份交换器没有绑定任何队列,客户端和 RabbitMQ 服务端都不会有异常出现,此时消息丢失。

  • 如果备份交换器没有任何匹配的队列,客户端和 RabbitMQ 服务端都不会有异常出现,此时消息丢失。

  • 如果备份交换器和 mandatory 参数一起使用,那么 mandatory 参数无效。

过期时间(TTL)

TTL,Time to Live 的简称,即过期时间。RabbitMQ 可以对消息和队列设置 TTL。

设置消息的 TTL

目前有两种方法可以设置 TTL。第一种方法是通过队列属性设置,队列中所有消息都有相同的过期时间。第二种方法是对消息本身进行单独设置,每条消息的 TTL 可以不同。如果两种方法一起使用,则消息的 TTL 以两者之间较小的那个数值为准。消息在队列中的生存时间一旦超过设置的 TTL 值时,就会变成 "死信"(Dead Message),消费者将无法再收到该消息(不是绝对的)。

通过队列属性设置 TTL 的方法是在 channel.queueDeclare 方法中加入 x-message-ttl 参数实现的,这个参数的单位是毫秒。

示例代码:

1
2
3
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-message-ttl", 6000);
channel.queueDeclare(queueName, durable, exclusive, autoDelete, args);

如果不设置 TTL,则表示此消息不会过期;如果将 TTL 设置为 0,则表示除非此时可以直接将消息投递到消费者,否则该消息会被立即丢弃, 这个特性可以部分代替 RabbitMQ 3.0 版本之前的 immediate 参数,之所以部分代替,是因为 immediate 参数在投递失败时会用 Basic.Return 将消息返回(这个功能可以用死信队列来实现)。

针对每条消息设置 TTL 的方法是在 channel.basicPublish 方法中加入 expiration 的属性参数,单位为毫秒。

关键代码如下:

1
2
3
4
5
AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
builder.deliveryMode(2); // 持久化消息
builder.expiration("60000"; // 设置 TTL=60000ms
AMQP.BasicProperties properties = builder.build();
channel.basicPublish(exchangeName, routingKey, mandatory, properties, "ttlTestMessage".getBytes());

也可以如下:

1
2
3
4
AMQP.BasicProperties properties = new AMQP.BasicProperties();
properties.setDeliveryMode(2);
properties.setExpiration("60000");
channel.basicPublish(exchangeName, routingKey, mandatory, properties, "ttlTestMessage".getBytes());

对于第一种设置队列 TTL 属性的方法,一旦消息过期,就会从队列中抹去,而在第二种方法中,即使消息过期,也不会马上从队列中抹去, 因为每条消息是否过期是在即将投递到消费者之前判定的。

为什么这两种方法处理的不一样?因为第一种方法里,队列中已过期的消息肯定在队列头部,RabbitMQ 只要定期从队头开始扫描是否有过期的消息即可。 而第二种方法里,每条消息的过期时间不同,如果要删除所有过期消息势必要扫描整个队列,所以不如等到此消息即将被消费时再判定是否过期,如果过期再进行删除即可。

设置队列的 TTL

通过 channel.queueDeclare 方法中的 x-expires 参数可以控制队列被自动删除前处于未使用状态的时间。 未使用的意思是队列上没有任何的消费者,队列也没有被重新声明,并且在过期时间段内也未调用过 Basic.Get 命令。

设置队列里的 TTL 可以应用于类似 RPC 方式的回复队列,在 RPC 中,许多队列会被创建出来,但是却是未被使用的。

RabbitMQ 会确保在过期时间到达后将队列删除,但是不保障删除的动作有多及时。在 RabbitMQ 重启后,持久化的队列的过期时间会被重新计算。

用于表示过期时间的 x-expires 参数以毫秒为单位,并且服从和 x-message-ttl 一样的约束条件,不过不能设置为 0。 比如该参数设置为 1000,则表示该队列如果在 1 秒钟之内未使用则会被删除。

1
2
3
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-expires", 1800000);
channel.queueDeclare("myqueue", false, false, false, args);

死信队列

DLX,全称为 Dead-Letter-Exchange,可以称之为死信交换器,也有人称之为死信邮箱。 当消息在一个队列中变成死信(Dead Message),它能被重新发送到另一个交换器中,这个交换器就是 DLX,绑定 DLX 的队列就称之为死信队列。

消息变成死信一般是由于以下几种情况:

  • 消息被拒绝(Basic.Reject/Basic.Nack),并且设置 requeue 参数为 false

  • 消息过期

  • 队列达到最大长度

DLX 也是一个正常的交换器,和一般的交换器没有区别,它能在任何的队列上被指定,实际上就是设置某个队列的属性。 当这个队列中存在死信时,RabbitMQ 就会自动地将这个消息重新发布到设置的 DLX 上去,进而被路由到另一个队列,即死信队列。 可以监听这个队列中的消息进行相应的处理,这个特性与将消息的 TTL 设置为 0 配合使用可以弥补 immediate 的功能。

通过在 channel.queueDeclare 方法中设置 x-dead-letter-exchange 参数来为这个队列添加 DLX。

1
2
3
4
5
channel.exchangeDeclare("dlx_exchange", "direct"); // 创建 DLX: dlx_exchange
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-dead-letter-exchange", "dlx_exchange");
// 为队列 myqueue 添加 DLX
channel.queueDeclare("myqueue", false, false, false, args);

也可以为这个 DLX 指定路由键,如果没有指定,则使用原队列的路由键:

1
args.put("x-dead-letter-routing-key", "dlx-routing-key");

下面创建一个队列,为其设置 TTL 和 DLX 等:

1
2
3
4
5
6
7
8
9
10
11
12
channel.exchangeDeclare("exchange.dlx", "direct", true);  // 定义名称为 "exchange.dlx",类型为 "direct",持久化的交换器
channel.exchangeDeclare("exchange.normal", "fanout", true); // 定义名称为 "exchange.narmal",类型为 "fanout",持久化的交换器
Map<String, Obejct> args = new HashMap<String, Object>();
args.put("x-message-ttl", 10000);
args.put("x-dead-letter-exchange", "exchange.dlx"); // 绑定的死信交换器为 "exchange.dlx"
args.put("x-dead-letter-routing-key", "routingKey"); // 定义死信队列的路由键
channel.queueDeclare("queue.normal", true, false, false, args); // 定义名称为 "queue.normal",持久化的,非排他的,非自动删除的队列。绑定超时时间为 10000ms,绑定死信队列为 "exchange.dlx",绑定死信队列路由键为 "routingKey"
channel.queueBind("queue.normal", "exchange.normal", ""); // 将队列 "queue.normal" 绑定到交换器 "exchange.normal"
channel.queueDeclare("queue.dlx", true, false, false, null); // 定义名称为 "queue.dlx",持久化、非排他、非自动删除的队列
channel.queueBind("queue.dlx", "exchange.dlx", "routingKey"); // 将队列 "queue.dlx" 绑定到交换器 "exchange.dlx",路由键为 "routingKey"
channel.basicPublish("exchange.normal", "rk",
MessageProperties.PERSISTENT_TEXT_PLAIN, "dlx",getBytes()); // 将消息 "dlx" (路由键为 rk)发送到交换器 "exchange.normal" 中

这里创建了两个交换器 exhange.normalexchange.dlx,分别绑定两个队列 queue.normalqueue.dlx

由 Web 管理界面可以看出,两个队列都被标记了 "D",这个是 durable 的缩写。即设置了队列持久化。 queue.normal 这个队列还设置了 TTL、DLX 和 DLK,其中 DLX 指的是 x-dead-letter-routing-key 这个属性。

14

参考下图,生产者首先发送一条携带路由键为 "rk" 的消息,然后经过交换器 exchange.normal 顺利地存储到队列 queue.normal 中。 由于队列 queue.normal 设置了过期时间为 10s,在这 10s 内没有消费者消费这条消息,那么判定这条消息为过期。 由于设置了 DLX,过期之时,消息被丢给交换器 exchange.dlx 中,这时找到与 exchange.dlx 匹配的队列 queue.dlx,最后消息被存储在 queue.dlx 这个死信队列中。

15

对于 RabbitMQ 来说,DLX 是一个非常有用的特性。它可以处理异常情况下,消息不能够被消费者正确消费(消费者调用了 Basic.Nack 或者 Basic.Reject)而被 置入死信队列中的情况,后续分析程序可以通过消费这个死信队列中的内容来分析当时所遇到的异常情况,进而可以改善和优化系统。 DLX 配合 TTL 使用还可以实现延迟队列的功能。

延迟队列

延迟队列存储的对象是对应的延迟消息,所谓 "延迟消息" 是指当消息被发送以后,并不想让消费者立刻拿到消息,而是等待特定时间后,消费者才能拿到这个消息消费。

延迟队列的使用场景有很多,比如:

  • 在订单系统中,一个用户下单之后通常有 30 分钟的时间进行支付,如果 30 分钟之内没有支付成功,那么这个订单将进行异常处理,这时就可以使用延迟队列来处理这些订单了。

  • 用户希望通过手机远程遥控家里的智能设备在指定的时间进行工作。这时候就可以将用户指令发送到延迟队列,当指令设定的时间到了就再将指令推送到智能设备。

在 AMQP 协议中,或者 RabbitMQ 本身没有直接支持延迟队列的功能,当时可以通过前面所介绍的 DLX 和 TTL 模拟出延迟队列的功能。

在下图中,不仅展示的是死信队列的用法,也是延迟队列的用法,对于 queue.dlx 这个死信队列来说,同样可以看作延迟队列。 假设一个应用中需要将每条消息都设置为 10 秒延迟,生产者通过 exchange.normal 这个交换器将发送的消息存储在 queue.normal 这个队列中。 消费者订阅的并非是 queue.normal 这个队列,而是 queue.dlx 这个队列。 当消息从 queue.normal 这个队列中过期之后被存入 queue.dlx 这个队列中,消费者就恰巧消费到了延迟 10 秒的这条消息。

在真实应用中,对于延迟队列可以根据延迟时间的长短分为多个等级,一般分为 5秒、10秒、30秒、1分钟、5分钟等。

参考下图,根据应用需求的不同,生产者在发送消息的时候通过设置不同的路由键,以此将消息发送到与交换器绑定的不同的队列中。 这里队列分别设置了过期时间为 5秒、10秒、30秒、1分钟,同时也分别配置了 DLX 和相应的死信队列。 当相应的消息过期时,就会转存到相应的死信队列(即延迟队列)中,这样消费者根据业务自身的情况,分别选择不同延迟等级的延迟队列进行消费。

16

优先级队列

优先级队列,顾名思义,具有高优先级的队列具有更高的优先权,优先级高的消息具备优先被消费的特权。

可以通过设置队列的 x-max-priiority 参数来实现。

1
2
3
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-max-priotity", 10);
channel.queueDeclare("queue.priotity", true, false, false, args);

上面的代码演示的是如何配置一个队列的最大优先级。在此之后,需要在发送时在消息中设置消息当前的优先级。如下:

1
2
3
4
AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
builder.priority(5);
AMQP.BasicProperties properties = builder.build();
channel.basicPublish("exchange_priority", "rk_priority", properties, "messages".getBytes());

上面的代码中设置消息的优先级为 5。默认最低为 0,最高为队列设置的最大优先级。 优先级高的消息可以被优先消费,这个也是有前提的:如果在消费者的消费速度大于生产者的速度且 Broker 中没有消息堆积的情况下, 对发送的消息设置优先级也就没有什么实际意义。因为在生产者刚发送完一条消息就被消费者消费了,那么就相当于 Broker 中至多只有一条消息, 对于单条消息来说优先级是没有什么意义的。

RPC 实现

RPC,是 Remote Procedure Call 的简称,即远程过程调用。它是一种通过网络从远程计算机上请求服务,而不需要了解底层网络的技术。 RPC 的主要功能是让构建分布式计算更容易,在提供强大的远程调用能力时不损失本地调用的语义简洁性。

通俗点来说,假设有两台服务器 A 和 B,一个应用部署在 A 服务器上,想要调用 B 服务器上应用提供的函数或者方法, 由于不在同一个内存空间,不能直接调用,需要通过网络来表达调用的语义和传达调用的数据。

RPC 的协议有很多,比如最早的 CORBA、Java RMI 甚至还有 Restful API。

一般在 RabbitMQ 中进行 RPC 是很简单的。客户端发送给请求消息,服务端回复响应的消息。 为了接收响应的消息,我们需要在请求消息中发送一个回调队列,可以使用默认的队列,具体示例代码如下:

1
2
3
4
String callbackQueueName = channel.queueDeclare().getQueue();
BasicProperties props = new BasicProperties.Builder().replyTo(callbackQueueName).build();
channel.basicPublish("", "rpc_queue", props, message.getBytes());
// the code to read a response message from the callback_queue...

对于代码涉及的 BasicProperties 这个类,这里用到了两个属性:

  • replyTo:通常用来设置一个回调队列

  • correlationId:用来关联请求(request)和其调用 RPC 之后的回复(response)

如果像上面的代码中一样,为每个 RPC 请求创建一个回调队列,则是非常低效的。 但是幸运的是这里有一个通用的解决方案 - 可以为每个客户端创建一个单一的回调队列。

这样就产生了一个新的问题,对于回调队列而言,在其接收到一条回复的消息之后,它并不知道这条消息应该和哪一个请求匹配, 这里就用到 correlationId 这个属性了,我们应该为每一个请求设置一个唯一的 correlationId。 之后在回调队列接收到回复的消息时,可以根据这个属性匹配到相应的请求。 如果回调队列接收到一条未知 correlationId 的回复消息,可以简单地将其丢弃。

你可能会问,为什么要将回调队列中的的位置消息丢弃而不是仅仅将其看作失败?这样可以针对这个失败做一些弥补措施。 参考下图,考虑这样一种情况,RPC 服务器可能在发送给回调队列并且在确认收到请求的消息(rpc_queue中的消息)之后挂掉了, 那么只需重启下 RPC 服务器即可,RPC 服务会重新消费 rpc_queue 队列中的请求,这样就不会出现 RPC 服务端未处理请求的情况。 这里的回调队列可能会收到重复消息的情况,这需要客户端能够优雅地处理这种情况,并且 RPC 请求也需要保证其本身是幂等的 (消费者消费消息一般是先处理业务逻辑,再使用 Basic.Ack 确认已接收到消息以防止消息不必要地丢失)。

17

根据上图,RPC 地处理流程如下:

(1)当而客户端启动时,创建要给匿名地回调队列(名称由 RabbitMQ 自动创建)

(2)客户端为 RPC 请求设置两个属性:replyTo 用来告知 RPC 服务端回复请求时地目的队列,即回调队列;correlationId 用来标记一个请求

(3)请求被发送到 rpc_queue 队列中

(4)RPC 服务端监听 rpc_queue 队列中地请求,当请求到来时,服务端会处理并且把带有结果的消息发送给客户端。 接收的队列就是 replyTo 设定的回调队列。

(5)客户端监听回调队列,当有消息时,检查 correlationId 属性,如果与请求匹配,那就是结果了。

服务端实例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
public class RPCServer {
private static final String RPC_QUEUE_NAME = "rpc_queue";

public static void main(String[] args) throws Exception {
// ... 创建 Connection 和 Channel
channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);
channel.basicQos(1);
System.out.println(" [x] Awaiting RPC requests");

Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag,
Envelope enveloper,
AMQP.BasicProperties properties,
byte[] body) throws IOException {
AMQP.BasicProperties replyProps = new AMQP.BasicProperties
.builder()
.correlationId(properties.getCorrelationId())
.build();
String response = "";
try {
String message = new String(body, "UTF-8");
int n = Integer.parseInt(message);
System.out.println(" [.] fib(" + message + ")");
response += fib(n);
} catch (RuntimeException e) {
System.out.println(" [.] " + e.toString());
} finally {
channel.basicPublish("", properties.getReplyTo(),
replyProps, response.getBytes("UTF-8"));
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
};

channel.basicConsume(RPC_QUEUE_NAME, false, consumer);
}

private static int fib(int n) {
if (n == 0) return 0;
if (n == 1) return 1;
return fib(n - 1) + fib(n - 2);
}
}

RPC 客户端关键代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
public class RPCClient {
private Connection connection;
private Channel channel;
private String requestQueueName = "rpc_queue";
private String replyQueueName;
private QueueingCunsumer consumer;

public RPCClient() throws IOException, TimeoutException {
// ... 创建 Connection 和 Channel
replyQueueName = channal.queueDeclare().getQueue();
consumer = new QueueingCunsumer(channel);
channel.basicConsume(replyQueueName, true, consumer);
}

public String call(String message) throws IOException,
ShuwdownSignalException, ConsumerCancelledException,
InterruptedException {
String rsponse = null;
String corrId = UUID.randomUUID().toString();

BasicProperties props = new BasicProperties.Builder()
.correlationId(corrId)
.replyTo(replyQueueName)
.build();
channel.basicPublish("", requestQueueName, props, message.getBytes());

while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
if (delivery.getProperties().getCorrelationId().equals(corrId)) {
response = new String(delivery.getBody());
break;
}
}

return response;
}

public void close() throws Exception {
connection.close();
}

public static void main(String[] args) throws Exception {
RPCClient fibRpc = new RPCClient();
System.out.println(" [x] Requesting fib(30)");
String response = fibRpc.call("30");
System.out.println(" [.] Got '" + response + "'");
fibRpc.close();
}
}

持久化

"持久化" 这个词在前面的篇幅中多次提及,持久化可以提高 RabbitMQ 的可靠性,以防在异常情况(重启、关闭、宕机等)下的数据丢失。 RabbitMQ 的持久化分为三个部分:交换器的持久化、队列的持久化和消息的持久化。

交换器的持久化是通过在声明交换器时将 durable 参数置为 true 实现的。如果交换器不设置持久化,那么在 RabbitMQ 重启之后, 相关的交换器元数据会丢失,不过消息不会丢失,只是不能将消息发送到这个交换器中了。对于一个长期使用的交换器来说,交易将其置为持久化的。

队列的持久化是通过在声明队列时将 durable 参数置为 true 实现的。 如果队列不设置持久化,那么在 RabbitMQ 服务重启之后,相关队列的元数据会丢失,此时数据也会丢失。

队列的持久化能保证其本身的元数据不会因异常情况而丢失,但是并不能保证内部所存储的消息不会丢失。 要确保消息不会丢失,需要将其设置为持久化。 通过将消息的投递模式(BasicProperties 中的 deliveryMode 属性)设置为 2 即可实现消息的持久化。 前面示例中多次提及的 MessageProperties.PERSISTENT_TEXT_PLAIN 实际上是封装了这个属性:

1
2
3
4
5
6
7
public sttic final BasicProperties PERSISTENT_TEXT_PLAIN = 
new BasicProperties("text/plain",
null,
null,
2, // deliveryMode
null, null, null, null,
null, null);

设置了队列和消息的持久化,当 RabbitMQ 服务重启之后,消息依旧存在。 单单只设置队列持久化,重启之后消息会丢失;单单只设置消息的持久化,重启之后队列消息,继而消息也丢失。 单单设置消息持久化而不设置队列的持久化显得毫无意义。

可以将所有的消息都设置为持久化,但是这样会严重影响 RabbitMQ 的性能,写入磁盘的速度比写入内存的速度慢得不只一点点。 对于可靠性不是那么高的消息可以不采用持久化处理以提高整体的吞吐量。 在选择是否要将消息持久化时,需要在可靠性和吞吐量之间做一个权衡。

将交换器、队列、消息都设置了持久化之后就能百分百保证数据不丢失了吗?答案是否定的。

首先从消费者来说,如果在订阅消费队列时时将 autoAck 设置为 true,那么当消费者接收到相关消息之后,还没来得及处理就宕机了, 这样也算数据丢失。这种情况很好解决,将 autoAck 参数设置为 false,并进行手动确认。

其次,在持久化的消息正确存入 RabbitMQ 之后,还需要有一段时间才能存入磁盘中。RabbitMQ 并不会为每条消息都进行同步存盘的处理, 可能仅仅保存到操作系统缓存之中而不是物理磁盘之中。如果在这段时间内 RabbitMQ 服务节点发生了 宕机、重启 等异常情况,消息保存还没 来得及落盘,那么这些消息将会丢失。

这个问题怎么解决呢?这里可以引入 RabbitMQ 的镜像队列机制,相当于配置了副本,如果主节点在此特殊时间内挂掉,可以自动切换到从节点(slave), 这样有效地保证了高可用性,除非整个集群都挂掉。虽然这样也不能完全保证 RabbitMQ 消息不丢失,但是配置了镜像队列要比没有配置镜像队列的 可靠性要高很多,在实际生产环境中的关键业务队列一般都会设置镜像队列。

还可以在发送端引入事务机制或者发送方确认机制来保证消息已经正确地发送并存储至 RabbitMQ 中, 前提还要保证在调用 channel.basicPublish 方法的时候交换器能够将消息正确地路由到相应地队列之中。

生产者确认

在使用 RabbitMQ 的时候,可以通过消息持久化操作来解决因为服务器的异常崩溃而导致的消息丢失,初次之外,我们还会遇到一个问题, 当消息的生产者将消息发送出去之后,消息到底有没有正确地到达服务器呢?如果不进行特殊配置,默认情况下发送消息的操作是不会返回任何 消息给生产者的,也就是默认情况下生产者是不知道消息有没有正确地到达服务器。如果在消息到达服务器之前已经丢失,持久化操作也解决不了 这个问题,因为消息根本没有到达服务器,何谈持久化?

RabbitMQ 针对这个问题,提供了两种解决方式:

  • 通过事务机制实现

  • 通过发送方确认(publisher confirm)机制实现

事务机制

RabbitMQ 客户端中与事务机制相关地方法有三个:channel.txSelectchannel.txCommitchannel.txRollBackchannel.txSelect 用于将当前地信道设置成事务模式,channel.txCommit 用于提交事务,channel.txRollBack 用于事务回滚。 在通过 channel.txSelect 方法开启事务之后,我们便可以发布消息给 RabbitMQ 了,如果事务提交成功,则消息一定到达了 RabbitMQ 中, 如果在事务提交执行之前由于 RabbitMQ 异常崩溃或者其他原因抛出异常,这个时候我们便可以将其捕获,进而通过执行 channel.txRollBack 方法来实现事务回滚。注意这里的 RabbitMQ 中的事务机制与大多数数据库中的事务概念并不相同,需要注意区分。

关联示例代码:

1
2
3
4
5
channel.txSelect();
channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY,
MessageProperties.PERSISTENT_TEXT_PLAIN,
"transaction message".getBytes());
channel.txCommit();
18

可以发现开启事务机制与不开启相比多了四个步骤:

  • 客户端发送 Tx.Select,将信道置为事务模式

  • Broker 回复 Tx.SelectOk,确认已将信道置为事务模式

  • 在发送完消息之后,客户端发送 Tx.Commit 提交事务

  • Broker 回复 Tx.CommitOk,确认事务提交

上面所述的是正常情况下的事务机制运转过程,而事务回滚是什么样子呢?

1
2
3
4
5
6
7
8
9
10
try {
channel.txSelect();
channel.basicPublish(exchange, routingKey,
MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes());
int result = 1 / 0;
channel.txCommit();
} catch (Exception e) {
e.printStackTrace();
channel.txRollback();
}
19

如果要发送多条消息,则将 channel.basicPublish 和 channel.txCommit 等方法包裹进循环内即可:

1
2
3
4
5
6
7
8
9
10
11
channel.txSelect();

for (int i = 0; i < LOOP_TIMES; i++) {
try {
channel.basicPublish("exchange", "routingKey", null, ("message" + i).getBytes());
channel.txCommit();
} catch (Exception e) {
e.printStackTrace();
channel.txRollback();
}
}

事务确实能够解决消息发送方和 RabbitMQ 之间消息确认的问题,只有消息成功被 RabbitMQ 接收,事务才能提交成功,否则便可在捕获异常之后 进行事务回滚,与此同时可以进行消息重发。但是事务机制会 "吸干" RabbitMQ 的性能,那么有没有更好的方法既能保证消息发送方确认消息已经 正确到达,又能基本上不带来性能上的损失呢?从 AMQP 协议层面来看并没有更好的办法,但是 RabbitMQ 提供了一个改进方案,即发送发确认机制。

发送方确认机制

前面介绍了 RabbitMQ 可能会遇到的一个问题,即消息发送方(生产者)并不知道消息是否真正地到达了 RabbitMQ。 随后了解到在 AMQP 协议层面提供了事务机制来解决这个问题,但是采用事务机制会严重降低 RabbitMQ 地消息吞吐量, 这里就引入了一种轻量级地方式 - 发送方确认(publisher confirm)机制。

生产者将信道置成 confirm(确认)模式,一旦信道进入 confirm 模式,所有在该信道上面的消息都会被指派一个唯一的 ID(从 1 开始), 一旦消息被投递到所有匹配的队列之后,RabbitMQ 就会发送一个确认(Basic.Ack)给生产者(包含消息的唯一 ID),这就使得生产者 知晓消息已经正确到达了目的地了。如果消息和队列是可持久化的,那么确认消息会在写入磁盘之后发出。 RabbitMQ 回传给生产者的确认消息中的 deliveryTag 包含了确认消息的序号,此外 RabbitMQ 也可以设置 channel.basicAck 中的 multiple 参数,表示这个序号之前的所有消息都已经得到了处理,可以参考下图。

20

事务机制在一条消息发送之后会使发送端阻塞,以等待 RabbitMQ 的回应,之后才能继续发送下一条消息。 相比之下,发送方确认机制最大的好处在于它是异步的,一旦发布一条消息,生产者应用程序就可以在等信道返回确认的同时继续发送下一条消息, 当消息最终得到确认之后,生产者应用程序便可以通过回调方法来处理该确认消息,如果 RabbitMQ 因为自身内部错误导致消息丢失, 就会发送一条 nack(Basic.Nack)命令,生产者应用程序同样可以在回调方法中处理该 nack 命令。

生产者通过调用 channel.confirmSelect 方法(即 Confirm.Select 命令)将信道设置为 confirm 模式,之后 RabbitMQ 会返回 Confirm.Select-Ok 命令表示同意生产者将当前信道设置为 confirm 模式。所有被发送的后续消息都被 ack 或者 nack 一次,不会 出现一条消息既被 ack 又被 nack 的情况,并且 RabbitMQ 也并没有对消息被 confirm 的快慢做任何保证。

下面看一下 publisher confirm 机制:

1
2
3
4
5
6
7
8
9
10
11
try {
channel.confirmSelect(); // 将信道设置为 publisher confirm 模式
// 之后正常发送消息
channel.basicPublish("exchange", "routingKey", null,
"publisher confirm test".getBytes());
if (!channel.waitForConfirms()) {
System.out.println("send message failed");
}
} catch (InterruptdException e) {
e.printStackTrace();
}

如果发送多条消息,只需要将 channel.basicPublish 和 channel.waitForConfirm 方法包裹在循环里面即可,可以参考事务机制, 不过不需要把 channel.confirmSelect 方法包裹在循环内部。

在 publisher confirm 模式下发送多条消息的 AMQP 协议流转过程可以参考下图:

21

对于 channel.waitForConfirms 而言,在 RabbitMQ 客户端它有四个同类方法:

  1. boolean waitForConfirms() throws InterruptedException;

  2. boolean waitForConfirms(long timeout) throws InterruptedException, TimeoutException;

  3. void waitForConfirmsOrDie() throws IOException, InterruptedException;

  4. void waitForConfirmOrDie(long timeout) throws IOException, InterruptedException, TimeoutException;

如果信道没有开启 publisher confirm 模式,则调用任何 waitForConfirms 方法都会抛出异常。 对于没有参数的 waitForConfirms 方法来说,其返回的条件是客户端收到了相应的 Basic.Ack/.Nack 或者被中断。 参数 timeout 表示超时时间,一旦等待 RabbitMQ 回应超时就会抛出 TimeoutException 异常。 两个 waitForConfirmsOrDie 方法在接收到 RabbitMQ 返回的 Basic.Nack 之后会抛出 java.io.IOException。 业务代码可以根据自身的特权灵活地运用这四种方法来保障消息地可靠发送。

注意要点:

  1. 事务机制和 publisher confirm 机制两者是互斥的,不能共存。

  2. 事务机制和 publisher confirm 机制确保的是消息能够正确地发送至 RabbitMQ,这里的 "发送至 RabbitMQ" 的含义是指消息被正确 地发往至 RabbitMQ 的交换器,如果此交换器没有匹配的队列,那么消息也会丢失。所以在使用这两种机制的时候要确保所涉及的交换器能够有 匹配的队列。更进一步地讲,发送方要配合 mandatory 参数或者备份交换器一起使用来提高消息传输地可靠性。

publisher confirm 的优势在于并不一定需要同步确认。这里总结一下使用方式,总结有如下两种:

  • 批量 confirm 方法:每发送一批消息后,调用 channel.waitForConfirms 方法,等待服务器的确认返回

  • 异步 confirm 方法:提供一个回调方法,服务端确认了一条或者多条消息后客户端会回调这个方法进行处理。

在批量 confirm 方法中,客户端程序需要定期或者定量(达到多少条),亦或两者结合起来调用 channel.waitForConfirms 来等待 RabbitMQ 的 确认返回。相比前面示例中的普通 confirm 方法,批量极大地提升了 confirm 的效率,但是问题在于出现返回 Basic.Nack 或者超时情况时, 客户端需要将这一批次的消息全部重发,这会带来明显的重复消息数量,并且当消息经常丢失时,批量 confirm 的性能应该是不升反降的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
try {
channel.confirmSelect();
int MsgCount = 0;
while (true) {
channel.basicPublish("exchange", "routingKey",
null, "batch confirm test".getBytes());
// 将发送出去的消息存入缓存中,缓存可以是
// 一个 ArrayList 或者 BlockingQueue 之类的
if (++MsgCount >= BATCH_COUNT) {
MsgCount = 0;
try {
if (channel.waitForConfirms()) {
// 将缓存中的消息清空
}
// 将缓存中的消息重新发送
} catch (InterruptedException e) {
e.printStackTrace();
// 将缓存中的消息重新发送
}
}
}
} catch (IOException e) {
e.printStackTrace();
}

异步 confirm 方法的编程实现最为复杂。在客户端 Channel 接口中提供的 addConfirmListener 方法可以添加 ConfirmListener 这个回调接口, 这个 ConfirmListener 接口包含两个方法:handleAckhandleNack,分别用来处理 RabbitMQ 回传的 Basic.Ack 和 Basic.Nack。 在这两个方法中都包含有一个参数 deliveryTag(在 publisher confirm 模式下用来标记消息的唯一有序序号)。 我们需要为每一个信道维护一个 "unconfirm" 的消息序号集合,每发送一条消息,集合中的元素加 1。 每当调用 ConfirmListener 中的 handleAck 方法时,"unconfirm" 集合中删除掉相应的一条(multiple 设置为 false)或者多条(multiple 设置为 true)记录。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
channel.confirmSelect();
channel.addConfirmListener(new ConfirmListener() {
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
System.out.println("Nack, SeqNo: " + deliveryTag + ", multiple:" + multiple);
if (multiple) {
confirmSet.headSet(deliveryTag - 1).clear();
} else {
confirmSet.remove(deliveryTag);
}
}

public void handleNack(long deliveryTag, boolean multiple) throws IOException {
if (multiple) {
confirmSet.headSet(deliveryTag - 1).clear();
} else {
confirmSet.remove(deliveryTag);
}
// 注意这里需要添加处理消息重发的场景
}
});

// 下面是演示一直发送消息的场景
while (true) {
long nextSeqNo = channel.getNextPublishSeqNo();
channel.basicPublish(ConfirmConfig.exchangeName, ConfirmConfig.routingKey,
MessageProperties.PERSISTENT_TEXT_PLAIN,
ConfirmConfig.msg_10B.getBytes());
confirmSet.add(nextSeqNo);
}

4 种方式的 QPS 对比:

22

可以看到批量 confirm 和异步 confirm 这两种方式所呈现的性能要比其余两种好得多。 事务机制和普通 confirm 的方式吞吐量很低,但是编程方式简单,不需要在客户端维护状态(这里指的是维护 deliveryTag 及缓存未确认的消息)。 批量 confirm 方式的问题在于遇到 RabbitMQ 服务端返回 Basic.Nack 需要重发批量消息而导致的性能降低。 异步 confirm 方式编程模型最为复杂,而且和批量 confirm 方式一样需要在客户端维护状态。

消费端要点介绍

消费者客户端可以通过推模式或者拉模式的方式来获取并消费消息,当消费者处理完业务逻辑需要手动确认消息已被接收,这样 RabbitMQ 才能把当前消息从 队列中标记清除。当然如果消费者由于某些原因无法处理当前接收到的消息,可以通过 channel.basicNack 或者 channel.basicReject 来拒绝掉。

这里对于 RabbitMQ 消费端来说,还有几点需要注意:

  • 消息分发

  • 消息顺序性

  • 弃用 QueueingConsumer

消息分发

当 RabbitMQ 队列拥有多个消费者时,队列收到的消息将以轮询(round-robin)的分发方式发送给消费者。每条消息只会发送给订阅列表里的一个消费者。 这种方式非常适合扩展,而且它是专门为并发程序设计的。如果现在负载加重,那么只需要创建更多的消费者来消费处理消息即可。

很多适合轮询的分发机制也不是那么优雅。默认情况下,如果有 n 个消费者,那么 RabbitMQ 会将第 m 条消息分发给 m%n(取余的方式)个消费者, RabbitMQ 不管消费者是否消费并已经确认(Basic.Ack)了消息。试想一下,如果某些消费者任务繁重,来不及消费那么多的消息, 而某些其他消费者由于某些原因(比如业务逻辑简单、机器性能卓越等)很快地处理完了所分配到的消息,进而进程空闲,这样就会造成整体应用吞吐量的下降。

对于这种情况我们可以使用 channel.basicQos(int prefetchCount) 这个方法,channel.basicQos 方法允许限制行道上的消费者所能保持的最大未确认消息的数量。

举例说明,在订阅消费队列之前,消费端程序调用了 channel.basicQos(5),之后定于了某个队列进行消费。 RabbitMQ 会保存一个消费者的列表,每发送一条消息都会为对应的消费者计数,如果达到所设定的上限,那么 RabbitMQ 就不会向这个消费者再发送任何消息。 直到消费者确认了某条消息之后,RabbitMQ 将相应的计数减 1,之后消费者可以继续接收消息,直到再次达到计数上限。

Basic.Qos 的使用对于拉模式的消费方式无效

channel.basicQos 有三种类型的重载方法:

  1. void basicQos(int prefetchCount) throws IOException;

  2. void basicQos(int prefetchCount, boolean global) throws IOException;

  3. void basicQos(int prefetchSize, int prefetchCount, boolean global) throws IOException;

前面介绍的都只用到了 prefetchCount 这个参数,当 prefetchCount 设置为 0 则表示没有上限。 还有 prefetchSize 这个参数表示消费者所能接收未确认消息的总体大小的上限,单位为 B,设置为 0 则表示没有上限。

对于同一个信道来说,它可以同时消费多个队列,当设置了 prefetchCount 大于 0 时,这个信道需要和各个队列协调以确保发送的消息都没有超过所设定的 prefetchCount 的值,这样会使 RabbitMQ 的性能降低,尤其是这些队列分散在集群中的多个 Broker 节点之中。 RabbitMQ 为了提升相关的性能,在 AMQP0-9-1 协议之上重新定义了 global 这个参数。

  • global=false:信道上新的消费者需要遵从 prefetchCount 的限定值

  • global=true:信道上所有的消费者都需要遵从 prefetchCount 的限定值

1
2
3
4
5
6
Channel channel = ...;
Consumer consumer1 = ...;
Consumer consumer2 = ...;
channel.basicQos(10); // 每个消费者能接收到的未确认消息的上限为 10
channel.basicConsume("queue1", false, consumer1);
channel.basicConsume("queue2", false, consumer2);

如果在订阅消息之前,既设置了 global 为 true 的限制,又设置了 global 为 false 的限制,RabbitMQ 会确保两者都会生效。 举例来说,当前有两个队列 queue1 和 queue2;queue1 有 10 条消息,分别为 1 到 10;queue2 也有 10 条消息,分别为 11 到 20。 有两个消费者分别消费这两个队列:

1
2
3
4
5
6
7
Channel channel = ...;
Consumer consumer1 = ...;
Consumer consumer2 = ...;
channel.basicQos(3, false); // Per consumer limit
channel.basicQos(5, true); // Per channel limit
channel.basicConsume("queue1", false, consumer1);
channel.basicConsume("queue2", false, consumer2);

那么这里每个消费者最多只能收到 3 个未确认的消息,两个消费者能收到的未确认的消息个数之和的上限为 5。 在未确认消息的情况下,如果 consumer1 接收到了消息 1、2 和 3,那么 consumer2 至多只能收到 11 和 12。 如果像这样同时使用两种 global 的模式,则会增加 RabbitMQ 的负载,因为 RabbitMQ 需要更多的资源来协调完成这些限制。 如无特殊需要,最好只使用 global 为 false 的设置,这也是默认的设置。

消息顺序性

消息的顺序性是指消费者消费到的消息和发送者发布的消息的顺序是一致的。 举个例子,不考虑消息重复的情况,如果生产者发布的消息分别为 msg1、msg2、msg3,那么消费者必然也是按照 msg1、msg2、msg3 的顺序进行消费的。

目前很多资料显示 RabbitMQ 的消息能够保障顺序性,这是不准确的,或者说这个观点有很大的局限性。 在不使用任何 RabbitMQ 的高级特性,也没有消息丢失、网络故障之类异常的情况发生,并且只有一个消费者的情况下,最好也只有一个生产者的情况下 可以保证消息的顺序性。如果有多个生产者同时发送消息,无法确定消息到达 Broker 的前后顺序,也就无法验证消息的顺序性。

那么有哪些情况下 RabbitMQ 的消息顺序性会被打破呢?下面介绍几种常见的情形。

如果生产者使用了事务机制,在发送消息之后遇到异常进行了事务回滚,那么需要重新补偿发送这条消息,如果补偿发送是在另一个线程实现的,那么消息 在生产者这个源头就出现了错序。同样,如果启用 publisher confirm 时,在发生超时、中断,又或者是收到 RabbitMQ 的 Basic.Nack 命令时, 那么同样需要补偿发送,结果与事务机制一样会错序。或者这种说法有些牵强,我们可以固执地认为消息的顺序性是从存入队列之后开始的, 而不是在发送的时候开始的。

考虑另一种情形,如果生产者发送的消息设置了不同的超时时间,并且也设置了死信队列, 整体上来说相当于一个延迟队列,那么消费者在消费这个延迟队列的时候,消息的顺序必然不会和生产者发送消息的顺序一致。

再考虑一种情形,如果消息设置了优先级,那么消费者消费到的消息也必然不是顺序性的。

如果一个队列按照前后顺序分有 msg1、msg2、msg3、msg4 这 4 个消息,同时有 ConsumerA 和 ConsumerB 这两个消费者同时订阅了这个队列。 队列中的消息轮询分发到各个消费者之中,ConsumerA 中的消息为 msg1 和 msg3,ConsumerB 中的消息为 msg2、msg4。 ConsumerA 收到消息 msg1 之后并不想处理而调用了 Basic.Nack/.Reject 将消息拒绝,与此同时将 requeue 设置为 true,这样这条消息就可以重新存入队列中。 消息 msg1 之后被发送到了 ConsumerB 中,此时 ConsumerB 已经消费了 msg2、msg4,之后再消费 msg1,这样消息顺序性也就错乱了。 或者消息 msg1 又重新发往 ConsumerA 中,此时 ConsumerA 已经消费了 msg3,那么再消费 msg1,消息顺序性也无法得到保障。 同样可以用在 Basic.Recover 这个 AMQP 命令中。

如果要保证消息的顺序性,需要业务方使用 RabbitMQ 之后做进一步的处理,比如在消息体内添加全局有序标识(类似 SequenceID)来实现。

弃用 QueueingConsumer

QueueingConsumer 在 RabbitMQ 4.x 版本开始被标记为 @Deprecated

1
2
3
4
5
6
7
8
9
10
QueueingConsumer consumer = new QueueingConsuemr(channel);
// channel.basicQos(64); // 使用 QueueingConsumer 的时候一定要添加!
channel.basicConsume(QUEUE_NAME, "consumer_zzh", consumer);

while(true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println(" [X] Received '" + message + "'");
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}

QueueingConsumer 本身有几大缺陷,首当其冲的就是内存溢出的问题,由于某些原因,队列之中堆积了比较多的消息,就可能导致消费者客户端内存溢出假死,于是发生恶性循环,队列消息不断堆积而得不到消化。

这个内存溢出的问题可以使用 Basic.Qos 来得到有效的解决,Basic.Qos 可以限制某个消费者所保持未确认消息的数量, 也就是间接地限制了 QueueingConsumer 中的 LinkedBlockingQueue 的大小。注意一定要在 Basic.Consume 之前调用 Basic.Qos 才能生效。

QueueingConsumer 还包含以下一些缺陷:

  • QueueingConsumer 会拖累同一个 Connection 下的所有信道,使其性能降低

  • 同步递归调用 QueueingConsume 会产生死锁

  • RabbitMQ 的自动连接恢复机制(automatic connection recovery)不支持 Queueing Consumer 的这种形式

  • QueueingConsumer 不是事件驱动的

为了避免不必要的麻烦,建议在消费的时候尽量使用继承 DefaultConsumer 的方式。

消息传输保障

消息可靠传输一般是业务系统接入消息中间件时首要考虑的问题,一般消息中间件的消息传输保障分为三个层级。

  • At most once:最多一次。消息可能会丢失,但绝不会重复传输

  • At least once:最少一次。消息绝不会丢失,但可能会重复传输

  • Exactly once:恰好一次。每条消息肯定会被传输一次且仅传输一次

RabbitMQ 支持其中的 "最多一次" 和 "最少一次"。其中 "最少一次" 投递实现需要考虑以下这几个方面的内容:

  1. 消息生产者需要开启事务机制或者 publisher confirm 机制,以确保消息可以可靠地传输到 RabbitMQ 中

  2. 消息生产者需要配合使用 mandatory 参数或者备份交换器来确保消息能够从交换器路由到队列中,进而能够保存下来而不会被丢弃

  3. 消息和队列都需要进行持久化处理,以确保 RabbitMQ 服务器在遇到异常情况时不会造成消息丢失

  4. 消费者在消费消息的同时需要将 autoAck 设置为 false,然后通过手动确认的方式去确认已经正确消费的消息,以避免在消费端引起不必要的消息丢失

"最多一次" 的方式就无须考虑以上那些方面,生产者随意发送,消费者随意消费,不过这样很难确保消息不会丢失

"恰好一次" 是 RabbitMQ 目前无法保障的。考虑这样一种情况,消费者在消费完一条消息之后向 RabbitMQ 发送确认 Basic.Ack 命令, 此时由于网络断开或者其他原因造成 RabbitMQ 并没有收到这个确认命令,那么 RabbitMQ 不会将此条消息标记删除。 在重新建立连接之后,消费者还是会消费到这一条消息,这就造成了重复消费。再考虑一种情况,生产者在使用 publisher confirm 机制的时候, 发送完一条消息等待 RabbitMQ 返回确认通知,此时网络断开,生产者捕获到异常情况,为了确保消息可靠性选择重新发送, 这样 RabbitMQ 中就有两条同样的消息,在消费的时候,消费者就会重复消费。

那么 RabbitMQ 有没有去重的机制来保证 "恰好一次" 呢?答案是并没有,不仅是 RabbitMQ,目前大多数主流的消息中间件都没有消息去重机制, 也不保障 "恰好一次"。去重处理一般是在业务客户端实现,比如引入 GUID(Globally Unique Identifier) 的概念。针对 GUID,如果从客户端的角度去重, 那么需要引入集中式缓存,必然会增加依赖复杂度,另外缓存的大小也难以界定。建议在实际生产环境中,业务方根据自身的业务特性进行去重, 比如业务消息本身具备幂等性,或者借助 Redis 等其他产品进行去重处理。

小结

提升数据可靠性有以下一些途径:设置 mandatory 参数或者备份交换器(immediate 参数已被淘汰); 设置 publisher confirm 机制或者事务机制; 设置交换器、队列和消息都为持久化; 设置消费端对应的 autoAck 参数为 false 并在消费完消息之后再进行消息确认。

连接 RabbitMQ

下面的代码用来在给定的参数(IP 地址、端口号、用户名、密码等)下连接 RabbitMQ:

1
2
3
4
5
6
7
8
ConnectionFactory factory = new ConnectionFactory();
factory.setUsername(USERNAME);
factory.setPassword(PASSSWORD);
factory.setVirtualHost(virtualHost);
factory.setHost(IP_ADDRESS);
factory.setPort(PORT);

Connection connection = factory.newConnection();
1
2
3
4
5
6
7
ConnectionFactory factory = new ConnectionFactory();
factory.setUri("amqp://guest:guest@localhost:5672/virtualHost");
Connection connection = factory.newConnection();

// Connection 接口被用来创建一个 Channel
Channel channel = connection.createChannel();
// 在创建 channel 之后,channel 可以用来发送或者接收消息了

注意要点: Connection 可以用来创建多个 Channel 实例,但是 Channel 实例不能在线程间共享, 应用程序应该为每一个线程开辟一个 Channel。某些情况下 Channel 的操作可以并发运行,但是在其它情况下会导致在网络上 出现错误的通信帧交替,同时也会影响发送方确认(publisher confirm)机制的运行, 所以多线程间共享 Channel 实例是非线程安全的。

Channel 或者 Connection 中有个 isOpen 方法可以用来检测其是否已处于开启状态。 但并不推荐在生产环境的代码上使用 isOpen 方法,这个方法的返回值依赖于 shutdownCause 的存在,有可能会产生竞争。

isOpen 方法的源码:

1
2
3
4
5
public boolean isOpen() {
synchronized(this.monitor) {
return this.shutdownCause == null;
}
}

错误地使用 isOpen 方法:

1
2
3
4
5
6
7
8
9
public void brokenMethod(Channel channel)
{
if (channel.isOpen()) {
// The following code depends on the channel being in open state.
// However there is a possibility of the change in the channel state
// between isOpen() and basicQos(1) call
channel.basicQos(1);
}
}

通常情况下,在调用 createXXX 或者 newXXX 方法之后,我们可以简单地认为 Connection 或者 Channel 已经成功地处于开启状态, 而不会在代码中使用 isOpen 这个检测方法。如果在使用 Channel 的时候其已经处于关闭状态,那么程序会抛出一个 com.rabbitmq.client.ShutdownSignalException, 我们只需要捕获这个异常即可。当然同时也要试着捕获 IOException 或者 SocketException,以防 Connection 意外关闭。

1
2
3
4
5
6
7
8
9
10
11
12
13
public void validMethod(Channel channel)
{
try {
// ...
// channel.basicQos(1);
} catch (ShutdownSignalException e) {
// possibly check if channel was closed
// by the time we started action and reasons for
// closing it
} catch (IOException e) {
// check why connection was closed
}
}

使用交换器和队列

交换器和队列是 AMQP 中 high-level 层面的构建模块,应用程序需要确保在使用它们的时候就已经存在了,在使用之前需要先声明(declare)它们。

1
2
3
channel.exchangeDeclare(exchangeName, "direct", true);
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, exchangeName, routingKey);

上面创建了一个持久化的、非自动删除的、绑定类型为 direct 的交换器,同时也创建了一个非持久化的、拍他的、自动删除的队列(此队列的名称由 RabbitMQ 自动生成)。 这里交换器和队列也都没有设置特殊的参数。

上面的代码也展示了如何将路由键将队列和交换器绑定起来。上面声明的队列具备如下特性: 只对当前应用中同一个 Connection 层面可用,同一个 Connection 的不同 Channel 可共用,并且也会在应用连接断开的时候自动删除。

如果要在应用中共享一个队列,可以做如下声明:

1
2
3
channel.exchangeDeclare(exchangeName, "direct", true);
channel.queueDeclare(queueName, true, false, false, null);
channel.queueBind(queueName, exchangeName, routingKey);

这里的队列被声明为持久化的,非排他的、非自动删除的,并且也被分配另一个确定的已知的名称(由客户端分配而非 RabbitMQ 自动生成)。

注意:Channel 的 API 方法都是可以重载的,比如 exchangeDeclare、queueDeclare。 根据参数不用,可以有不同的重载形式,根据自身的需要进行调用。

生产者和消费者都可以声明一个交换器或者队列。 如果尝试声明一个已经存在的交换器或者队列,只要声明的参数完全匹配现存的交换器或者队列, RabbitMQ 就可以什么都不做,并成功返回。如果声明的参数不匹配则会抛出异常。

exchangeDeclare 方法详解

exchangeDeclare 方法有多个重载方法,这些重载方法都是由下面这个方法中缺省的某些参数构成的。

1
2
3
4
Exchange.DeclareOk exchangeDeclare(String exchange,
String type, boolean durable,
boolean autoDelete, boolean internal,
Map<String, Object> arguments) throws IOException;

这个方法的返回值是 Exchange.DeclareOk,用来标识成功声明了一个交换器。

各个参数详细说明如下所述:

  • exchange:交换器的名称

  • type:交换器的类型,常见的如 fanout、direct、topic

  • durable:设置是否持久化。durable 设置为 true 表示持久化,反之是非持久化。持久化可以将交换器存盘,在服务器重启的时候不会丢失相关信息。

  • autoDelete:设置是否自动删除。autoDelete 设置为 true 则表示自动删除。自动删除的前提是至少有一个队列或者交换器与这个交换器绑定,之后 所有与这个交换器绑定的队列或者交换器与此解绑。注意不能错误地把这个参数理解为:"当与此交换器连接的客户端都断开时,RabbitMQ 会自动删除本交换器"

  • internal:设置是否是内置的。如果设置为 true,则表示是内置的交换器,客户端程序无法直接发送消息到此交换器中,只能通过交换器路由到交换器这种方式。

  • arguments:其它一些结构化参数,比如 alternate-exchange。

exchangeDeclare 的其它重载方法如下:

  1. Exchange.DeclareOk exchangeDeclare(String exchange, String type) throws IOException;

  2. Exchange.DeclareOk exchangeDeclare(String exchange, String type, boolean durable) throws IOException;

  3. Exchange.DeclareOk exchangeDeclare(String exchange, String type, boolean durable, boolean autoDelete, Map<String, Object> arguments) throws IOException;

与此对应的,将第二个参数 String type 换成 BuiltinExchangeType type 对应的几个重载方法:

  1. Exchange.DeclareOk exchangeDeclare(String exchange, BuiltinExchangeType type) throws IOException;

  2. Exchange.DeclareOk exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable) throws IOException;

  3. Exchange.DeclareOk exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, Map<String, Object> arguments) throws IOException;

与 exchangeDeclare 师出同门的还有几个方法,比如 exchangeDeclareNoWait 方法,具体定义如下(当然也有 BuiltinExchangeType 版):

1
2
3
4
5
6
void exchangeDeclareNoWait(String exchange,
String type,
boolean durable,
boolean autoDelete,
boolean internal,
Map<String, Object> arguments) throws IOException;

这个 exchangeDeclareNoWaitexchangeDeclare 多设置了一个 nowait 参数,这个 nowait 参数指的是 AMQP 中 Exchange.Declare 命令的参数, 意思是不需要服务器返回,注意这个方法的返回值是 void,而普通的 exchangeDeclare 方法的返回值是 Exchange.DeclareOk,意思是在客户端声明了一个交换器之后, 需要等待服务器的返回(服务器会返回 Exchange.Declare-Ok 这个 AMQP 命令)。

针对 "exchangeDeclareNoWait 不需要服务器返回任何值" 这一点,考虑这样一种情况,在声明完一个交换器之后(实际服务器还未完成交换器的创建), 那么此时客户端紧接着使用这个交换器,必然会发生异常。如果没有特殊的缘由和应用场景,并不建议使用这个方法。

这里还有师出同门的另一个方法 exchangeDeclarePassive,这个方法的定义如下:

1
Exchange.DeclareOk exchangeDeclarePassive(String name) throws IOException;

这个方法在实际应用过程中还是非常有用的,它主要用来检测相应的交换器是否存在。如果存在则正常返回;如果不存在则抛出异常:404 channel exception,同时 Channel 也会被关闭。

有声明创建交换器的方法,当然也有删除交换器的方法,相应的方法如下:

  1. Exchange.DeleteOk exchangeDelete(String exchange) throws IOException;

  2. void exchangeDeleteNoWait(String exchange, boolean ifUnused) throws IOException;

  3. Exchange.DeleteOk exchangeDelete(String exchange, boolean ifUnUsed) throws IOException;

其中 exchange 表示交换器的名称,而 ifUnused 用来设置是否在交换器没有被使用的情况下删除。 如果 ifUnused 设置为 true,则只有在此交换器没有被使用的情况下才会被删除;如果设置为 false,则无论如何这个交换器都要被删除。

queueDeclare 方法详解

queueDeclare 相对于 exchangeDeclare 方法而言,重载的个数就少很多,它只有两个重载方法:

  1. Queue.DeclareOk queueDeclare() throws IOException;

  2. Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) throws IOException;

不带任何参数的 queueDeclare 方法默认创建一个由 RabbitMQ 命名的(也称为匿名队列)、排他的、自动删除的、非持久化的队列。

方法的参数详细说明如下所述:

  • queue:队列的名称

  • durable:设置是否持久化。为 true 则设置队列为持久化。持久化的队列会存盘,在服务器重启的时候可以保证不丢失相关信息。

  • exclusive:设置是否排他。为 true 则设置队列为排他的。如果一个队列被声明为排他队列,该队列仅对首次声明它的连接可见,并在连接断开时自动删除。 这里需要注意三点:排他队列是基于连接(Connection)可见的,同一个连接的不同信道(Channel)是可以同时访问同一连接创建的排他队列: "首次" 是指如果一个连接已经声明了一个排他队列,其他连接是不允许建立同名的排他队列,这个与普通队列不同:即使该队列是持久化的, 一旦连接关闭或者客户端退出,该排他队列都会被自动删除,这种队列适用于一个客户端同时发送和读取消息的应用场景。

  • autoDelete:设置是否自动删除。为 true 则设置为自动删除。自动删除的前提是:至少有一个消费者连接到这个队列,之后所有与这个队列连接的消费者 都断开时,才会自动删除。不能把这个参数错误地理解为:"当连接到此队列的所有客户端断开时,这个队列自动删除"。因为生产者客户端创建这个队列, 或者没有消费者客户端与这个队列连接时,都不会自动删除这个队列。

  • arguments:设置队列的其它一些参数,如 x-message-ttlx-expiresx-max-lengthx-max-priority

注意要点:生产者和消费者都能够使用 queueDeclare 来声明一个队列,但是如果消费者在同一个信道上订阅了另一个队列,就无法再声明队列了。 必须先取消订阅,然后将信道设置为 "传输" 模式,之后才能声明队列。

对应于 exchangeDeclareNoWait 方法,这里也有一个 queueDeclareNoWait 方法:

1
2
void queueDeclareNoWait(String queue, boolean durable, boolean exclusive,
boolean autoDelete, Map<String, Object> arguments) throws IOException;

方法的返回值也是 void,表示不需要服务端的任何返回。同样也需要注意,在调用完 queueDeclareNoWait 之后,紧接着使用声明的队列时有可能会发生异常情况。

同样这里还有一个 queueDeclarePassive 的方法,也比较常用。这个方法用来检测相应的队列是否存在。 如果存在则正常返回,如果不存在则抛出异常:404 channel exception,同时 Channel 也会被关闭。方法定义如下:

Queue.DeclareOk queueDeclarePassive(String queue) throws IOException;

与交换器对应,关于队列也有删除的相应方法:

  1. Queue.DeleteOk queueDelete(String queue) throws IOException;

  2. Queue.DeleteOk queueDelete(String queue, boolean ifUnused, boolean ifEmpty) throws IOException;

  3. void queueDeleteNoWait(String queue, boolean ifUnused, boolean ifEmpty) throws IOException;

其中 queue 表示队列的名称,ifEmpty 设置为 true 表示在队列为空的情况下才能够删除。

与队列相关的还有一个有意思的方法 - queuePurge,区别于 queueDelete,这个方法用来清空队列中的内容,而不删除队列本身,具体定义如下:

Queue.PurgeOk queuePurge(String queue) throws IOException;

queueBind 方法详解

将队列和交换器绑定的方法如下,可以与前两节中的方法定义进行类比。

  1. Queue.BindOk queueBind(String queue, String exchange, String routingKey) throws IOException;

  2. Queue.BindOk queueBind(String queue, String exchange, String routingKey, Map<String, Object> arguments) throws IOException;

  3. void queueBindNoWait(String queue, String exchange, String routingKey, Map<String, Object> arguments) throws IOException;

方法中涉及的参数详解。

  • queue:队列名称

  • exchange:交换器名称

  • routingKey:用来绑定队列和交换器的路由键

  • arguments:定义绑定的一些参数

不仅可以将队列和交换器绑定起来,也可以将已经被绑定的队列和交换器进行解绑。具体方法可以参考如下:

  1. Queue.UnbindOk queueUnbind(String queue, String exchange, String routingKey) throws IOException;

  2. Queue.UnbindOk queueUnbind(String queue, String exchange, String routingKey, Map<String, Object> arguments) throws IOException;

exchangeBind 方法详解

我们不仅可以将交换器与队列绑定,也可以将交换器与交换器绑定,后者和前者的用法如出一辙,相应的方法如下:

  1. Exchange.BindOk exchangeBind(String destination, String source, String routingKey) throws IOException;

  2. Exchange.BindOk exchangeBind(String destination, String source, String routingKey, Map<String, Object> arguments) throws IOException;

  3. void exchangeBind(String destination, String source, String routingKey, Map<String, Object> arguments) throws IOException;

绑定之后,消息从 source 交换器转发到 destination 交换器,某种程度上来说 destination 交换器可以看作一个队列。

1
2
3
4
5
6
channel.exchangeDeclare("source", "direct", false, true, null);
channel.exchangeDeclare("destination", "fanout", false, true, null);
channel.exchangeBind("destination", "source", "exKey");
channel.queueDeclare("queue", false, false, true, null);
channel.queueBind("queue", "destination", "");
channel.basicPublish("source", "exKey", null, "exToExDemo".getBytes());

生产者发送消息至交换器 source 中,交换器 source 根据路由键找到与其匹配的另一个交换器 destination,并把消息转发到 destination 中, 进而存储在 destination 绑定的队列 queue 中。

10

何时创建

RabbitMQ 的消息存储在队列中,交换器的使用并不真正耗费服务器的性能,而队列会。如果要衡量 RabbitMQ 当前的 QPS 只需看队列的即可。 在实际业务应用中,需要对所创建的队列的流量、内存占用及网卡占用有一个清晰的认知,预估其平均值和峰值,以便在固定硬件资源的情况下进行合理有效的分配。

根据 RabbitMQ 官方建议,生产者和消费者都应该尝试创建(这里指声明操作)队列。 这是一个很好的建议,但不适用于所有的情况。如果业务本身在架构之初已经充分地预估了队列地使用情况, 完全可以在业务程序上线之前在服务器上创建好,这样业务程序也可以免去声明地过程,直接使用即可。

预先创建好资源还有一个好处是,可以确保交换器和队列之间正确地绑定匹配。 很多时候,由于人为因素、代码缺陷等,发送消息地交换器并没有绑定任何队列,那么消息将会丢失; 或者交换器绑定了某个队列,但是发送消息的时候路由键无法与现存的队列匹配,那么消息也会丢失。 当然可以配合 mandatory 参数或者备份交换器来提高程序的健壮性。

与此同时,预估好队列的使用情况非常重要,如果在后期运行过程中超过预期的阈值,可以根据实际情况对当前集群进行扩容或者将相应的队列 迁移到其他集群。迁移的过程也可以对业务程序完全透明。此种方法也更有利于开发和运维分工,便于相应的资源管理。

如果集群资源充足,而即将使用的队列所占用的资源又在可控的范围之内,为了增加业务程序的灵活性,也完全可以在业务程序中声明队列。

发送消息

如果要发送一个消息,可以使用 Channel 类的 basicPublish 方法,比如发送一条内容为 "Hello World!" 的消息,参考如下:

1
2
byte[] messageBodyBytes = "Hello, World!".getBytes();
channel.basicPublish(exchangeName, routingKey, null, messageBodyBytes);

为了更好地控制发送,可以使用 mandatory 这个参数,或者可以发送一些特定属性的信息:

1
2
3
channel.basicPublish(exchangeName, routingKey, mandatory,
MessageProperties.PERSISTENT_TEXT_LPAIN,
messageBodyBytes);

上面这行代码发送了一条消息,这条消息的投递模式(delivery mode)设置为 2,即消息会被持久化(即存入磁盘)在服务器中。 同时这条消息的优先级(priority)设置为 1,content-type 为 "text/plain"。可以自己设定消息的属性:

1
2
3
4
5
6
7
8
channel.basicPublish(exchangeName, routingKey,
new AMQP.BasicProperties.Builder()
.contentType("text/plain")
.deliveryMode(2)
.priority(1)
.userId("hidden")
.build(),
messageBodyBytes);

也可以发送一条带有 headers 的消息:

1
2
3
4
5
6
7
8
Map<String, Object> headers = new HashMap<String, Object>();
headers.put("location", "here");
headers.put("time", "today");
channel.basicPublish(exchangeName, routingKey,
new AMQP.BaiscProperties.Builder()
.builder(headers)
.build(),
messageBodyBytes);

还可以发送一条带有过期时间(expiration)的消息:

1
2
3
4
5
channel.basicPublish(exchangeName, routingKey,
new AMQP.BasicProperties.Builder()
.expiration("60000")
.build()
.messageBodyBytes);

对于 basicPublish 而言,有几个重载方法:

  1. void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException;

  2. void basicPublish(String exchange, String routingKey, boolean mandatory, BasicProperties props, byte[] body) throws IOException;

  3. void basicPublish(String exchange, String routingKey, boolean mandatory, boolean immediate, BasicProperties props, byte[] body) throws IOException;

对应的具体参数解释如下所述。

  • exchange:交换器名称,指明消息发送到哪个交换器中。如果设置为空字符串,则消息会被发送到 RabbitMQ 默认的交换器中。

  • routingKey:路由键,交换器根据路由键将消息存储到相应的队列之中。

  • props:消息的基本属性集,其包含 14 个属性成员,分别有 contentType、contentEncoding、headers(Map<String, Object>)、 deliveryMode、priority、correlationId、replyTo、expiration、messageId、timestamp、type、userId、appId、clusterId。

  • byte[] body:消息体(payload),真正需要发送的消息。

消费消息

RabbitMQ 的消费模式分两种:推(push)和拉(pull)模式。推模式采用 Basic.Consume 进行消费,而拉模式是调用 Basic.Get 进行消费。

推模式

在推模式中,可以通过持续订阅的方式来消费消息,使用到的相关类有:

1
2
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;

接收消息一般通过实现 Consumer 接口或者继承 DefaultConsumer 类来实现。当调用与 Consumer 相关的 API 方法时,不同的订阅采用不同的消费者标签(consumerTag) 来区分彼此,在同一个 Channel 中的消费者也需要通过唯一的消费者标签以作区分,关键消费者代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
boolean autoAck = false;
channel.basicQos(64);
channel.basicConsume(queueName, autoAck, "myConsumerTag",
new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String routingKey = envolope.getRoutineKey();
String contentType = properties.getContentType();
long deliveryTag = envelope.getDeliveryTag();
// (process the message components here...)
channel.basicAck(deliveryTag, false);
}
})

注意,上面代码中显式地设置 autoAck 为 false,然后在接收到消息之后进行显式 ack 操作(channel.basicAck), 对于消费者来说这个设置是非常必要的,可以防止消息不必要地丢失。

Channel 类中 basicConsume 方法有如下几种形式:

  1. String basicConsume(String queue, Consumer callback) throws IOException;

  2. String basicConsume(String queue, boolean autoAck, Consumer ballback) throws IOException;

  3. String basicConsume(String queue, boolean autoAck, Map<String, Object> arguments, Consumer callback) throws IOException;

  4. String basicConsume(string queue, boolean autoAck, String consumerTag, Consumer callback) throws IOException;

  5. String basicConsume(String queue, boolean autoAck, String consumerTag, boolean noLocal, boolean exclusive, Map<String, Object> arguments, Consumer callback) throws IOException;

其对应地参数说明如下所述:

  • queue:队列地名称

  • autoAck:设置是否自动确认。建议设置成 false,即不自动确认

  • consumerTag:消费者标签,用来区分多个消费者

  • noLocal:设置为 true 则表示不能将同一个 Connection 中生产者传送给这个 Connection 中的消费者

  • exclusive:设置是否排他

  • arguments:设置消费者的其它参数

  • callback:设置消费者的回调函数。用来处理 RabbitMQ 推送过来的消息,比如 DefaultConsumer,使用时需要重写(override)其中的方法。

对于消费者客户端来说重写 handleDelivery 方法是十分方便的。更复杂的消费者客户端会重写更多的方法,具体如下:

1
2
3
4
5
void handleConsumeOk(String consumerTag);
void handleCancelOk(String consumerTag);
void handleCancel(String consumerTag) throws IOException;
void handleShutdownSignal(String consumerTag, ShutdownSignalException sig);
void handleRecoverOk(String consumerTag);

比如 handleShutdownSignal 方法,当 Channel 或者 Connection 关闭的时候会调用。 再者,handleConsumeOk 方法会在其他方法之前调用,返回消费者标签。

重写 handleCancelOkhandleCancel 方法,这样消费端可以在显式地或者隐式地取消订阅的时候调用。 也可以通过 channel.basicCancel 方法来显式地取消一个消费者的订阅:

1
channel.basicCancel(consumerTag);

注意上面这行代码会首先触发 handleConsumerOk 方法,之后触发 handleDelivery 方法,最后才触发 handleCancelOk 方法。

生产者一样,消费者客户端同样需要考虑线程安全的问题。 消费者客户端的这些 callback 会被分配到与 Channel 不同的线程池上,这意味着消费者客户端可以安全地调用这些阻塞方法, 比如 channel.queueDeclarechannel.basicCancel 等。

每个 Channel 都有自己独立的线程。最常用的做法是一个 Channel 对应一个消费者,也就是意味着消费者彼此之间没有任何关联。 当然也可以在一个 Channel 中维持多个消费者,但是要注意一个问题,如果 Channel 中的一个消费者一直在运行,那么其他 消费者的 callback 会被 "耽搁"。

拉模式

通过 channel.basicGet 方法可以单条地获取消息,其返回值是 GetResponseChannel 类的 basicGet 方法没有其他重载方法,只有:

1
GetResponse basicGet(String queue, boolean autoAck) throws IOException;

其中 queue 代表队列的名称,如果设置 autoAck 为 false,那么同样需要调用 channel.basicAck 来确认消息已被成功接收。

拉模式关键代码:

1
2
3
GetResponse response = channel.basicGet(QUEUE_NAME, false);
System.out.println(new String(response.getBody()));
channel.basicAck(response,getEnvelope().getDeliveryTag(), false);

Basic.Consume 将信道(channel)置为接收模式,直到取消队列的订阅为止。在接收模式期间,RabbitMQ 会不断地推送消息给消费者, 当然推送消息的个数还是会受到 Basic.Qos 的限制。如果只想从队列获得单条消息而不是持续订阅,建议还是使用 Basic.Get 进行消费。 但是不能将 Basic.Get 放在一个循环里来代替 Basic.Consume,这样做会严重影响 RabbitMQ 的性能。 如果要实现高吞吐量,消费者理应使用 Basic.Consume 方法。

11

消费者的确认与拒绝

为了保证消息从队列可靠地达到消费者,RabbitMQ 提供了消息确认机制(message acknowledgement)。 消费者在订阅队列时,可以指定 autoAck,当 autoAck 等于 false 时,RabbitMQ 会等待消费者显式地回复确认信号后才从内存(或者磁盘)中移去消息。 当 autoAck 为 true 时,RabbitMQ 会自动地把发送出去的消息置为确认,然后从内存(或磁盘)中删除,而不管消费者是否真正地消费了这些消息。

采用消息确认机制后,只要设置 autoAck 参数为 false,消费者就有足够的时间处理消息,不用担心处理消息过程中消费者挂掉后消息丢失的问题, 因为 RabbitMQ 会一直等到持有消息直到消费者显式调用 Basic.Ack 命令为止。

当 autoAck 参数置为 false,对于 RabbitMQ 服务端而言,队列中的消息分成两个部分: 一部分是等待投递给消费者的消息;一部分是已经投递给消费者,但是还没有收到消费者确认信号的消息。 如果 RabbitMQ 一直没有收到消费者的确认信号,并且消费此消息的消费者已经断开连接,则 RabbitMQ 会安排该消息重新进入队列, 等待下一个消费者,当然也有可能还是原来的那个消费者。

RabbitMQ 不会为未确认的消息设置过期时间,它判断此消息是否需要重新投递给消费者的唯一依据是消费该消息的消费者连接是否已经断开, 这么设计的原因是 RabbitMQ 允许消费者消费一条消息的时间可以很久很久。

RabbitMQ 的 Web 管理平台上可以看到当前队列中的 "Ready" 状态和 "Unacknowledged" 状态的消息数,分别对应上文中的 等待投递给消费者的消息数和已经投递给消费者但是未收到确认信号的消息数。

在消费者收到消息后,如果想明确拒绝当前的消息而不是确认,可以使用 Basic.Reject 这个命令, 消费者客户端可以调用与其对应的 channel.basicReject 方法来告诉 RabbitMQ 拒绝这个消息。

Channel 类中的 basicReject 方法定义如下:

1
void basicReject(long deliveryTag, boolean requeue) throws IOException;

其中 deliveryTag 可以看作消息的编号,它是一个 64 位的长整型。 如果 requeue 为 true,则 RabbitMQ 会重新将这条消息存入队列,以便可以发给下一个订阅的消费者; 如果 requeue 为 false,则 RabbitMQ 立即把消息从队列中删除,而不是把它发送给新的消费者。

Basic.Reject 命令一次只能拒绝一条消息,如果想要批量拒绝消息,则可以使用 Basic.Nack 这个命令。 消费者客户端可以调用 channel.basicNack 方法来实现,方法定义如下:

1
void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException;

其中 deliveryTag 和 requeue 的含义可以参考 basicReject 方法。 multiple 参数设置为 false 则表示拒绝编号为 deliveryTag 的这一条消息,这时候 basicNack 和 basicReject 方法一样; multiple 设置为 true 则表示拒绝 deliveryTag 编号之前所有未被当前消费者确认的消息。

channel.basicReject 或者 channel.Nack 中的 requeue 设置为 false,可以启用 "死信队列" 的功能。 死信队列可以通过检测被拒绝或者未送达的消息的消息来追踪问题。

对应 requeue,AMQP 中还有一个命令 Basic.Recover 具备可重入队列的特性。其对应的客户端方法为:

  1. Basic.RecoverOk basicRecover() throws IOException;

  2. Basic.RecoverOk basicRecover(boolean requeue) throws IOException;

这个 channel.basicRecover 方法用来请求 RabbitMQ 重新发送还未被确认的消息。 如果 requeue 参数设置为 true,则未被确认的消息会被重新加入到队列中,这样对于同一条消息来说,可能会被分配给与之前不同的消费者。 如果 requeue 参数设置为 false,那么同一条消息会被分配给与之前相同的消费者。 默认情况下,如果不设置 requeue 这个参数,相当于 channel.basicRecover(true),即 requeue 默认为 true。

关闭连接

在应用程序使用完之后,需要关闭连接,释放资源:

1
2
channel.close();
connection.close();

显式地关闭 Channel 是个好习惯,但这不是必须但,在 Connection 关闭的时候,Channel 也会自动关闭。

AMQP 协议中的 Connection 和 Channel 采用同样的方式来管理网络失败、内部错误和显式地关闭连接。Connection 和 Channel 所具备的生命周期如下所述。

  • Open:开启状态,代表当前对象可以使用

  • Closing:正在关闭状态。当前对象被显式地通知关闭方法(shutdown),这样就产生了一个关闭请求让其内部对象进行相应的操作,并等待这些关闭操作的完成。

  • Closed:已经关闭的状态。当前对象已经接收到所有的内部对象已完成关闭动作的通知,并且其也关闭了自身。

Connection 和 Channel 中,与关闭相关的方法有 addShutdownListener(ShutdownListener listener)removeShutdownListener(ShutdownListener listener)。 当 Connection 或者 Channel 的状态转变为 Closed 的时候会调用 ShutdownListener。而且如果将一个 ShutdownListener 注册到一个已经处于 Closed 状态的对象时,会立刻调用 ShutdownListener。

getCloseReason 方法可以让你知道对象关闭的原因;isOpen 方法检测对象当前是否处于开启状态;close(int closeCode, String closeMessage) 方法显式地通知当前对象执行关闭操作。

1
2
3
4
5
6
7
8
9
import com.rabbitmq.client.ShutdownSignalException;
import com.rabbitmq.client.ShutdownListener;

connection.addShutdownListener(new ShutdownListener() {
public void shutdownCompleted(ShutdownSignalException cause)
{
// ...
}
})

当触发 ShutdownListener 的时候,就可以获取到 ShutdownSignalException,这个 ShutdownSignalException 包含了关闭的原因, 这里原因也可以通过调用前面所提及的 getCloseReason获取。

ShutdownSignalException 提供了多个方法来分析关闭的原因。 isHardError 方法可以知道是 Connection 的还是 Channel 的错误;getReason 方法可以获取 cause 相关的信息:

1
2
3
4
5
6
7
8
9
10
11
public void shutdownCompleted(ShutdownSignalException cause)
{
if (cause.isHardError()) {
Connection conn = (Connection) cause.getReference();
if (!cause.isInitiatedByApplication()) {
Method reason = cause.getReason();
}
} else {
Channel ch = (Channel)cause.getReference();
}
}

RabbitMQ 就是 AMQP 协议的 Erlang 的实现(当然 RabbitMQ 还支持 STOMP、MQTT 等协议)。 AMQP 的模型架构和 RabbitMQ 的模型架构是一样的,生产者将消息发送给交换器,交换器和队列绑定。 当生产者发送消息时所携带的 RoutingKey 与绑定时的 BindingKey 相匹配时,消息即被存入相应的队列之中。 消费者可以订阅相应的队列来获取消息。

RabbitMQ 中的交换器、交换器类型、队列、绑定、路由键等都是遵循 AMQP 中相应的概念。 目前 RabbitMQ 最新版本默认支持的使 AMQP 0-9-1。

AMQP 协议本身包括三层:

  • Module Layer:位于协议最高层,主要定义了一些供客户端调用的命令,客户端可以利用这些命令实现自己的业务逻辑。 例如,客户端可以使用 Queue.Declare 命令声明一个队列或者使用 Basic.Consume 订阅消费一个队列中的消息。

  • Session Layer:位于中间层,主要负责将客户端的命令发送给服务器,再将服务端的应答返回给客户端, 主要为客户端与服务器之间的通信提供可靠性同步机制和错误处理。

  • Transport Layer:位于最底层,主要传输二进制数据流,提供帧的处理、信道复用、错误检测和数据表示等。

AMQP 说到底还是一个通信协议,通信协议都会涉及报文交互,从 low-level 举例来说, AMQP 本身是应用层的协议,其填充于 TCP 协议的数据层部分。而从 high-level 来说,AMQP 是通过协议命令进行交互的, AMQP 协议可以看作一系列结构化命令的集合,这里的命令代表一种操作,类似于 HTTP 中的方法(GET、POST 等)。

AMQP 生产者流转过程

简洁版生产者代码

1
2
3
4
5
6
7
8
9
Connection connection = factory.newConnection(); // 创建连接
Channel channel = connection.createChannel(); // 创建信道
String message = "Hello World";
channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY,
MessageProperties.PERSISTENT_TEXT_PLAIN,
message.getBytes());
// 关闭资源
channel.close();
connection.close();

当客户端与 Broker 建立连接的时候,会调用 factory.newConnection 方法,这个方法会进一步封装成 Protocol Header 0-9-1 的报文发送给 Broker, 以此通知 Broker 本次交互采用的是 AMQP 0-9-1 协议,紧接着 Broker 返回 Connection.Start 来建立连接,在连接的过程中涉及 Connection.Start/.Start-OkConnection.Tune/.Tune-OkConnection.Open/.Open-Ok 这 6 个命令的交互。

当客户端调用 connection.createChannel 方法准备开启信道的时候,其包装的 Channel.Open 命令发送给 Broker,等待 Channel.Open-Ok 命令。

当客户端发送消息的时候,需要调用 channel.basicPblish 方法,对应的 AMQP 命令为 Basic.Publish,注意这个命令和前面涉及的命令略有不同, 这个命令还包含了 Content Header 和 Content Body。Content Header 里面包含的是消息体的属性,例如,投递模式、优先级等,而 Content Body 包含消息体本身。

当客户端发送消息需要关闭资源时,涉及 Channel.Close/.Close-OkConnection.Close/.Close-Ok 的命令交互,详细如下图所示。

8

AMQP 消费者流转过程

简洁版消费者代码

1
2
3
4
5
6
7
8
9
Connection connection = factory.newConnection(addresses); // 创建连接
final Channel channel = connection.createChannel(); // 创建信道
Consumer consumer = new DefaultConsumer(channel) {} // ... 省略实现
channel.basicQos(64);
channel.basicConsume(QUEUE_NAME, consumer);
// 等待回调函数执行完毕之后,关闭资源
TimeUnit.SECONDS.sleep(5);
channel.close();
connection.close();
9

消费者客户端同样需要与 Broker 建立连接,与生产者客户端一样,协议交互同样涉及 Connection.Start/.Start-OkConnection.Tune/.Tune-OkConnection.Open/.Open-Ok 等。

紧接着也少不了在 Connection 之上建立 Channel,和生产者客户端一样,协议涉及 Channel.Open/.Open-Ok

如果在消费之前调用了 channel.basicQos(int prefetchCount) 的方法来设置消费者客户端最大能 "保持" 的未确认的消息数, 那么协议流转会涉及 Basic.Qos/.Qos-Ok 这两个 AMQP 命令。

在真正消费之前,消费者客户端需要向 Broker 发送 Basic.Consume 命令(即调用 channel.basicConsume 方法)将 Channel 置为接收模式, 之后 Broker 回执 Basic.Consume-Ok 以告诉消费者客户端准备好消费消息。紧接着 Broker 向消费者客户端推送 (Push) 消息, 即 Basic.Deliver 命令,有意思的是这个和 Basic.Publish 命令一样会携带 Content Header 和 Content Body。

消费者接收到消息并正确消费之后,向 Broker 发送确认,即 Basic.Ack 命令。

在消费者停止消费的时候,主动关闭连接,这点和生产者一样,涉及 Channel.Close/.Close-OkConnection.Close/.Close-Ok