Golang 搭建 WebSocket 应用(六) - 监控

我在上一篇文章中,提到了目前的认证方式存在一些问题,需要替换为一种更简单的认证方式。 但是最后发现,认证这个实在是没有办法简单化,认证本身又是另外一个不小的话题了,因此关于这一点先留个坑。

本文先讨论一下另外一个也比较重要的功能:监控。

为认证预留扩展点

虽然我们暂时不去实现更加完善的认证流程,但是我们依然可以先为其预留一个扩展点, 这样在未来我们要实现认证的时候,就不需要改动太多的代码了。

同样的,我们也可以基于 DIP 原则来实现,我们可以定义一个 Authenticator 接口:

1
2
3
4
type Authenticator interface {
// Authenticate 验证请求是否合法,第一个返回值为用户 id,第二个返回值为错误
Authenticate(r *http.Request) (string, error)
}

然后我们可以在 Hub 结构体中添加一个 authenticator 字段:

1
2
3
4
type Hub struct {
// 验证器
authenticator Authenticator
}

而对于我们目前的这种基于 jwt token 的认证方式,我们可以实现一个 JwtAuthenticator

1
2
3
4
5
6
7
8
9
var _ Authenticator = &JWTAuthenticator{}

type JWTAuthenticator struct {
}

func (J *JWTAuthenticator) Authenticate(r *http.Request) (string, error) {
jwt := NewJwt(r.FormValue("token"))
return jwt.Parse()
}

接着,我们在 newHub 中初始化这个 authenticator

1
2
3
4
5
6
func newHub() *Hub {
return &Hub{
// ... 其他代码 ...
authenticator: &JWTAuthenticator{},
}
}

这样,我们就可以在 serveWs 中使用这个 authenticator 了:

1
2
3
4
5
6
7
8
func serveWs(hub *Hub, w http.ResponseWriter, r *http.Request) {
uid, err := hub.authenticator.Authenticate(r)
if err != nil {
log.Println(fmt.Errorf("jwt parse error: %w", err))
return
}
// ... 其他代码
}

在后面我们实现了更加完善的认证流程之后,我们只需要实现一个新的 Authenticator 即可。

2023 了,应用监控怎么做

发展到今天,我们已经有了很多很好用的监控相关的东西,比如 PrometheusGrafana, 以及一些分布式链路追踪的组件,如 skywalkingjaeger 等。

但是他们各自的应用场景都不太一样,并不存在一个万能的监控工具,因此我们需要根据自己的需求来选择:

  • Prometheus:Prometheus 是一个开源的系统监控和报警工具。主要用于收集、存储和查询系统的监控数据,以便进行性能分析、故障排除和告警。
  • Grafana:Grafana 是一个开源的数据可视化和监控平台,用于创建、查询、分析和可视化时间序列数据。目前比较常见的组合就是 Prometheus + Grafana,通过 Prometheus 收集数据,然后通过 Grafana 展示数据。
  • 分布式链路追踪:常用语分布式系统的调用链路追踪,可以用于分析系统的性能瓶颈,以及分析系统的调用链路。常见的实现有 skywalkingjaeger 等。

在我们这个实例中,我们只需要实现一个简单的监控即可,因此我们可以使用 Prometheus + Grafana 的组合。

Prometheus 基本原理

但在此之前我们最好先了解一下 Prometheus 的工作原理,下面是来自 Prometheus 官网的架构图:

architecture

我们可以从两个角度来看这张图:组件、流程。

  1. 组件
  • Prometheus ServerPrometheus 服务端,主要负责数据的收集、存储、查询等。(上图中间部分)
  • AlertmanagerPrometheus 的告警组件,主要负责告警的发送。(上图右上角)
  • Prometheus web UI:可以在这个界面执行 PromQL,另外 Grafana 可以让我们以一种更直观的方式来查看指标数据(也是使用 PromQL)。(上图右下角)
  • exportersexportersPrometheus 的数据采集组件,主要负责从各个组件中采集数据,然后发送给 Prometheus Server。非常常见的如 node_exporter,也就是服务器基础指标的采集组件。除了 exporters,还有一种常见的数据采集方式是 Pushgateway,也就是将数据推送到 Pushgateway,然后由 Prometheus ServerPushgateway 中拉取数据。(也就是上图左边部分)
  1. 流程
  • 采集数据:也就是从 Pushgateway 或者 exporter 拉取一些指标数据。
  • 存储数据:Prometheus Server 会将采集到的数据存储到本地的 TSDB 中。
  • 查询数据:我们可以通过 web UI 或者 Grafana 来查看数据。

最后,我们可以在 Grafana 中看到如下图表:

grafana

通过这个图,我们就可以很直观的看到我们的系统的一些指标数据了,并且能看到这些指标随着时间的变化趋势。

Grafana 里面的图表都是一个个的 PromQL 查询出来的结果,对于常见的一些监控指标,Grafana 上可以找到很多现有的模板,直接使用即可。

Prometheus 采集的是什么数据

举一个简单的例子:对于一个运行中的系统而言,每一刻它的状态都是不太一样的,比如,可能上一秒 CPU 使用率是 10%,下一秒就变成了 100% 了, 但可能过 1 秒又降低到了 10%。当我们的系统出性能问题的时候,我们就需要去分析这些指标数据,找到问题所在。 比如排查一下出现性能问题的那个时间点,CPU 使用率是不是很高,如果是的话,那么就有可能是 CPU 导致的性能问题。

Prometheus 的作用就是帮助我们采集这些指标数据,然后存储起来,等待某天我们需要分析的时候,再去查询这些数据。 又或者监控到指标有异常的时候,可以通过 Alertmanager 来发送告警。

Prometheus 采集数据频率

Prometheus 采集数据的频率是可以配置的,我们一般配置为 1 分钟采集一次。 也就是说,每隔 1 分钟,Prometheus 才会从 exporter 拉取一次数据,然后存储起来。

应用指标数据采集

对于我们的应用而言,往往也有一些指标可以帮助我们看到应用内部的状态,比如:应用内的线程数、应用占用的内存、应用的 QPS 等等。 但是对于应用指标的监控,并没有一个统一的标准,我们需要根据自己应用的实际情况来决定采集哪些指标。

我们的消息推送系统如何做监控

应用指标

对于我们的消息推送系统而言,目前采集以下这两个重要指即可:

  1. 连接数:可以了解服务器当前负载

连接数我们可以直接通过 len(hub.clients) 来获取,非常简单。

  1. 等待推送的消息数:可以了解服务器能否及时处理消息

我们可以在 Hub 中添加一个 pending atomic.Int64 字段来记录当前等待推送的消息数,然后在 send 方法中进行更新:

1
2
3
4
func send(hub *Hub, w http.ResponseWriter, r *http.Request) {
// ... 其他代码 ...
hub.pending.Add(1)
}

同时在处理完成之后,我们也需要将其减 1,所以 writePump 也需要进行修改:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
func (c *Client) writePump() {
for {
select {
case messageLog, ok := <-c.send:
c.conn.SetWriteDeadline(time.Now().Add(writeWait))
if !ok {
// ...
c.hub.pending.Add(int64(-1 * len(c.send)))
return
}

if err := c.conn.WriteMessage(websocket.TextMessage, []byte(messageLog.Message)); err != nil {
// ...
c.hub.pending.Add(int64(-1 * len(c.send)))
return
}
}
c.hub.pending.Add(int64(-1))
}
}

我们在 writePump 中有三个地方需要对 pending 字段做减法:连接关闭、发送出错、发送成功。

exporter 以及 Grafana 配置

现在我们知道了我们有两个比较关键的指标需要采集,那到底是如何采集的呢?

具体来说,会有以下两步:

  1. 在消息推送系统中添加一个 /metrics 接口

这个接口的作用就是将我们的指标数据暴露出来,以便 Prometheus 采集。 它返回的就是请求时的连接数和等待推送的消息数,返回的格式也有一定要求,但也不复杂,具体来说就是:

  • 一行一个指标
  • 可以返回多个指标,多行即可
  • 每个指标前一行指定其类型(TYPE
  • 每行的格式为:<指标名称>{<标签名称>=<标签值>, ...} <指标值>

下面是一个简单的例子:

1
2
3
4
# HELP http_requests_total The total number of HTTP requests.
# TYPE http_requests_total counter
http_requests_total{method="GET", endpoint="/api"} 100
http_requests_total{method="POST", endpoint="/api"} 50

在这个示例中:

  • http_requests_total 是指标名称
  • {method="GET", endpoint="/api"} 是标签集合,用于唯一标识两个不同的时间序列。
  • 10050 是样本值,表示在特定时间点上的 HTTP 请求总数。

最终,我们得到了一个如下的 /metrics 接口:

1
2
3
4
5
6
func metrics(hub *Hub, w http.ResponseWriter, r *http.Request) {
var pending = hub.pending.Load()
var connections = len(hub.clients)
w.Write([]byte(fmt.Sprintf("# HELP connections 连接数\n# TYPE connections gauge\nconnections %d\n", connections)))
w.Write([]byte(fmt.Sprintf("# HELP pending 等待发送的消息数量\n# TYPE pending gauge\npending %d\n", pending)))
}

不要忘记了在 main 中加上一个入口:

1
2
3
http.HandleFunc("/metrics", func(w http.ResponseWriter, r *http.Request) {
metrics(hub, w, r)
})

最终,这个接口会返回如下的数据:

1
2
3
4
5
6
# HELP connections 连接数
# TYPE connections gauge
connections 0
# HELP pending 等待发送的消息数量
# TYPE pending gauge
pending 0
  1. Prometheus 中配置 exporter

我们需要在 Prometheus 配置文件中加上以下配置:

1
2
3
4
5
scrape_configs:
# 拉取我们的应用指标
- job_name: 'websocket'
static_configs:
- targets: ['192.168.2.107:8181']

注意:这里不需要在后面加上 /metrics,因为 Prometheus 默认就是去拉取 /metrics 接口的。

web UI

然后我们就可以在 Prometheusweb UI 中看到我们的指标数据了。

  1. Grafana 中配置图表

最后,我们可以在 Grafana 中配置一个图表,来展示我们的指标数据:

Grafana

这样,我们就可以看到一个等待发送的消息数量以及连接数的变化了。

总结

最后,再来简单回顾一下本文所讲内容,主要包括以下几个方面:

  • 认证方式是另外一个比较复杂的话题,但是我们依然可以为其预留出一个扩展点,先实现其他功能后再来完善。
  • 目前市面上有很多监控相关的组件,本文使用了 Prometheus 作为例子来演示如何在项目中采集应用的指标数据,以及如何通过 Grafana 来展示这些指标的变化。
  • Prometheus 中包含了 `Prometheus Serverexporters 等组件,其中 Server 是实际存储数据的地方,而 exporters 是用来采集指标数据的程序。
  • Prometheus 采集到的数据,我们可以通过 Grafana 来进行可视化展示,更加的直观。
  • 应用中,也可以暴露一个 /metrics 端口来返回应用当前的一些状态,只要遵循 Prometheus 的规范即可。