0%

有了前两篇的铺垫,相信大家已经对 Golang 中 WebSocket 的使用有一定的了解了, 今天我们以一个更加真实的例子来学习如何在 Golang 中使用 WebSocket

需求背景

在实际的项目中,往往有一些任务耗时比较长,然后我们会把这些任务做异步的处理,但是又要及时给客户端反馈任务的处理进度。

对于这种场景,我们可以使用 WebSocket 来实现。其他可以使用 WebSocket 进行通知的场景还有像管理后台一些通知(比如新订单通知)等。

在本篇文章中,就是要实现一个这样的消息推送系统,具体来说,它会有以下功能:

  1. 可以给特定的用户推送:建立连接的时候,就建立起 WebSocket 连接与用户 ID 之间的关联
  2. 断开连接的时候,移除 WebSocket 连接与用户的关联,并且关闭这个 WebSocket 连接
  3. 业务系统可以通过 HTTP 接口来给特定的用户推送 WebSocket 消息:只要传递用户 ID 以及需要推送的消息即可

基础框架

下面是一个最简单版本的框架图:

arch

它包含如下几个角色:

  1. Client 客户端,也就是实际中接收消息通知的浏览器
  2. Server 服务端,在我们的例子中,服务端实际不处理业务逻辑,只处理跟客户端的消息交互:维持 WebSocket 连接,推送消息到特定的 WebSocket 连接
  3. 业务逻辑:这个实际上不属于 demo 的一部分,但是 Server 推送的数据是来自业务逻辑处理的结果

设计成这样的目的是为了将技术跟业务进行分离,业务逻辑上的变化不影响到底层技术,同样的,WebSocket 推送中心的技术上的变动也不会影响到实际的业务。

开始开发

一些结构体变动

  1. Client 结构体的变化
1
2
3
4
5
6
7
type Client struct {
hub *Hub
conn *websocket.Conn
send chan []byte
// 新增字段
uid int
}

因为我们需要建立起 WebSocket 连接与用户之间的关联,因此我们需要一个额外的字段来记录用户 ID,也就是上面的 uid 字段。

这个字段会在客户端建立连接后写入。

  1. Hub 结构体的变化
1
2
3
4
5
6
7
8
9
10
11
type Hub struct {
clients map[*Client]bool
register chan *Client
unregister chan *Client

// 记录 uid 跟 client 的对应关系
userClients map[int]*Client

// 读写锁,保护 userClients 以及 clients 的读写
sync.RWMutex
}
  1. 因为我们不再需要做广播,所以会移除 Hub 中的 broadcast 字段。

取而代之的是,我们会直接在消息推送接口中写入到 uid 对应的 Clientsend 通道。 当然我们也可以在 Hub 中另外加一个字段来记录要推送给不同 uid 的消息,但是我们的 Hubrun 方法是一个协程处理的,当需要推送的数据较多或者其中有 网络延迟的时候,会直接影响到推送给其他用户的消息。当然我们也可以改造一下 run 方法,启动多个协程来处理,不过这样比较复杂,本文会在 writePump 中处理。 (也就是建立 WebSocket 连接时的那个写操作协程)

  1. 同时为了更加快速地通过 uid 来获取对应的 WebSocket 连接,新增了一个 userClients 字段。

这是一个 map 类型的字段,keyuid,值是对应的 Client 指针。

  1. 最后新增了一个 Mutex 互斥锁

因为,在用户实际进行登录的时候需要写入 userClients 字段,而这是一个 map 类型字段,并不支持并发读写。 如果我们在接受并发连接的时候同时修改 userClients 的时候会导致 panic,因此我们使用了一个互斥锁来保证 userClients 的读写安全。

同时,clients 也是一个 map,但上一篇文章中没有使用 sync.Mutex 来保护它的读写,在并发操作的时候也是会有问题的, 所以 Mutex 同时也需要保护 clients 的读写。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
func (h *Hub) run() {
for {
select {
case client := <-h.register:
h.Lock()
h.clients[client] = true
h.Unlock()
case client := <-h.unregister:
if _, ok := h.clients[client]; ok {
h.Lock()
delete(h.userClients, client.uid)
delete(h.clients, client)
h.Unlock()
close(client.send)
}
}
}
}

最后,我们会在 Hubrun 方法中写 userClients 或者 clients 字段的时候,先获取锁,写成功的时候释放锁。

建立连接

在本篇中,将会继续沿用上一篇的代码,只是其中一些细节会有所改动。建立连接这步操作,跟上一篇的一样:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// 将 HTTP 转换为 WebSocket 连接的 Upgrader
var upgrader = websocket.Upgrader{
ReadBufferSize: 1024,
WriteBufferSize: 1024,
}

// 处理 WebSocket 连接请求
func serveWs(hub *Hub, w http.ResponseWriter, r *http.Request) {
// 升级为 WebSocket 连接
conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
log.Println(err)
return
}
// 新建一个 Client
client := &Client{hub: hub, conn: conn, send: make(chan []byte, 256)}
// 注册到 Hub
client.hub.register <- client

// 推送消息的协程
go client.writePump()
// 结束消息的协程
go client.readPump()
}

接收消息

由于我们要做的只是一个推送消息的系统,所以我们只处理用户发来的登录请求,其他的消息会全部丢弃:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
func (c *Client) readPump() {
defer func() {
c.hub.unregister <- c
_ = c.conn.Close()
}()
c.conn.SetReadLimit(maxMessageSize)
c.conn.SetReadDeadline(time.Time{}) // 永不超时
for {
// 从客户端接收消息
_, message, err := c.conn.ReadMessage()
if err != nil {
log.Println("readPump error: ", err)
break
}

// 只处理登录消息
var data = make(map[string]string)
err = json.Unmarshal(message, &data)
if err != nil {
break
}

// 写入 uid 以及 Hub 的 userClients
if uid, ok := data["uid"]; ok {
c.uid = uid
c.hub.Lock()
c.hub.userClients[uid] = c
c.hub.Unlock()
}
}
}

在本文中,假设客户端的登录消息格式为 {"uid": "123456"} 这种 json 格式。

在这里也操作了 userClients 字段,同样需要使用互斥锁来保证操作的安全性。

发送消息

  1. 在我们的系统中,可以提供一个 HTTP 接口来跟业务系统进行交互:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
// 发送消息的接口
// 参数:
// 1. uid:接收消息的用户 ID
// 2. message:需要发送给这个用户的消息
http.HandleFunc("/send", func(w http.ResponseWriter, r *http.Request) {
send(hub, w, r)
})

// 发送消息的方法
func send(hub *Hub, w http.ResponseWriter, r *http.Request) {
uid := r.FormValue("uid")
// 参数错误
if uid == "" {
w.WriteHeader(http.StatusBadRequest)
return
}

// 从 hub 中获取 client
hub.Lock()
client, ok := hub.userClients[uid]
hub.Unlock()
// 尚未建立连接
if !ok {
w.WriteHeader(http.StatusBadRequest)
return
}

// 发送消息
message := r.FormValue("message")
client.send <- []byte(message)
}
  1. 实际发送消息的操作

writePump 方法中,我们会将从 /send 接收到的数据发送给对应的用户:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
// 发送消息的协程
func (c *Client) writePump() {
defer func() {
_ = c.conn.Close()
}()
for {
select {
case message, ok := <-c.send:
// 设置写超时时间
c.conn.SetWriteDeadline(time.Now().Add(writeWait))
// 连接已经被关闭了
if !ok {
c.conn.WriteMessage(websocket.CloseMessage, []byte{})
return
}

// 获取一个发送消息的 Writer
w, err := c.conn.NextWriter(websocket.TextMessage)
if err != nil {
return
}
// 写入消息到 Writer
w.Write(message)

// 关闭 Writer
if err := w.Close(); err != nil {
return
}
}
}
}

在这个方法中,我们会从 c.send 这个 chan 中获取需要发送给客户端的消息,然后进行发送操作。

测试

  1. 启动 main 程序
1
go run main.go
  1. 打开一个浏览器的控制台,执行以下代码
1
2
ws = new WebSocket('ws://127.0.0.1:8181/ws')
ws.send('{"uid": "123"}')

这两行代码的作用是与 WebSocket 服务器建立连接,然后发送一个登录信息。

然后我们打开控制台的 Network -> WS -> Message 就可以看到浏览器发给服务端的消息:

login
  1. 使用 HTTP 客户端发送消息给 uid 为 123 的用户

假设我们的 WebSocket 服务器绑定的端口为 8181

打开终端,执行以下命令:

1
curl "http://localhost:8181/send?uid=123&message=Hello%20World"

然后我们可以在 Network -> WS -> Message 看到接收到了消息 Hello World

hello world

结束了

到此为止,我们已经实现了一个初步可工作的 WebSocket 应用,当然还有很多可以优化的地方, 比如:

  1. 错误处理
  2. Hub 状态目前对外部来说是一个黑盒子,我们可以加个接口返回一下 Hub 的当前状态,比如当前连接数
  3. 日志:出错的时候,日志可以帮助我们快速定位问题

这些功能会在后续继续完善,今天就到此为止了。

在上一篇文章中,我们已经了解了 gorilla/websocket 的一些基本概念和简单的用法。 接下来,我们通过一个再复杂一点的例子来了解它的实际用法。

功能

这个例子来自源码里面的 examples/chat,它包含了以下功能:

  1. 用户访问群聊页面的时候,可以发送消息给所有其他在聊天室内的用户(也就是同样打开群聊页面的用户)
  2. 所有的用户发送的消息,群聊中的所有用户都能收到(包括自己)

其基本效果如下:

chat

为了更好地理解 gorilla/websocket 的使用方式,下文在讲解的时候会去掉一些出于健壮性考虑而写的代码。

基本架构

这个 demo 的基本组件如下图:

arch
  1. Client:也就是连接到了服务端的客户端,可以有多个
  2. Hub:所有的客户端会保存到 Hub 中,同时所有的消息也会经过 Hub 来进行广播(也就是将消息发给所有连接到 Hub 的客户端)
broadcast

工作原理

Hub

Hub 的源码如下:

1
2
3
4
5
6
7
8
9
10
type Hub struct {
// 保存所有客户端
clients map[*Client]bool
// 需要广播的消息
broadcast chan []byte
// 等待连接的客户端
register chan *Client
// 等待断开的客户端
unregister chan *Client
}

Hub 的核心方法如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
func (h *Hub) run() {
for {
select {
case client := <-h.register:
// 从等待连接的客户端 chan 取一项,设置到 clients 中
h.clients[client] = true
case client := <-h.unregister:
// 断开连接:
// 1. 从 clients 移除
// 2. 关闭发送消息的 chan
if _, ok := h.clients[client]; ok {
delete(h.clients, client)
close(client.send)
}
case message := <-h.broadcast:
// 发送广播消息给每一个客户端
for client := range h.clients {
select {
// 成功写入消息到客户端的 send 通道
case client.send <- message:
default:
// 发送失败则剔除这个客户端
close(client.send)
delete(h.clients, client)
}
}
}
}
}

这个例子中使用了 chan 来做同步,这可以提高 Hub 的并发处理速度,因为不需要等待 Hubrun 方法中其他 chan 的处理。

简单来说,Hub 做了如下操作:

  1. 维护所有的客户端连接:客户端连接、断开连接等
  2. 发送广播消息

Client

Client 的源码如下:

1
2
3
4
5
6
7
8
type Client struct {
// Hub 单例
hub *Hub
// 底层的 websocket 连接
conn *websocket.Conn
// 等待发送给客户端的消息
send chan []byte
}

它包含了如下字段:

  1. Hub 单例(我们的 demo 中只有一个聊天室)
  2. conn 底层的 WebSocket 连接
  3. send 通道,这里保存了等待发送给这个客户端的数据

Client 中,是通过 readPump 这个方法来从客户端接收消息的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
func (c *Client) readPump() {
defer func() {
// 连接断开、出错等:
// 会关闭连接,从 hub 移除这个连接
c.hub.unregister <- c
c.conn.Close()
}()
// ...
for {
// 接收消息
_, message, err := c.conn.ReadMessage()
if err != nil {
// ... 错误处理
break
}
// 消息处理,最终放入 broadcast,准备发给所有其他在线的客户端
message = bytes.TrimSpace(bytes.Replace(message, newline, space, -1))
c.hub.broadcast <- message
}
}

readPump 方法做的事情很简单,它就是接收消息,然后通过 Hubbroadcast 来发给所有在线的客户端。

而发送消息会稍微复杂一点,我们来看看 writePump 的源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
func (c *Client) writePump() {
defer func() {
// 连接断开、出错:关闭 WebSocket 连接
c.conn.Close()
}()
for {
select {
case message, ok := <-c.send:
// 控制写超时时间
c.conn.SetWriteDeadline(time.Now().Add(writeWait))
if !ok {
// 连接已经被 hub 关闭了
c.conn.WriteMessage(websocket.CloseMessage, []byte{})
return
}

// 获取用以发送消息的 Writer
w, err := c.conn.NextWriter(websocket.TextMessage)
if err != nil {
return
}
// 发送消息
w.Write(message)

n := len(c.send)
for i := 0; i < n; i++ {
w.Write(newline)
// 将接收到的信息发送出去
w.Write(<-c.send)
}

// 调用 Close 的时候,消息会被发送出去
if err := w.Close(); err != nil {
return
}
}
}
}

虽然比读操作复杂了一点,但是也还是很好理解,它做的东西也不多:

  1. 获取用以发送消息的 Writer
  2. 获取从 hub 中接收到的其他客户端的消息,发送给当前这个客户端

具体是如何工作起来的?

  1. main 函数中创建 hub 实例
  2. 通过下面这个 serveWs 来将建立 WebSocket 连接:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
func serveWs(hub *Hub, w http.ResponseWriter, r *http.Request) {
// 将 HTTP 连接转换为 WebSocket 连接
conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
log.Println(err)
return
}
// 客户端
client := &Client{hub: hub, conn: conn, send: make(chan []byte, 256)}
// 注册到 hub
client.hub.register <- client

// 发送数据到客户端的协程
go client.writePump()
// 从客户端接收数据的协程
go client.readPump()
}

serveWs 中,我们在跟客户端建立起连接后,创建了两个协程,一个是从客户端接收数据的,另一个是发送消息到客户端的。

这个 demo 的作用

这个 demo 是一个比较简单的 demo,不过也包含了我们构建 WebSocket 应用的一些关键处理逻辑,比如:

  • 使用 Hub 来维持一个低层次的连接信息
  • Client 中区分读和写的协程
  • 以及一些边界情况的处理:比如连接断开、超时等

在后续的文章中,我们会基于这些已有知识去构建一个更加完善的 WebSocket 应用,今天就到此为止了。

在本系列文章中,将会使用在 Go 中一个用得比较多的 WebSocket 实现 gorilla/websocket

背景知识 - HTTP 与 WebSocket 的关系

本文会涉及到一些原理讲解,其中比较关键的一个是 HTTP 与 WebSocket 的联系与区别,了解这个可以帮助我们更好地使用 WebSocket

如果我们此前已经使用过 WebSocket,比如在 nginx 配置过 WebSocket,我们就会发现:

  1. 有个类似 upgrade 的关键字。这个关键字体现了 HTTP 与 WebSocket 的本质区别。
  2. 在 nginx 里配置,意味着 WebSocket 本质上也是通过 HTTP 协议来工作的。

我们知道,HTTP 的请求会在请求结束之后断开 TCP 连接,但 WebSocket 不一样,它在建立连接之后会一直维持着连接状态, 这样客户端与服务端就可以一直维持通信状态了。

WebSocket 建立连接的过程

在 WebSocket 协议中,初始的握手阶段使用标准的 HTTP 请求和响应:

  1. 客户端先发送一个 HTTP 请求,请求升级到 WebSocket 协议。
  2. 服务器在收到这个请求后,如果同意升级到 WebSocket,就会返回一个状态码为 101 的 HTTP 响应,指示升级成功,然后不会断开 TCP 连接。

这个过程涉及到的 HTTP 头部字段是 UpgradeConnection,具体而言,HTTP 请求头部可能包含类似以下的字段:

请求:

1
2
3
4
GET /chat HTTP/1.1
Host: example.com
Upgrade: websocket
Connection: Upgrade

响应:

1
2
3
HTTP/1.1 101 Switching Protocols
Upgrade: websocket
Connection: Upgrade

也就是说,我们所看到的 Upgrade 实际上是把一个 HTTP 连接升级为了 WebSocket 连接,这个连接可以实现双向的通信。

这使得它非常适合实时通信的应用,例如聊天应用、在线游戏等。

gorilla/websocket 中的基本概念

WebSocket 连接 - Conn

gorilla/websocket 中使用 Conn 来表示一个 WebSocket 连接,它主要有如下作用:

  • 发送消息给客户端:Write* 方法,如 WriteJSON 发送 JSON 类型消息,又或者 WriteMessage 可以发送普通的文本消息。
  • 接收客户端发送的消息:Read* 方法,如 ReadJSONReadMessage
  • 其他功能:关闭连接、获取客户端 IP 地址等

消息

gorilla/websocket 中,消息被分为以下几种:

  • 数据消息:
    • TextMessage 文本消息:文本消息被解析为 UTF-8 编码的文本。需要应用程序来确保文本消息是有效的 UTF-8 编码文本。
    • BinaryMessage 二进制消息:二进制消息的解析留给应用程序。
  • 控制消息:可以调用 Conn 中的 WriteControlWriteMessageNextWriter 方法,将控制消息发送给对方。
    • CloseMessage 关闭连接的消息
    • PingMessage ping 消息
    • PongMessage pong 消息

注意:应用程序需要先读取连接中的消息才能处理从对等方发送的 closepingpong 消息。如果应用程序对来自对等方的消息不感兴趣, 则应用程序应启动一个 goroutine 来读取和丢弃来自对等方的消息。

并发

虽然 Golang 中有 goroutine 可以支持我们做并发操作,但是在 gorilla/websocket 中, 一个 WebSocket 连接只支持一个并发 reader 和一个并发 writer

我们的应用程序应该确保不超过一个 goroutine 同时调用写入方法(WriteMessageWriteJSON)或者读取方法(ReadMessageReadJSON)。

CloseWriteControl 方法可以与其他所有方法同时调用。

安全性

我们知道,在一般的 web 应用中,经常需要处理跨域的问题,同样的,在 gorilla/websocket 中也需要做一定的配置。

我们可以在 Upgrader 中的 CheckOrigin 字段中指定函数的 Origin 检查策略,如果 CheckOrigin 函数返回 false,则 Upgrader 方法将拒绝建立 WebSocket 连接,如果允许所有来源的连接,我们可以直接返回 true 即可。

1
2
3
4
5
6
7
var upgrader = websocket.Upgrader{
ReadBufferSize: 1024,
WriteBufferSize: 1024,
CheckOrigin: func(r *http.Request) bool {
return true
},
}

缓冲

缓冲在 io 类操作中是一个很常见的术语,在 gorilla/websocket 中我们可以通过上面那段代码的 ReadBufferSizeWriteBufferSize 来指定连接的缓冲大小,以减少读取或写入消息时的系统调用次数。

默认大小为 4096,建议限制为最大预期消息的大小,大于最大消息最大大小的缓冲区不会带来任何好处。

Hello World

最后,让我们通过一个简单的 Hello World 程序来结束本文:

main.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
package main

import (
"github.com/gorilla/websocket"
"log"
"net/http"
)

var upgrader = websocket.Upgrader{
ReadBufferSize: 1024,
WriteBufferSize: 1024,
CheckOrigin: func(r *http.Request) bool {
return true
},
}

func handler(w http.ResponseWriter, r *http.Request) {
conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
log.Fatal(err)
}

conn.WriteMessage(websocket.TextMessage, []byte("Hello, World!"))
conn.Close()
}

func main() {
http.HandleFunc("/ws", handler)
http.ListenAndServe(":8181", nil)
}

执行 go run main.go 启动 WebSocket 服务端,然后,我们打开一个浏览器的控制台, 在里面执行下面的 JavaScript 代码:

1
let ws = new WebSocket('ws://127.0.0.1:8181/ws')

不出意外的话,我们可以在浏览器控制台的 Network -> WS 中看到由服务端发送的 Hello, World!

什么是 xhprof?

xhprof 是一个轻量级 PHP 性能分析工具。

它报告函数级别的请求次数和各种指标,包括阻塞时间,CPU 时间和内存使用情况。

注意:xhprof 的使用开销很大,所以只能在本地开发调试的时候使用。

安装

我们可以通过 pecl 来安装 xhprof

1
2
# 目前最新版本是 2.3.9
pecl install xhprof-2.3.9

安装完之后,运行一下 php -m 查看是否已经启用:

1
php -m

最后,修改一下 php.ini 配置文件,添加以下配置:

1
xhprof.output_dir = /tmp/xhprof

注意:这里指定的文件夹必须有写的权限才行。

当然,我们也可以通过源码编译安装,源码在 https://github.com/longxinH/xhprof

使用

主要有两个步骤:

  1. 使用 xhprof_enable 来开启 xhprof 性能监控
1
2
3
xhprof_enable(XHPROF_FLAGS_NO_BUILTINS +
XHPROF_FLAGS_CPU +
XHPROF_FLAGS_MEMORY);
  1. 注册一个 shutdown 处理器

它的作用是在 php 请求处理完毕的时候将性能指标数据写入到文件中,如果没有这个,则在 xhprof.output_dir 中将不会有任何输出。

1
2
3
4
5
6
7
8
9
register_shutdown_function(function(){
$data = xhprof_disable(); //返回运行数据
// 需要在 https://github.com/longxinH/xhprof 下载源码,下面的 `.../xhprof_lib` 就是下载源码中的 `xhprof_lib` 目录
// 这里需要替换为自己本地的路径
include '/Users/ruby/Code/xhprof-2.3.9/xhprof_lib/utils/xhprof_lib.php';
include '/Users/ruby/Code/xhprof-2.3.9/xhprof_lib/utils/xhprof_runs.php';
$objXhprofRun = new XHProfRuns_Default();
$objXhprofRun->save_run($data, "test"); //test 表示文件后缀
});

通过 HTML 页面展示性能分析结果

  1. 我们在本地的 nginx 中添加一个 xhprof web 服务的配置:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
server {
listen 8088;
server_name xhprof.local;
root /Users/ruby/Code/xhprof-2.3.9;

add_header X-Frame-Options "SAMEORIGIN";
add_header X-Content-Type-Options "nosniff";

index index.php;

charset utf-8;

location / {
try_files $uri $uri/ /index.php?$query_string;
}

location = /favicon.ico { access_log off; log_not_found off; }
location = /robots.txt { access_log off; log_not_found off; }

error_page 404 /index.php;

location ~ \.php$ {
fastcgi_pass 127.0.0.1:9000;
fastcgi_param SCRIPT_FILENAME $realpath_root$fastcgi_script_name;
include fastcgi_params;
}

location ~ /\.(?!well-known).* {
deny all;
}
}
  1. 重启本地的 php-fpm

  2. 在我们的 /etc/hosts 中加上 nginx 中配置的域名即可

  3. 打开浏览器,访问 http://xhprof.local:8088/xhprof_html/

xhprof
  1. 点击其中一个进去,可以看到详情
xhprof

在这里,我们还能点击每一列的表头,让它按这一列来排序。上图我就按 Calls 逆序排序了。

以图的形式来展示

上面我们通过表格的方式来看到了函数调用的次数、时间等,但表格其实不够直观。

我们也看到上面图中的正中间有一个 View Full Callgraph 的超链接,我们可以通过这个超链接来查看具体的函数调用链, 这样我们可以更加直观的知道调用入口在哪里,以及整个调用链条大概长什么样子的。

xhprof

图太大了,这里随便看看吧

注意:要使用这个功能,我们需要安一个插件 graphviz。mac 下可以通过 brew install graphviz,其他的自行搜索。

最近在项目中发现有个接口需要耗时几分钟才能完成,排查发现跟以往的慢请求不大一样,这次的慢请求中,并没有慢查询(不管是 MySQL、MongoDB 还是 Redis、HTTP),经过排查发现其中有一个函数处理时间非常长,整个请求的 99% 的时间都花在了这个函数上:

1
2
3
4
5
6
// 函数内主要是一个 for 循环
foreach ($rules as $rule) {
if (empty($this->hasAllCustomerUserIds[$rule->user_id])) {
$this->matchRule($rule, $results);
}
}

上面这个循环有 600+ 次,但是循环内的 matchRule11w+ 个循环,也就是说,总循环次数达到了 6000w+ 次。 这样一来,就算我们没有慢查询,某些性能不高的点累积起来也会导致整个请求耗时非常漫长,以至最终耗时达到了 7 分钟。

根本原因

在这 6000w+ 的循环里面,完全没有太复杂的操作,更没有什么查询之类的操作,但是由于内层循环都是操作了 ORM 模型,有比较多的获取 ORM 模型属性的操作。 而 Laravel 的 ORM 模型中获取属性是通过魔术方法 __get() 实现的,而这个 __get() 中有一个时间复杂度比较高的操作 getAttribute(),所以这里会有一定的性能问题:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
/**
* Dynamically retrieve attributes on the model.
*
* @param string $key
* @return mixed
*/
public function __get($key)
{
return $this->getAttribute($key);
}

/**
* Get an attribute from the model.
*
* @param string $key
* @return mixed
*/
public function getAttribute($key)
{
if (! $key) {
return;
}

// If the attribute exists in the attribute array or has a "get" mutator we will
// get the attribute's value. Otherwise, we will proceed as if the developers
// are asking for a relationship's value. This covers both types of values.
if (array_key_exists($key, $this->attributes) ||
$this->hasGetMutator($key)) {
return $this->getAttributeValue($key);
}

// Here we will determine if the model base class itself contains this given key
// since we don't want to treat any of those methods as relationships because
// they are all intended as helper methods and none of these are relations.
if (method_exists(self::class, $key)) {
return;
}

return $this->getRelationValue($key);
}

这里只贴 __get()getAttribute() 方法,但是我们也能看到了,虽然我们只是做了一个简单的获取模型属性的操作,但底层涉及了很多的方法调用。

在这种情况下,虽然在代码中已经做了一些查询上的优化,但是这个计算规模下,对 Laravel ORM 模型的操作带来的性能问题会非常显著。

因为模型中使用 $model->attribute 这种方式来获取它的属性的时候,时间复杂度会很高(相比于普通的对象属性),下面是一个性能测试:

100w 次模型访问属性操作

1
2
3
4
5
6
7
8
9
10
11
12
13
class Foo extends \Illuminate\Database\Eloquent\Model {
}

$start = microtime(true);

$model = new Foo;
$model->a = 1;
for ($i = 0; $i < 1000000; $i++) {
$model->a;
}

// 0.7896(=789.6ms)
dump(bcsub(microtime(true), $start, 4));

最终耗时 789.6ms

100w 次普通对象获取属性操作

1
2
3
4
5
6
7
8
9
10
11
12
13
class Bar {
}
$bar = new Bar;
$bar->a = 1;

$start = microtime(true);

for ($i = 0; $i < 1000000; $i++) {
$bar->a;
}

// 0.0140(=14ms)
dump(bcsub(microtime(true), $start, 4));

最终耗时 14ms

也就是说,两者在 100w 的计算规模下,性能差距相差了 56.4 倍。

56.4 倍在数据规模小的时候,我们通常无法感知,比如 10ms,再乘以 56.4 也就是 564,这也还是一个可以接受的时间。

但是在数据规模较大的时候,原本只需要 1s 的操作,可能就得需要 56s 了,这就是非常明显的。

ORM 模型的属性访问做了什么?

首选我们需要知道的是,我们定义的模型中,并没有定义显式地定义任何 public 属性,对于这种情况,我们访问它的属性的时候,php 会去调用对象的 __get 方法。那就顺着模型的 __get 方法看看它做了什么:

orm_optimize

我们可以看到,我们只是做了一个简单的属性访问,但是底层却调用了 8+ 方法。

方法调用在次数少的时候我们无法感知,但是我们可以看看 100w 次方法调用需要多久:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
class Test {
public function t()
{}
}

$start = microtime(true);

$test = new Test();
for ($i = 0; $i < 1000000; $i++) {
$test->t();
}

// 0.0216(=21.6ms)
dump(bcsub(microtime(true), $start, 4));

上面的代码中,我们的 t 方法里面什么都没有做,但是调用了 100w 次之依然需要 21ms

也就是说,在模型访问属性所产生的 8+ 方法调用中,至少需要花费 160ms,这还是一个保守的数字,因为上面产生的方法调用里面还有一些方法调用没有画出来。

如何优化?

知道了原因之后,我们优化起来就简单了,那就是尽量把 ORM 模型访问属性所产生的额外开销去掉,因为在这里讨论的问题中,并不需要模型帮我们做额外的处理(比如 cast、日期转换)。

因此,最简单的实现方法就是将 ORM 模型转换为普通的实体类型。

这一种优化方法下,也还有两种实现方式,先说第一种,直接创建一个 stdClass 对象,然后将 ORM 模型的属性依次赋值到这个 stdClass 对象中,如下:

在我们定义的模型中添加一个方法 toStdClass:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
class Foo extends \Illuminate\Database\Eloquent\Model {
// 转换为 stdClass
public function toStdClass(): stdClass
{
$result = new stdClass();

foreach ($this->attributes as $key => $attribute) {
$result->{$key} = $attribute;
}

return $result;
}
}

$start = microtime(true);

$model = new Foo;
$model->a = 2;

$stdClass = $model->toStdClass();

for ($i = 0; $i < 1000000; $i++) {
$stdClass->a;
}

// 0.0140(=14ms)
dump(bcsub(microtime(true), $start, 4));

在这种实现方式中,由于我们在循环里面操作的是一个 stdClass,所以也就没有了 ORM 模型访问属性时候的方法调用开销,最终时间是 14ms,也就是跟我们访问普通对象属性的开销一样了,比原来快了 56 倍。

但是,需要注意的是,在这种实现中,由于我们拿到的是一个 stdClass,也就丧失了 ORM 模型本身带来的一些便利。不过好在这种便利在我们只是做简单的属性访问的时候是不需要的,除非我们需要在拿到模型之后还需要做 CRUD 操作。就算如此,我们也依然可以再改造一下我们的实现方式,另外定义一个实体类,使用这个实体类来代替上面的 stdClass,然后再在这个实体类中添加一个方法,让其拥有转换回 ORM 模型的能力(第 2 种实现方式):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
class Foo extends \Illuminate\Database\Eloquent\Model
{
public function toStdClass(): stdClass
{
$result = new stdClass();

foreach ($this->attributes as $key => $attribute) {
$result->{$key} = $attribute;
}

return $result;
}
}

class FooEntity
{
// 由 ORM 模型创建一个 Entity
public static function make(\Illuminate\Database\Eloquent\Model $model): self
{
$result = new self();

foreach ($model->getAttributes() as $key => $attribute) {
$result->{$key} = $attribute;
}

return $result;
}

// 由 Entity 转换回 ORM 模型
public function toModel(): Foo
{
$result = new Foo();

foreach (get_object_vars($this) as $key => $attribute) {
$result->setAttribute($key, $attribute);
}

return $result;
}
}

$model = new Foo;
$model->a = 2;

// model -> entity
$entity = FooEntity::make($model);
// entity -> model
$model = $entity->toModel();

优化效果

这里不贴具体代码了。

在不做其他优化的情况下,只是将循环中的 ORM 模型修改为实体之后,原本 7 分钟的函数,最终只需要 44s 了。

当然,代码还有不少其他可以优化的地方,只是那些是比较常规的可以优化的,这里不再赘述。截止发文这天,这个优化已经上线了,将其他的可以优化的地方优化之后,原本 7 分钟的请求,最终耗时 20s 左右。

存在问题

  1. 上面的两种实现方式都没有显式指定类的属性,不好维护。(可以显式定义实体类,并显式定义 ORM 中存在的属性)
  2. 转换为 entity 之后无法拥有 ORM 模型的能力,需要注意。

由于第 2 点,所以这种优化不适用于那些需要更新的场景,当然我们也可以将实体转换回 ORM 模型,然后再做 CURD 操作。

总结

Laravel 的 ORM 模型给我们带来了非常大的便利,但是它通过魔术方法 __get() 来获取模型属性的方式在大数据量操作下会有一些性能问题, 如果我们不需要在这过程做 CURD 操作,我们可以将 ORM 转换为简单的实体对象。

这样就可以大大减少在 ORM 模型中访问属性的开销。