Golang 搭建 WebSocket 应用(三) - 实现一个消息推送中心
有了前两篇的铺垫,相信大家已经对 Golang 中 WebSocket
的使用有一定的了解了, 今天我们以一个更加真实的例子来学习如何在 Golang
中使用 WebSocket
。
需求背景
在实际的项目中,往往有一些任务耗时比较长,然后我们会把这些任务做异步的处理,但是又要及时给客户端反馈任务的处理进度。
对于这种场景,我们可以使用 WebSocket
来实现。其他可以使用 WebSocket
进行通知的场景还有像管理后台一些通知(比如新订单通知)等。
在本篇文章中,就是要实现一个这样的消息推送系统,具体来说,它会有以下功能:
- 可以给特定的用户推送:建立连接的时候,就建立起
WebSocket
连接与用户 ID 之间的关联 - 断开连接的时候,移除
WebSocket
连接与用户的关联,并且关闭这个WebSocket
连接 - 业务系统可以通过 HTTP 接口来给特定的用户推送
WebSocket
消息:只要传递用户 ID 以及需要推送的消息即可
基础框架
下面是一个最简单版本的框架图:
它包含如下几个角色:
Client
客户端,也就是实际中接收消息通知的浏览器Server
服务端,在我们的例子中,服务端实际不处理业务逻辑,只处理跟客户端的消息交互:维持WebSocket
连接,推送消息到特定的WebSocket
连接- 业务逻辑:这个实际上不属于 demo 的一部分,但是
Server
推送的数据是来自业务逻辑处理的结果
设计成这样的目的是为了将技术跟业务进行分离,业务逻辑上的变化不影响到底层技术,同样的,WebSocket
推送中心的技术上的变动也不会影响到实际的业务。
开始开发
一些结构体变动
Client
结构体的变化
1 | type Client struct { |
因为我们需要建立起 WebSocket
连接与用户之间的关联,因此我们需要一个额外的字段来记录用户
ID,也就是上面的 uid
字段。
这个字段会在客户端建立连接后写入。
Hub
结构体的变化
1 | type Hub struct { |
- 因为我们不再需要做广播,所以会移除
Hub
中的broadcast
字段。
取而代之的是,我们会直接在消息推送接口中写入到 uid
对应的 Client
的 send
通道。 当然我们也可以在
Hub
中另外加一个字段来记录要推送给不同 uid
的消息,但是我们的 Hub
的 run
方法是一个协程处理的,当需要推送的数据较多或者其中有
网络延迟的时候,会直接影响到推送给其他用户的消息。当然我们也可以改造一下
run
方法,启动多个协程来处理,不过这样比较复杂,本文会在
writePump
中处理。 (也就是建立 WebSocket
连接时的那个写操作协程)
- 同时为了更加快速地通过
uid
来获取对应的WebSocket
连接,新增了一个userClients
字段。
这是一个 map
类型的字段,key
是
uid
,值是对应的 Client
指针。
- 最后新增了一个
Mutex
互斥锁
因为,在用户实际进行登录的时候需要写入 userClients
字段,而这是一个 map
类型字段,并不支持并发读写。
如果我们在接受并发连接的时候同时修改 userClients
的时候会导致 panic
,因此我们使用了一个互斥锁来保证
userClients
的读写安全。
同时,clients
也是一个
map
,但上一篇文章中没有使用 sync.Mutex
来保护它的读写,在并发操作的时候也是会有问题的, 所以 Mutex
同时也需要保护 clients
的读写。
1 | func (h *Hub) run() { |
最后,我们会在 Hub
的 run
方法中写
userClients
或者 clients
字段的时候,先获取锁,写成功的时候释放锁。
建立连接
在本篇中,将会继续沿用上一篇的代码,只是其中一些细节会有所改动。建立连接这步操作,跟上一篇的一样:
1 | // 将 HTTP 转换为 WebSocket 连接的 Upgrader |
接收消息
由于我们要做的只是一个推送消息的系统,所以我们只处理用户发来的登录请求,其他的消息会全部丢弃:
1 | func (c *Client) readPump() { |
在本文中,假设客户端的登录消息格式为 {"uid": "123456"}
这种 json
格式。
在这里也操作了
userClients
字段,同样需要使用互斥锁来保证操作的安全性。
发送消息
- 在我们的系统中,可以提供一个 HTTP 接口来跟业务系统进行交互:
1 | // 发送消息的接口 |
- 实际发送消息的操作
在 writePump
方法中,我们会将从 /send
接收到的数据发送给对应的用户:
1 | // 发送消息的协程 |
在这个方法中,我们会从 c.send
这个 chan
中获取需要发送给客户端的消息,然后进行发送操作。
测试
- 启动
main
程序
1 | go run main.go |
- 打开一个浏览器的控制台,执行以下代码
1 | ws = new WebSocket('ws://127.0.0.1:8181/ws') |
这两行代码的作用是与 WebSocket
服务器建立连接,然后发送一个登录信息。
然后我们打开控制台的 Network -> WS -> Message
就可以看到浏览器发给服务端的消息:
- 使用 HTTP 客户端发送消息给 uid 为 123 的用户
假设我们的
WebSocket
服务器绑定的端口为8181
打开终端,执行以下命令:
1 | curl "http://localhost:8181/send?uid=123&message=Hello%20World" |
然后我们可以在 Network -> WS -> Message
看到接收到了消息 Hello World
。
结束了
到此为止,我们已经实现了一个初步可工作的 WebSocket
应用,当然还有很多可以优化的地方, 比如:
- 错误处理
Hub
状态目前对外部来说是一个黑盒子,我们可以加个接口返回一下Hub
的当前状态,比如当前连接数- 日志:出错的时候,日志可以帮助我们快速定位问题
这些功能会在后续继续完善,今天就到此为止了。