0%

在上一篇文章中,我们已经了解了 gorilla/websocket 的一些基本概念和简单的用法。 接下来,我们通过一个再复杂一点的例子来了解它的实际用法。

功能

这个例子来自源码里面的 examples/chat,它包含了以下功能:

  1. 用户访问群聊页面的时候,可以发送消息给所有其他在聊天室内的用户(也就是同样打开群聊页面的用户)
  2. 所有的用户发送的消息,群聊中的所有用户都能收到(包括自己)

其基本效果如下:

chat

为了更好地理解 gorilla/websocket 的使用方式,下文在讲解的时候会去掉一些出于健壮性考虑而写的代码。

基本架构

这个 demo 的基本组件如下图:

arch
  1. Client:也就是连接到了服务端的客户端,可以有多个
  2. Hub:所有的客户端会保存到 Hub 中,同时所有的消息也会经过 Hub 来进行广播(也就是将消息发给所有连接到 Hub 的客户端)
broadcast

工作原理

Hub

Hub 的源码如下:

1
2
3
4
5
6
7
8
9
10
type Hub struct {
// 保存所有客户端
clients map[*Client]bool
// 需要广播的消息
broadcast chan []byte
// 等待连接的客户端
register chan *Client
// 等待断开的客户端
unregister chan *Client
}

Hub 的核心方法如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
func (h *Hub) run() {
for {
select {
case client := <-h.register:
// 从等待连接的客户端 chan 取一项,设置到 clients 中
h.clients[client] = true
case client := <-h.unregister:
// 断开连接:
// 1. 从 clients 移除
// 2. 关闭发送消息的 chan
if _, ok := h.clients[client]; ok {
delete(h.clients, client)
close(client.send)
}
case message := <-h.broadcast:
// 发送广播消息给每一个客户端
for client := range h.clients {
select {
// 成功写入消息到客户端的 send 通道
case client.send <- message:
default:
// 发送失败则剔除这个客户端
close(client.send)
delete(h.clients, client)
}
}
}
}
}

这个例子中使用了 chan 来做同步,这可以提高 Hub 的并发处理速度,因为不需要等待 Hubrun 方法中其他 chan 的处理。

简单来说,Hub 做了如下操作:

  1. 维护所有的客户端连接:客户端连接、断开连接等
  2. 发送广播消息

Client

Client 的源码如下:

1
2
3
4
5
6
7
8
type Client struct {
// Hub 单例
hub *Hub
// 底层的 websocket 连接
conn *websocket.Conn
// 等待发送给客户端的消息
send chan []byte
}

它包含了如下字段:

  1. Hub 单例(我们的 demo 中只有一个聊天室)
  2. conn 底层的 WebSocket 连接
  3. send 通道,这里保存了等待发送给这个客户端的数据

Client 中,是通过 readPump 这个方法来从客户端接收消息的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
func (c *Client) readPump() {
defer func() {
// 连接断开、出错等:
// 会关闭连接,从 hub 移除这个连接
c.hub.unregister <- c
c.conn.Close()
}()
// ...
for {
// 接收消息
_, message, err := c.conn.ReadMessage()
if err != nil {
// ... 错误处理
break
}
// 消息处理,最终放入 broadcast,准备发给所有其他在线的客户端
message = bytes.TrimSpace(bytes.Replace(message, newline, space, -1))
c.hub.broadcast <- message
}
}

readPump 方法做的事情很简单,它就是接收消息,然后通过 Hubbroadcast 来发给所有在线的客户端。

而发送消息会稍微复杂一点,我们来看看 writePump 的源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
func (c *Client) writePump() {
defer func() {
// 连接断开、出错:关闭 WebSocket 连接
c.conn.Close()
}()
for {
select {
case message, ok := <-c.send:
// 控制写超时时间
c.conn.SetWriteDeadline(time.Now().Add(writeWait))
if !ok {
// 连接已经被 hub 关闭了
c.conn.WriteMessage(websocket.CloseMessage, []byte{})
return
}

// 获取用以发送消息的 Writer
w, err := c.conn.NextWriter(websocket.TextMessage)
if err != nil {
return
}
// 发送消息
w.Write(message)

n := len(c.send)
for i := 0; i < n; i++ {
w.Write(newline)
// 将接收到的信息发送出去
w.Write(<-c.send)
}

// 调用 Close 的时候,消息会被发送出去
if err := w.Close(); err != nil {
return
}
}
}
}

虽然比读操作复杂了一点,但是也还是很好理解,它做的东西也不多:

  1. 获取用以发送消息的 Writer
  2. 获取从 hub 中接收到的其他客户端的消息,发送给当前这个客户端

具体是如何工作起来的?

  1. main 函数中创建 hub 实例
  2. 通过下面这个 serveWs 来将建立 WebSocket 连接:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
func serveWs(hub *Hub, w http.ResponseWriter, r *http.Request) {
// 将 HTTP 连接转换为 WebSocket 连接
conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
log.Println(err)
return
}
// 客户端
client := &Client{hub: hub, conn: conn, send: make(chan []byte, 256)}
// 注册到 hub
client.hub.register <- client

// 发送数据到客户端的协程
go client.writePump()
// 从客户端接收数据的协程
go client.readPump()
}

serveWs 中,我们在跟客户端建立起连接后,创建了两个协程,一个是从客户端接收数据的,另一个是发送消息到客户端的。

这个 demo 的作用

这个 demo 是一个比较简单的 demo,不过也包含了我们构建 WebSocket 应用的一些关键处理逻辑,比如:

  • 使用 Hub 来维持一个低层次的连接信息
  • Client 中区分读和写的协程
  • 以及一些边界情况的处理:比如连接断开、超时等

在后续的文章中,我们会基于这些已有知识去构建一个更加完善的 WebSocket 应用,今天就到此为止了。

在本系列文章中,将会使用在 Go 中一个用得比较多的 WebSocket 实现 gorilla/websocket

背景知识 - HTTP 与 WebSocket 的关系

本文会涉及到一些原理讲解,其中比较关键的一个是 HTTP 与 WebSocket 的联系与区别,了解这个可以帮助我们更好地使用 WebSocket

如果我们此前已经使用过 WebSocket,比如在 nginx 配置过 WebSocket,我们就会发现:

  1. 有个类似 upgrade 的关键字。这个关键字体现了 HTTP 与 WebSocket 的本质区别。
  2. 在 nginx 里配置,意味着 WebSocket 本质上也是通过 HTTP 协议来工作的。

我们知道,HTTP 的请求会在请求结束之后断开 TCP 连接,但 WebSocket 不一样,它在建立连接之后会一直维持着连接状态, 这样客户端与服务端就可以一直维持通信状态了。

WebSocket 建立连接的过程

在 WebSocket 协议中,初始的握手阶段使用标准的 HTTP 请求和响应:

  1. 客户端先发送一个 HTTP 请求,请求升级到 WebSocket 协议。
  2. 服务器在收到这个请求后,如果同意升级到 WebSocket,就会返回一个状态码为 101 的 HTTP 响应,指示升级成功,然后不会断开 TCP 连接。

这个过程涉及到的 HTTP 头部字段是 UpgradeConnection,具体而言,HTTP 请求头部可能包含类似以下的字段:

请求:

1
2
3
4
GET /chat HTTP/1.1
Host: example.com
Upgrade: websocket
Connection: Upgrade

响应:

1
2
3
HTTP/1.1 101 Switching Protocols
Upgrade: websocket
Connection: Upgrade

也就是说,我们所看到的 Upgrade 实际上是把一个 HTTP 连接升级为了 WebSocket 连接,这个连接可以实现双向的通信。

这使得它非常适合实时通信的应用,例如聊天应用、在线游戏等。

gorilla/websocket 中的基本概念

WebSocket 连接 - Conn

gorilla/websocket 中使用 Conn 来表示一个 WebSocket 连接,它主要有如下作用:

  • 发送消息给客户端:Write* 方法,如 WriteJSON 发送 JSON 类型消息,又或者 WriteMessage 可以发送普通的文本消息。
  • 接收客户端发送的消息:Read* 方法,如 ReadJSONReadMessage
  • 其他功能:关闭连接、获取客户端 IP 地址等

消息

gorilla/websocket 中,消息被分为以下几种:

  • 数据消息:
    • TextMessage 文本消息:文本消息被解析为 UTF-8 编码的文本。需要应用程序来确保文本消息是有效的 UTF-8 编码文本。
    • BinaryMessage 二进制消息:二进制消息的解析留给应用程序。
  • 控制消息:可以调用 Conn 中的 WriteControlWriteMessageNextWriter 方法,将控制消息发送给对方。
    • CloseMessage 关闭连接的消息
    • PingMessage ping 消息
    • PongMessage pong 消息

注意:应用程序需要先读取连接中的消息才能处理从对等方发送的 closepingpong 消息。如果应用程序对来自对等方的消息不感兴趣, 则应用程序应启动一个 goroutine 来读取和丢弃来自对等方的消息。

并发

虽然 Golang 中有 goroutine 可以支持我们做并发操作,但是在 gorilla/websocket 中, 一个 WebSocket 连接只支持一个并发 reader 和一个并发 writer

我们的应用程序应该确保不超过一个 goroutine 同时调用写入方法(WriteMessageWriteJSON)或者读取方法(ReadMessageReadJSON)。

CloseWriteControl 方法可以与其他所有方法同时调用。

安全性

我们知道,在一般的 web 应用中,经常需要处理跨域的问题,同样的,在 gorilla/websocket 中也需要做一定的配置。

我们可以在 Upgrader 中的 CheckOrigin 字段中指定函数的 Origin 检查策略,如果 CheckOrigin 函数返回 false,则 Upgrader 方法将拒绝建立 WebSocket 连接,如果允许所有来源的连接,我们可以直接返回 true 即可。

1
2
3
4
5
6
7
var upgrader = websocket.Upgrader{
ReadBufferSize: 1024,
WriteBufferSize: 1024,
CheckOrigin: func(r *http.Request) bool {
return true
},
}

缓冲

缓冲在 io 类操作中是一个很常见的术语,在 gorilla/websocket 中我们可以通过上面那段代码的 ReadBufferSizeWriteBufferSize 来指定连接的缓冲大小,以减少读取或写入消息时的系统调用次数。

默认大小为 4096,建议限制为最大预期消息的大小,大于最大消息最大大小的缓冲区不会带来任何好处。

Hello World

最后,让我们通过一个简单的 Hello World 程序来结束本文:

main.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
package main

import (
"github.com/gorilla/websocket"
"log"
"net/http"
)

var upgrader = websocket.Upgrader{
ReadBufferSize: 1024,
WriteBufferSize: 1024,
CheckOrigin: func(r *http.Request) bool {
return true
},
}

func handler(w http.ResponseWriter, r *http.Request) {
conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
log.Fatal(err)
}

conn.WriteMessage(websocket.TextMessage, []byte("Hello, World!"))
conn.Close()
}

func main() {
http.HandleFunc("/ws", handler)
http.ListenAndServe(":8181", nil)
}

执行 go run main.go 启动 WebSocket 服务端,然后,我们打开一个浏览器的控制台, 在里面执行下面的 JavaScript 代码:

1
let ws = new WebSocket('ws://127.0.0.1:8181/ws')

不出意外的话,我们可以在浏览器控制台的 Network -> WS 中看到由服务端发送的 Hello, World!

什么是 xhprof?

xhprof 是一个轻量级 PHP 性能分析工具。

它报告函数级别的请求次数和各种指标,包括阻塞时间,CPU 时间和内存使用情况。

注意:xhprof 的使用开销很大,所以只能在本地开发调试的时候使用。

安装

我们可以通过 pecl 来安装 xhprof

1
2
# 目前最新版本是 2.3.9
pecl install xhprof-2.3.9

安装完之后,运行一下 php -m 查看是否已经启用:

1
php -m

最后,修改一下 php.ini 配置文件,添加以下配置:

1
xhprof.output_dir = /tmp/xhprof

注意:这里指定的文件夹必须有写的权限才行。

当然,我们也可以通过源码编译安装,源码在 https://github.com/longxinH/xhprof

使用

主要有两个步骤:

  1. 使用 xhprof_enable 来开启 xhprof 性能监控
1
2
3
xhprof_enable(XHPROF_FLAGS_NO_BUILTINS +
XHPROF_FLAGS_CPU +
XHPROF_FLAGS_MEMORY);
  1. 注册一个 shutdown 处理器

它的作用是在 php 请求处理完毕的时候将性能指标数据写入到文件中,如果没有这个,则在 xhprof.output_dir 中将不会有任何输出。

1
2
3
4
5
6
7
8
9
register_shutdown_function(function(){
$data = xhprof_disable(); //返回运行数据
// 需要在 https://github.com/longxinH/xhprof 下载源码,下面的 `.../xhprof_lib` 就是下载源码中的 `xhprof_lib` 目录
// 这里需要替换为自己本地的路径
include '/Users/ruby/Code/xhprof-2.3.9/xhprof_lib/utils/xhprof_lib.php';
include '/Users/ruby/Code/xhprof-2.3.9/xhprof_lib/utils/xhprof_runs.php';
$objXhprofRun = new XHProfRuns_Default();
$objXhprofRun->save_run($data, "test"); //test 表示文件后缀
});

通过 HTML 页面展示性能分析结果

  1. 我们在本地的 nginx 中添加一个 xhprof web 服务的配置:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
server {
listen 8088;
server_name xhprof.local;
root /Users/ruby/Code/xhprof-2.3.9;

add_header X-Frame-Options "SAMEORIGIN";
add_header X-Content-Type-Options "nosniff";

index index.php;

charset utf-8;

location / {
try_files $uri $uri/ /index.php?$query_string;
}

location = /favicon.ico { access_log off; log_not_found off; }
location = /robots.txt { access_log off; log_not_found off; }

error_page 404 /index.php;

location ~ \.php$ {
fastcgi_pass 127.0.0.1:9000;
fastcgi_param SCRIPT_FILENAME $realpath_root$fastcgi_script_name;
include fastcgi_params;
}

location ~ /\.(?!well-known).* {
deny all;
}
}
  1. 重启本地的 php-fpm

  2. 在我们的 /etc/hosts 中加上 nginx 中配置的域名即可

  3. 打开浏览器,访问 http://xhprof.local:8088/xhprof_html/

xhprof
  1. 点击其中一个进去,可以看到详情
xhprof

在这里,我们还能点击每一列的表头,让它按这一列来排序。上图我就按 Calls 逆序排序了。

以图的形式来展示

上面我们通过表格的方式来看到了函数调用的次数、时间等,但表格其实不够直观。

我们也看到上面图中的正中间有一个 View Full Callgraph 的超链接,我们可以通过这个超链接来查看具体的函数调用链, 这样我们可以更加直观的知道调用入口在哪里,以及整个调用链条大概长什么样子的。

xhprof

图太大了,这里随便看看吧

注意:要使用这个功能,我们需要安一个插件 graphviz。mac 下可以通过 brew install graphviz,其他的自行搜索。

最近在项目中发现有个接口需要耗时几分钟才能完成,排查发现跟以往的慢请求不大一样,这次的慢请求中,并没有慢查询(不管是 MySQL、MongoDB 还是 Redis、HTTP),经过排查发现其中有一个函数处理时间非常长,整个请求的 99% 的时间都花在了这个函数上:

1
2
3
4
5
6
// 函数内主要是一个 for 循环
foreach ($rules as $rule) {
if (empty($this->hasAllCustomerUserIds[$rule->user_id])) {
$this->matchRule($rule, $results);
}
}

上面这个循环有 600+ 次,但是循环内的 matchRule11w+ 个循环,也就是说,总循环次数达到了 6000w+ 次。 这样一来,就算我们没有慢查询,某些性能不高的点累积起来也会导致整个请求耗时非常漫长,以至最终耗时达到了 7 分钟。

根本原因

在这 6000w+ 的循环里面,完全没有太复杂的操作,更没有什么查询之类的操作,但是由于内层循环都是操作了 ORM 模型,有比较多的获取 ORM 模型属性的操作。 而 Laravel 的 ORM 模型中获取属性是通过魔术方法 __get() 实现的,而这个 __get() 中有一个时间复杂度比较高的操作 getAttribute(),所以这里会有一定的性能问题:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
/**
* Dynamically retrieve attributes on the model.
*
* @param string $key
* @return mixed
*/
public function __get($key)
{
return $this->getAttribute($key);
}

/**
* Get an attribute from the model.
*
* @param string $key
* @return mixed
*/
public function getAttribute($key)
{
if (! $key) {
return;
}

// If the attribute exists in the attribute array or has a "get" mutator we will
// get the attribute's value. Otherwise, we will proceed as if the developers
// are asking for a relationship's value. This covers both types of values.
if (array_key_exists($key, $this->attributes) ||
$this->hasGetMutator($key)) {
return $this->getAttributeValue($key);
}

// Here we will determine if the model base class itself contains this given key
// since we don't want to treat any of those methods as relationships because
// they are all intended as helper methods and none of these are relations.
if (method_exists(self::class, $key)) {
return;
}

return $this->getRelationValue($key);
}

这里只贴 __get()getAttribute() 方法,但是我们也能看到了,虽然我们只是做了一个简单的获取模型属性的操作,但底层涉及了很多的方法调用。

在这种情况下,虽然在代码中已经做了一些查询上的优化,但是这个计算规模下,对 Laravel ORM 模型的操作带来的性能问题会非常显著。

因为模型中使用 $model->attribute 这种方式来获取它的属性的时候,时间复杂度会很高(相比于普通的对象属性),下面是一个性能测试:

100w 次模型访问属性操作

1
2
3
4
5
6
7
8
9
10
11
12
13
class Foo extends \Illuminate\Database\Eloquent\Model {
}

$start = microtime(true);

$model = new Foo;
$model->a = 1;
for ($i = 0; $i < 1000000; $i++) {
$model->a;
}

// 0.7896(=789.6ms)
dump(bcsub(microtime(true), $start, 4));

最终耗时 789.6ms

100w 次普通对象获取属性操作

1
2
3
4
5
6
7
8
9
10
11
12
13
class Bar {
}
$bar = new Bar;
$bar->a = 1;

$start = microtime(true);

for ($i = 0; $i < 1000000; $i++) {
$bar->a;
}

// 0.0140(=14ms)
dump(bcsub(microtime(true), $start, 4));

最终耗时 14ms

也就是说,两者在 100w 的计算规模下,性能差距相差了 56.4 倍。

56.4 倍在数据规模小的时候,我们通常无法感知,比如 10ms,再乘以 56.4 也就是 564,这也还是一个可以接受的时间。

但是在数据规模较大的时候,原本只需要 1s 的操作,可能就得需要 56s 了,这就是非常明显的。

ORM 模型的属性访问做了什么?

首选我们需要知道的是,我们定义的模型中,并没有定义显式地定义任何 public 属性,对于这种情况,我们访问它的属性的时候,php 会去调用对象的 __get 方法。那就顺着模型的 __get 方法看看它做了什么:

orm_optimize

我们可以看到,我们只是做了一个简单的属性访问,但是底层却调用了 8+ 方法。

方法调用在次数少的时候我们无法感知,但是我们可以看看 100w 次方法调用需要多久:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
class Test {
public function t()
{}
}

$start = microtime(true);

$test = new Test();
for ($i = 0; $i < 1000000; $i++) {
$test->t();
}

// 0.0216(=21.6ms)
dump(bcsub(microtime(true), $start, 4));

上面的代码中,我们的 t 方法里面什么都没有做,但是调用了 100w 次之依然需要 21ms

也就是说,在模型访问属性所产生的 8+ 方法调用中,至少需要花费 160ms,这还是一个保守的数字,因为上面产生的方法调用里面还有一些方法调用没有画出来。

如何优化?

知道了原因之后,我们优化起来就简单了,那就是尽量把 ORM 模型访问属性所产生的额外开销去掉,因为在这里讨论的问题中,并不需要模型帮我们做额外的处理(比如 cast、日期转换)。

因此,最简单的实现方法就是将 ORM 模型转换为普通的实体类型。

这一种优化方法下,也还有两种实现方式,先说第一种,直接创建一个 stdClass 对象,然后将 ORM 模型的属性依次赋值到这个 stdClass 对象中,如下:

在我们定义的模型中添加一个方法 toStdClass:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
class Foo extends \Illuminate\Database\Eloquent\Model {
// 转换为 stdClass
public function toStdClass(): stdClass
{
$result = new stdClass();

foreach ($this->attributes as $key => $attribute) {
$result->{$key} = $attribute;
}

return $result;
}
}

$start = microtime(true);

$model = new Foo;
$model->a = 2;

$stdClass = $model->toStdClass();

for ($i = 0; $i < 1000000; $i++) {
$stdClass->a;
}

// 0.0140(=14ms)
dump(bcsub(microtime(true), $start, 4));

在这种实现方式中,由于我们在循环里面操作的是一个 stdClass,所以也就没有了 ORM 模型访问属性时候的方法调用开销,最终时间是 14ms,也就是跟我们访问普通对象属性的开销一样了,比原来快了 56 倍。

但是,需要注意的是,在这种实现中,由于我们拿到的是一个 stdClass,也就丧失了 ORM 模型本身带来的一些便利。不过好在这种便利在我们只是做简单的属性访问的时候是不需要的,除非我们需要在拿到模型之后还需要做 CRUD 操作。就算如此,我们也依然可以再改造一下我们的实现方式,另外定义一个实体类,使用这个实体类来代替上面的 stdClass,然后再在这个实体类中添加一个方法,让其拥有转换回 ORM 模型的能力(第 2 种实现方式):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
class Foo extends \Illuminate\Database\Eloquent\Model
{
public function toStdClass(): stdClass
{
$result = new stdClass();

foreach ($this->attributes as $key => $attribute) {
$result->{$key} = $attribute;
}

return $result;
}
}

class FooEntity
{
// 由 ORM 模型创建一个 Entity
public static function make(\Illuminate\Database\Eloquent\Model $model): self
{
$result = new self();

foreach ($model->getAttributes() as $key => $attribute) {
$result->{$key} = $attribute;
}

return $result;
}

// 由 Entity 转换回 ORM 模型
public function toModel(): Foo
{
$result = new Foo();

foreach (get_object_vars($this) as $key => $attribute) {
$result->setAttribute($key, $attribute);
}

return $result;
}
}

$model = new Foo;
$model->a = 2;

// model -> entity
$entity = FooEntity::make($model);
// entity -> model
$model = $entity->toModel();

优化效果

这里不贴具体代码了。

在不做其他优化的情况下,只是将循环中的 ORM 模型修改为实体之后,原本 7 分钟的函数,最终只需要 44s 了。

当然,代码还有不少其他可以优化的地方,只是那些是比较常规的可以优化的,这里不再赘述。截止发文这天,这个优化已经上线了,将其他的可以优化的地方优化之后,原本 7 分钟的请求,最终耗时 20s 左右。

存在问题

  1. 上面的两种实现方式都没有显式指定类的属性,不好维护。(可以显式定义实体类,并显式定义 ORM 中存在的属性)
  2. 转换为 entity 之后无法拥有 ORM 模型的能力,需要注意。

由于第 2 点,所以这种优化不适用于那些需要更新的场景,当然我们也可以将实体转换回 ORM 模型,然后再做 CURD 操作。

总结

Laravel 的 ORM 模型给我们带来了非常大的便利,但是它通过魔术方法 __get() 来获取模型属性的方式在大数据量操作下会有一些性能问题, 如果我们不需要在这过程做 CURD 操作,我们可以将 ORM 转换为简单的实体对象。

这样就可以大大减少在 ORM 模型中访问属性的开销。

观察者模式

在说事件机制之前,我们先聊一下观察者模式,因为 Spring 的事件机制本质上是观察者模式的一种实现。

我们都知道,有一种设计模式叫观察者模式,它用于建立对象之间一对多的依赖关系,当一个对象的状态发生变化时, 所有依赖它的对象都会得到通知并自动更新。这种模式常用于需要实现对象之间松耦合的场景, 其中一个对象(被观察者)的状态变化会影响到其他多个对象(观察者)。

观察者模式的主要角色

  1. 被观察者(Subject):也称为主题或者可观察对象,它维护了一个观察者列表,可以添加、删除和通知观察者。当其状态发生变化时,会通知所有注册的观察者。
  2. 观察者(Observer):观察者是依赖于被观察者的对象,它定义了一个方法,用于在被观察者状态发生变化时进行更新操作。

观察者模式的工作流程

  1. 被观察者对象注册观察者:观察者通过某种方式向被观察者注册,通常是将自己添加到被观察者的观察者列表中。(建立起观察者与被观察者的关联)
  2. 被观察者状态变化
  3. 通知观察者:被观察者遍历观察者列表,调用每个观察者的更新方法(onEventhandle...)
  4. 观察者更新:每个观察者根据被观察者的通知进行相应的更新操作,执行与状态变化相关的任务

观察者模式的优点

解耦性:被观察者和观察者之间的关系是松耦合的,提高了代码的可维护性和扩展性。 可以轻松添加或删除观察者,可以在不修改被观察者的情况下增加新的观察者。

Spring 中的事件

Spring 的事件机制是 Spring 框架中的一个重要特性,基于观察者模式实现,它可以实现应用程序中的解耦,提高代码的可维护性和可扩展性。

Spring 的事件机制包括事件、事件发布、事件监听器等几个基本概念:

  1. 事件:事件是一个抽象的概念,它代表着应用程序中的某个动作或状态的发生。
  2. 事件发布:是事件发生的地方,它负责发布事件,从而通知事件监听器。
  3. 事件监听器:事件的接收者,它负责处理事件并执行相应的操作。

在 Spring 的事件机制中,事件源和事件监听器之间通过事件进行通信,从而实现了代码的解耦。

1

如上图所示,在观察者模式的实现中,往往还会有一个 Dispatcher 的角色, 由它来通知观察者,在 Spring 中,ApplicationContext 就扮演了这个角色。

如何定义事件

在 Spring 中,我们可以通过继承 ApplicationEvent 来自定义一个事件:

1
2
3
4
5
6
7
import org.springframework.context.ApplicationEvent;

public class MyEvent extends ApplicationEvent {
public MyEvent(Object source) {
super(source);
}
}

我们会发现,ApplicationEvent 有一个必选的参数 source,这个参数在实践中往往传递 this,也就是事件发生处的对象,这个参数不能为 null

如何监听事件?

在 Spring 中,监听事件的方式有两种:

  1. 实现 ApplicationListener,需要指定它要监听的事件类型
1
2
3
4
5
6
7
8
9
10
import org.springframework.context.ApplicationListener;
import org.springframework.stereotype.Component;

@Component
public class MyEventListener implements ApplicationListener<MyEvent> {
@Override
public void onApplicationEvent(MyEvent event) {
System.out.println("MyEventListener::onApplicationEvent");
}
}

注意:我们需要添加 @Component 注解以便 Spring 可以注册这个观察者。

  1. 使用 @EventListener 注解
1
2
3
4
5
6
7
8
9
10
11
import com.example.springeventdemo.event.MyEvent;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Component;

@Component
public class SpringEventListener {
@EventListener
public void myEvent(MyEvent myEvent) {
System.out.println("my event.");
}
}

我们可以使用 @EventListener 注解在托管 bean 的任何方法上注册事件监听器。 需要监听的事件通过方法的参数来指定。

如何发布事件

我们可以使用 ApplicationEventPublisher 来发布一个事件,也就是通知所有的观察者:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
import com.example.springeventdemo.event.MyEvent;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.stereotype.Component;

@Component
public class MyService {
@Autowired
ApplicationEventPublisher applicationEventPublisher;

public void publish() {
applicationEventPublisher.publishEvent(new MyEvent(this));
}
}

也可以使用 ConfigurableApplicationContext,不过这个接口其实也是继承了 ApplicationEventPublisher 接口。

事件异步处理

有时候,我们的一些事件是可以异步处理的,比如注册成功之后给用户发送验证邮件, 注册成功我们就可以返回了,而发送验证邮件的这一步操作可以异步进行处理, 从而加快接口的响应速度。

在 Spring 中,我们可以使用 @Async 注解来将一个 EventListener 标记为异步处理的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import org.springframework.context.event.EventListener;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;

@Component
public class AsyncMyEventListener {
@EventListener
@Async
public void listen(MyEvent myEvent) {
try {
// 模拟耗时操作
Thread.sleep(1000);
// 请求结束之后才会输出下面这一行
System.out.println("AsyncMyEventListener::listen");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

但是,要使用 @Async 我们必须在我们的主程序类中加上 @EnableAsync 注解:

1
2
3
4
5
6
7
8
9
10
11
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableAsync;

@SpringBootApplication
@EnableAsync // 加上这个注解
public class SpringEventDemoApplication {
public static void main(String[] args) {
SpringApplication.run(SpringEventDemoApplication.class, args);
}
}

使用 @Async 注解的监听器,会放到跟请求不同的线程中处理。

指定 EventListener 的处理条件

我们可以通过 EventListenercondition 属性来决定监听器是否需要执行:

使用 SpEL 表达式,当然我们也可以把判断写到方法体内。

1
2
3
4
5
// 在 myEvent 的 foo 属性等于 'bar' 的时候才会触发
@EventListener(condition = "#myEvent.foo == 'bar'")
public void myEvent1(MyEvent myEvent) {
System.out.println("my event: foo=bar");
}

使用 condition 而不是写到方法体中的原因是:

  1. 解耦和可配置性:通过将条件与事件监听器声明分离,你可以在不修改监听器代码的情况下更改条件。这使得在不同的环境或不同的配置下轻松切换监听器的行为成为可能。
  2. 动态切换行为:允许你根据应用程序或配置来动态决定是否触发事件监听器。这对于需要根据运行时条件来启动或禁用监听器的情况非常有用。
  3. 可测试性:可以为不同的条件编写单元测试,以确保条件的正确性。
  4. 统一管理:当有多个监听器时,将条件集中管理在 condition 属性中可以提高代码的可读性,因为你可以轻松查看每个监听器的条件而无需查看每个监听器的具体实现。

监听多个事件

我们可以通过 EventListenerclasses 属性来指定要监听的多个事件:

1
2
3
4
@EventListener(classes = {MyEvent.class, AnotherEvent.class})
public void myEvent2(Object event) {
System.out.println("myEvent2: " + event.getClass());
}

这个时候,我们的参数类型就需要修改一下了。

指定事件监听器的执行顺序

我们可以通过 @Order 注解来指定一个事件的不同监听器的执行顺序:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import org.springframework.context.event.EventListener;
import org.springframework.core.annotation.Order;

@Component
public class SpringEventListener {
@Order(2)
@EventListener
public void myEvent2(MyEvent myEvent) {
// 后执行
System.out.println("my event. order = 2");
}

@Order(1)
@EventListener
public void myEvent1(MyEvent myEvent) {
// 先执行
System.out.println("my event. order = 1");
}
}

控制事件在事务提交前执行

有时候,我们会在代码中通过 @Transactional 来使用事务:

1
2
3
4
@Transactional(rollbackFor = RuntimeException.class)
public void saveFoo(Foo foo) {
fooRepository.save(foo);
}

假设我们在这个方法中有很多代码,然后其中穿插地发布了一些事件,但是我们希望这些事件在整个事务后才去触发监听器的处理逻辑, 这个时候我们就需要使用 @TransactionalEventListener 来注解我们的事件监听器,而不是使用 @EventListener

发布事件:

1
2
3
4
5
6
7
8
9
10
11
@Transactional(rollbackFor = RuntimeException.class)
public void saveFoo(Foo foo) {
// 发布了事件,但是事件处理器并不会马上处理,要等事务开始提交、结束提交的时候才会执行
// 所以我们会看到 "before save" 和 "after save" 输出在 "before commit" 之前
FooEvent event = new FooEvent(this);
eventPublisher.publishEvent(event);

System.out.println("before save");
fooRepository.save(foo);
System.out.println("after save");
}

事件监听器:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import org.springframework.stereotype.Component;
import org.springframework.transaction.event.TransactionPhase;
import org.springframework.transaction.event.TransactionalEventListener;

@Component
public class TransactionEventListener {
// 在事务提交前处理这个 FooEvent
@TransactionalEventListener(phase = TransactionPhase.BEFORE_COMMIT)
public void beforeCommit(FooEvent event) {
System.out.println("before commit: foo event.");
}

// 在事务提交后处理这个 FooEvent
// 如果事务回滚则不会处理这个 FooEvent。
@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
public void afterCommit(FooEvent event) {
System.out.println("after commit: foo event.");
}
}

上面的代码会输出:

1
2
3
4
before save
after save
before commit: foo event.
after commit: foo event.

@TransactionalEventListener 为我们提供了一个 phase 参数,让我们可以控制事件监听器的执行时机,它有以下可选值:

  • TransactionPhase.BEFORE_COMMIT:事务提交前
  • TransactionPhase.AFTER_COMMIT:事务提交后
  • TransactionPhase.AFTER_ROLLBACK:事务回滚后
  • TransactionPhase.AFTER_COMPLETION:事务完成后

tips:@TransactionalEventListener 并不是给我们监听事务的,只是控制事件在事务提交过程中的某一时刻触发。