Golang 搭建 WebSocket 应用(二) - 基本群聊 demo

在上一篇文章中,我们已经了解了 gorilla/websocket 的一些基本概念和简单的用法。 接下来,我们通过一个再复杂一点的例子来了解它的实际用法。

功能

这个例子来自源码里面的 examples/chat,它包含了以下功能:

  1. 用户访问群聊页面的时候,可以发送消息给所有其他在聊天室内的用户(也就是同样打开群聊页面的用户)
  2. 所有的用户发送的消息,群聊中的所有用户都能收到(包括自己)

其基本效果如下:

chat

为了更好地理解 gorilla/websocket 的使用方式,下文在讲解的时候会去掉一些出于健壮性考虑而写的代码。

基本架构

这个 demo 的基本组件如下图:

arch
  1. Client:也就是连接到了服务端的客户端,可以有多个
  2. Hub:所有的客户端会保存到 Hub 中,同时所有的消息也会经过 Hub 来进行广播(也就是将消息发给所有连接到 Hub 的客户端)
broadcast

工作原理

Hub

Hub 的源码如下:

1
2
3
4
5
6
7
8
9
10
type Hub struct {
// 保存所有客户端
clients map[*Client]bool
// 需要广播的消息
broadcast chan []byte
// 等待连接的客户端
register chan *Client
// 等待断开的客户端
unregister chan *Client
}

Hub 的核心方法如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
func (h *Hub) run() {
for {
select {
case client := <-h.register:
// 从等待连接的客户端 chan 取一项,设置到 clients 中
h.clients[client] = true
case client := <-h.unregister:
// 断开连接:
// 1. 从 clients 移除
// 2. 关闭发送消息的 chan
if _, ok := h.clients[client]; ok {
delete(h.clients, client)
close(client.send)
}
case message := <-h.broadcast:
// 发送广播消息给每一个客户端
for client := range h.clients {
select {
// 成功写入消息到客户端的 send 通道
case client.send <- message:
default:
// 发送失败则剔除这个客户端
close(client.send)
delete(h.clients, client)
}
}
}
}
}

这个例子中使用了 chan 来做同步,这可以提高 Hub 的并发处理速度,因为不需要等待 Hubrun 方法中其他 chan 的处理。

简单来说,Hub 做了如下操作:

  1. 维护所有的客户端连接:客户端连接、断开连接等
  2. 发送广播消息

Client

Client 的源码如下:

1
2
3
4
5
6
7
8
type Client struct {
// Hub 单例
hub *Hub
// 底层的 websocket 连接
conn *websocket.Conn
// 等待发送给客户端的消息
send chan []byte
}

它包含了如下字段:

  1. Hub 单例(我们的 demo 中只有一个聊天室)
  2. conn 底层的 WebSocket 连接
  3. send 通道,这里保存了等待发送给这个客户端的数据

Client 中,是通过 readPump 这个方法来从客户端接收消息的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
func (c *Client) readPump() {
defer func() {
// 连接断开、出错等:
// 会关闭连接,从 hub 移除这个连接
c.hub.unregister <- c
c.conn.Close()
}()
// ...
for {
// 接收消息
_, message, err := c.conn.ReadMessage()
if err != nil {
// ... 错误处理
break
}
// 消息处理,最终放入 broadcast,准备发给所有其他在线的客户端
message = bytes.TrimSpace(bytes.Replace(message, newline, space, -1))
c.hub.broadcast <- message
}
}

readPump 方法做的事情很简单,它就是接收消息,然后通过 Hubbroadcast 来发给所有在线的客户端。

而发送消息会稍微复杂一点,我们来看看 writePump 的源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
func (c *Client) writePump() {
defer func() {
// 连接断开、出错:关闭 WebSocket 连接
c.conn.Close()
}()
for {
select {
case message, ok := <-c.send:
// 控制写超时时间
c.conn.SetWriteDeadline(time.Now().Add(writeWait))
if !ok {
// 连接已经被 hub 关闭了
c.conn.WriteMessage(websocket.CloseMessage, []byte{})
return
}

// 获取用以发送消息的 Writer
w, err := c.conn.NextWriter(websocket.TextMessage)
if err != nil {
return
}
// 发送消息
w.Write(message)

n := len(c.send)
for i := 0; i < n; i++ {
w.Write(newline)
// 将接收到的信息发送出去
w.Write(<-c.send)
}

// 调用 Close 的时候,消息会被发送出去
if err := w.Close(); err != nil {
return
}
}
}
}

虽然比读操作复杂了一点,但是也还是很好理解,它做的东西也不多:

  1. 获取用以发送消息的 Writer
  2. 获取从 hub 中接收到的其他客户端的消息,发送给当前这个客户端

具体是如何工作起来的?

  1. main 函数中创建 hub 实例
  2. 通过下面这个 serveWs 来将建立 WebSocket 连接:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
func serveWs(hub *Hub, w http.ResponseWriter, r *http.Request) {
// 将 HTTP 连接转换为 WebSocket 连接
conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
log.Println(err)
return
}
// 客户端
client := &Client{hub: hub, conn: conn, send: make(chan []byte, 256)}
// 注册到 hub
client.hub.register <- client

// 发送数据到客户端的协程
go client.writePump()
// 从客户端接收数据的协程
go client.readPump()
}

serveWs 中,我们在跟客户端建立起连接后,创建了两个协程,一个是从客户端接收数据的,另一个是发送消息到客户端的。

这个 demo 的作用

这个 demo 是一个比较简单的 demo,不过也包含了我们构建 WebSocket 应用的一些关键处理逻辑,比如:

  • 使用 Hub 来维持一个低层次的连接信息
  • Client 中区分读和写的协程
  • 以及一些边界情况的处理:比如连接断开、超时等

在后续的文章中,我们会基于这些已有知识去构建一个更加完善的 WebSocket 应用,今天就到此为止了。