本文应该是本系列文章最后一篇了,前面留下的一些坑可能后面会再补充一下,但不在本系列文章中了。
整体架构
再来回顾一下我们的整体架构:
在我们的 demo 中,包含了以下几种角色:
- 客户端:一般是浏览器,用于接收消息;
Hub
:消息中心,用于管理所有的客户端连接,以及将消息推送给客户端;
- 调用
/send
发送消息的应用:用于将消息发送给
Hub
,然后由 Hub
将消息推送给客户端。
然后,每一个 WebSocket
连接都有一个关联的读协程和写协程,
用于读取客户端发送的消息,以及将消息推送给客户端。
目录结构
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| ├── LICENSE // 协议 ├── Makefile // 一些常用的命令 ├── README.md ├── authenticator.go // 认证器 ├── authenticator_test.go // 认证器测试 ├── bytes.go // 字符串和 []byte 之间转换的辅助方法 ├── client.go // WebSocket 客户端 ├── go.mod // 项目依赖 ├── go.sum // 项目依赖 ├── hub.go // 消息中心 ├── main.go // 程序入口 ├── message // 消息记录器 │ ├── db_logger.go │ ├── db_logger_test.go │ ├── log.go │ └── stdout_logger.go ├── server.go // HTTP 服务 └── server_test.go // HTTP 接口的测试
|
运行
注:需要 Go 1.20 或以上版本
- 下载依赖:
可以使用七牛云的代理加速下载。
- 启动
WebSocket
服务端:
Hub 代码
最终,我们的 Hub
代码演进成了下面这样:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68
| const bufferSize = 128
type Handler func(log message.Log, err error)
type Hub struct { register chan *Client unregister chan *Client userClients map[string]*Client sync.RWMutex messageLogger message.Logger errorHandler Handler authenticator Authenticator pending atomic.Int64 }
func defaultErrorHandler(log message.Log, err error) { res, _ := json.Marshal(log) fmt.Printf("send message: %s, error: %s\n", string(res), err.Error()) }
func newHub() *Hub { return &Hub{ register: make(chan *Client), unregister: make(chan *Client), userClients: make(map[string]*Client, bufferSize), RWMutex: sync.RWMutex{}, messageLogger: &message.StdoutMessageLogger{}, errorHandler: defaultErrorHandler, authenticator: &JWTAuthenticator{}, } }
func (h *Hub) run() { for { select { case client := <-h.register: h.Lock() h.userClients[client.uid] = client h.Unlock() case client := <-h.unregister: h.Lock() close(client.send) delete(h.userClients, client.uid) h.Unlock() } } }
func metrics(hub *Hub, w http.ResponseWriter) { pending := hub.pending.Load() connections := len(hub.userClients) _, _ = w.Write([]byte(fmt.Sprintf("# HELP connections 连接数\n# TYPE connections gauge\nconnections %d\n", connections))) _, _ = w.Write([]byte(fmt.Sprintf("# HELP pending 等待发送的消息数量\n# TYPE pending gauge\npending %d\n", pending))) }
|
其中:
Hub
中的 register
和
unregister
通道用于处理客户端的注册和取消注册请求;
Hub
中的 userClients
用于记录
uid
和 Client
的对应关系;
Hub
中的 messageLogger
用于记录消息;
Hub
中的 errorHandler
用于处理错误;
Hub
中的 authenticator
用于验证客户端的身份;
Hub
中的 pending
用于记录等待发送的消息数量。
目前实现存在的问题:
register
和 unregister
通道被消费的时候需要加锁,这样会导致 register
和
unregister
变成串行的,性能不好;
userClients
也是需要加锁的,这样会导致
userClients
的读写也是串行的,性能不好;
对于这两个问题,前面我们讨论过,一种可行的办法分段
map
,然后对每一个 map
都有一个对应的
sync.Mutex
互斥锁来保证其读写的安全。
Client 代码
Client
比较关键的方法是:
writePump
:负责将消息推送给客户端。
serveWs
:处理 WebSocket
连接请求。
send
:处理消息发送请求。
writePump
这个方法会从 send
通道中获取消息,然后推送给客户端。
推送失败会调用 errorHandler
处理错误。 推送成功会将
pending
减一。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
|
func (c *Client) writePump() { defer func() { _ = c.conn.Close() }()
for { messageLog, ok := <-c.send
_ = c.conn.SetWriteDeadline(time.Now().Add(writeWait)) if !ok { c.hub.pending.Add(int64(-1 * len(c.send))) return }
if err := c.conn.WriteMessage(websocket.TextMessage, StringToBytes(messageLog.Message)); err != nil { c.hub.errorHandler(messageLog, err) c.hub.pending.Add(int64(-1 * len(c.send))) return }
c.hub.pending.Add(int64(-1)) } }
|
serveWs
serveWs
方法会处理 WebSocket
连接请求,然后将其注册到 Hub
中。
在连接的时候会对客户端进行认证,认证失败会断开连接。
最后会启动读写协程。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
| func serveWs(hub *Hub, w http.ResponseWriter, r *http.Request) { conn, err := upgrader.Upgrade(w, r, nil) if err != nil { w.WriteHeader(http.StatusBadRequest) _, _ = w.Write([]byte(fmt.Sprintf("upgrade error: %s", err.Error()))) return }
uid, err := hub.authenticator.Authenticate(r) if err != nil { _ = conn.SetWriteDeadline(time.Now().Add(time.Second)) _ = conn.WriteMessage(websocket.TextMessage, []byte(fmt.Sprintf("authenticate error: %s", err.Error()))) _ = conn.Close() return }
client := &Client{hub: hub, conn: conn, send: make(chan message.Log, bufferSize), uid: uid} client.conn.SetCloseHandler(closeHandler) client.hub.register <- client
go client.writePump() go client.readPump() }
|
send
send
是一个 http
接口,用于处理消息发送请求。 它会从 Hub
中获取
uid
对应的
Client
,然后将消息发送给客户端。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
| func send(hub *Hub, w http.ResponseWriter, r *http.Request) { uid := r.FormValue("uid") if uid == "" { w.WriteHeader(http.StatusBadRequest) _, _ = w.Write([]byte("uid is required")) return }
hub.RLock() client, ok := hub.userClients[uid] hub.RUnlock() if !ok { w.WriteHeader(http.StatusBadRequest) _, _ = w.Write([]byte(fmt.Sprintf("client not found: %s", uid))) return }
messageLog := message.Log{Uid: uid, Message: r.FormValue("message")} _ = hub.messageLogger.Log(messageLog)
client.send <- messageLog
hub.pending.Add(int64(1)) }
|
github
完整代码可以在 github
上进行查看:https://github.com/eleven26/go-pusher