软件开发严重依赖调试技术,这对于有效处理性能问题至关重要。用户在遇到程序执行缓慢时会感到沮丧,这凸显了通过调试工具有效识别和解决潜在问题的重要性。

但是,由于软件的创建和实现过程中涉及庞大的代码库或复杂的系统,因此调试软件中的性能问题可能很困难。

在 Go 中,开发人员可以使用强大的内置工具来帮助诊断和修复性能问题。其中两个工具是 pproftrace 包。

pprof 包允许您分析和分析 Go 程序的执行,而该 trace 包允许您跟踪和可视化事件和 goroutine 活动。当这些工具一起使用时,可以帮我们快速定位 Go 程序中导致性能低下的代码。

了解性能问题

Go 程序或任何软件应用程序中的性能问题都会对用户体验产生重大影响。Go 程序中的性能问题可能由于多种原因而发生。在本节中,我们将介绍性能问题的一些最常见原因以及它们如何影响系统。

  • 低效算法:低效算法会对性能产生重大影响,尤其是在处理大型数据集时。这些算法会占用额外的 CPU 周期和内存资源,这可能会降低整个应用程序的速度。比如暴力搜索方法和无效的排序算法。
  • 阻塞操作:应用程序可能偶尔会等待 I/O 活动完成,例如在磁盘上读取或写入数据或连接到网络。在此过程中,可能会发生阻塞操作并导致执行延迟,从而导致性能下降。当应用程序被阻塞时,它无法执行其他有用的任务,从而导致整体性能下降。
  • 内存使用率过高:使用大量内存的 Go 应用可能会导致性能问题,尤其是在资源不足的系统上。如果应用程序消耗的内存多于系统的可用内存,则系统可能会开始交换到磁盘,从而大大降低应用程序的性能。如果应用程序不能有效地管理内存,从而导致内存泄漏和其他问题,也会发生这种情况。

goroutine 泄漏也会导致内存使用率过高。另外,一些中间价比如 Elasticsearch 等,则建议直接禁用 swap,因为 swap 会导致性能下降,取而代之的是给它足够大的内容。

性能低下的应用程序会导致用户体验差,从而导致用户流失。为了获得最佳体验,优化 Go 应用程序至关重要。

使用 pprof 诊断性能问题

pprof 是 Go 中的一个内置包,它为开发人员提供了一个分析工具,用于观测他们的 Go 程序如何使用 CPU 和内存。然后收集和分析来自此测量的数据。借助 pprof 软件包,开发人员可以轻松测量和识别消耗比正常情况更多的 CPU 内存的函数,以及分配最多内存的程序部分。

让我们假设一个转账 App 使用 Go,并且它具有允许用户使用二维码向朋友汇款的功能。更新该功能后,其开发人员注意到该应用程序的运行速度比平时慢得多,40% 的用户抱怨扫描二维码时延迟长达 15 秒,有时付款失败。为了正确分析问题,开发团队可以在用户扫描二维码时使用 pprof 生成 CPU 分析文件。通过分析文件,他们可能会发现哪些函数占用了过多的 CPU 内存或哪些算法效率低下。在发现问题并修复问题后,他们可以再次测试和使用 pprof ,以确保性能得到提高,体验更快、更无缝。

pprof 的 profile 类型

  1. CPU:用于分析程序的 CPU 使用情况。衡量函数如何消耗不同的 CPU 时间,从而更容易识别哪些函数消耗更多时间,这些就可能是潜在的瓶颈。
  2. Memory:用于分析程序的内存使用情况。衡量应用程序如何使用内存以及应用程序的哪些部分分配更多内存。
  3. Block(阻塞):显示程序阻塞的位置(例如 I/O 或同步原语),从而更容易识别并发低下的区域。
  4. Goroutine(协程):通过返回正在运行、阻塞和等待的状态的 Goroutine,可以轻松检测到并发低下的区域。
  5. Trace:捕获程序执行期间发生的事件的详细日志,例如 goroutine 创建和销毁、调度、网络活动和阻塞操作。它在详细分析应用程序的性能时非常有用。

分析 profile

我们下面以一个例子来讲解一下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
package main

import (
"fmt"
"math/rand"
"os"
"runtime/pprof"
)

func main() {
// 创建一个保存 CPU 分析结果的文件
f, err := os.Create("profile.prof")
if err != nil {
panic(err)
}
defer f.Close()

// 开始采集 CPU 性能指标
if err := pprof.StartCPUProfile(f); err != nil {
panic(err)
}
defer pprof.StopCPUProfile()

// 模拟耗 CPU 的操作
for i := 0; i < 1000000; i++ {
n := rand.Intn(100)
_ = square(n)
}
}

func square(n int) int {
return n * n
}

在上面的代码中:

  • main 函数生成一个介于 1 和 1000 之间的随机数,然后计算其平方根。
  • pprof.StartCPUProfile(f) 函数启动 CPU 分析,从而创建可以在以后分析的 profile 文件。
  • defer pprof.StopCPUProfile() 语句确保在程序结束时停止 CPU 分析,无论程序是正常终止还是由于错误。
  • 我们调用 rand.Intn(100) 1000000 次来模拟 CPU 密集型任务。

接下来,我们执行这个程序:

1
go run main.go

程序运行结束后,会生成一个名为 profile.pprof 的文件,这个文件包含了 CPU 分析的数据。我们可以使用 go tool pprof 命令来分析这个文件:

1
go tool pprof profile.prof

接下来,会输出如下内容,并进入了一个交互式的命令行:

1
2
3
4
5
Type: cpu
Time: Jan 15, 2024 at 5:17pm (CST)
Duration: 205.21ms, Total samples = 10ms ( 4.87%)
Entering interactive mode (type "help" for commands, "o" for options)
(pprof)

我们可以接着输入一些命令,来查看 profile 的数据:

比如,我们可以输入 top 来查看最耗 CPU 的函数:

1
2
3
4
5
6
7
8
(pprof) top
Showing nodes accounting for 10ms, 100% of 10ms total
flat flat% sum% cum cum%
10ms 100% 100% 10ms 100% math/rand.(*Rand).Intn
0 0% 100% 10ms 100% main.main
0 0% 100% 10ms 100% math/rand.Intn (inline)
0 0% 100% 10ms 100% runtime.main
(pprof)

分析内存

若要获取内存配置文件,请修改代码以使用函数 pprof.WriteHeapProfile() 将堆配置文件写入文件。在生成随机数并计算其平方后,您需要添加代码以将内存配置文件写入文件(mem.prof)。您还将添加一个 time.Sleep(5 * time.Second) 调用,以便有时间将内存配置文件写入文件。在下面找到代码的更新版本:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
package main

import (
"fmt"
"math/rand"
"os"
"runtime/pprof"
"time"
)

func main() {
// 创建一个保存 CPU 分析结果的文件
cpuProfileFile, err := os.Create("cpu.prof")
if err != nil {
panic(err)
}
defer cpuProfileFile.Close()

// 开始采集 CPU 性能指标
if err := pprof.StartCPUProfile(cpuProfileFile); err != nil {
panic(err)
}
defer pprof.StopCPUProfile()

// 模拟耗 CPU 的操作
for i := 0; i < 10; i++ {
n := rand.Intn(100)
s := square(n)
fmt.Printf("%d^2 = %d\n", n, s)
}

// 创建一个保存内存分析结果的文件
memProfileFile, err := os.Create("mem.prof")
if err != nil {
panic(err)
}
defer memProfileFile.Close()

// 将内存分析结果写入文件
if err := pprof.WriteHeapProfile(memProfileFile); err != nil {
panic(err)
}
fmt.Println("Memory profile written to mem.prof")

time.Sleep(5 * time.Second)
}

func square(n int) int {
return n * n
}

输出:

1
2
3
4
5
6
7
8
9
10
11
31^2 = 961
83^2 = 6889
88^2 = 7744
86^2 = 7396
14^2 = 196
99^2 = 9801
42^2 = 1764
29^2 = 841
86^2 = 7396
86^2 = 7396
Memory profile written to mem.prof

运行 go run main.go 后,将生成一个 mem.prof 文件。在交互式 shell 中,键入 top 以分析程序的内存使用情况。若要显示此交互式 shell,请运行以下命令:

1
go tool pprof mem.prof

要按 CPU 使用率显示排名靠前的函数,请键入 top 命令:

1
2
3
4
5
6
7
8
9
10
11
➜ go tool pprof mem.prof    
Type: inuse_space
Time: Jan 15, 2024 at 5:22pm (CST)
Entering interactive mode (type "help" for commands, "o" for options)
(pprof) top
Showing nodes accounting for 1.72MB, 100% of 1.72MB total
flat flat% sum% cum cum%
1.72MB 100% 100% 1.72MB 100% runtime/pprof.StartCPUProfile
0 0% 100% 1.72MB 100% main.main
0 0% 100% 1.72MB 100% runtime.main
(pprof)

从上面的示例中可以看出,pprof 可以让我们很清楚地知道哪些函数占用了大量的内存或者 CPU。因此,通过将 profile 纳入开发过程,可以很容易地主动识别和解决性能问题,从而实现更快、更高效的应用程序。

在第一个示例中,我们了解了如何使用 pprof 工具创建 CPU 分析文件并对其进行分析。输出显示每个函数的调用次数,以及执行每个函数所花费的总时间。这使我们能够识别消耗最多 CPU 时间的函数,并可能对其进行优化。在第二个示例中,输出显示了每个函数的内存使用情况,包括分配的数量和分配的字节数。这使我们能够识别使用过多内存的函数,并可能对其进行优化以减少内存使用。

使用 trace 追踪

有时,我们需要有关程序如何运行的更多详细信息。在这种情况下,trace 包是一个非常强大和有用的工具。在本节中,我们将对其进行介绍。

trace 是一种工具,可让您收集有关程序运行方式的详细信息。它对于理解 goroutine 是如何创建和调度的、通道的使用方式以及网络请求的处理方式等内容非常有用。它提供了程序执行的时间线视图,可用于识别一段时间内的性能问题和其他类型的错误。

trace 可以收集有关程序运行时发生的各种事件的数据。这些事件包括:Goroutine 创建、销毁、阻塞、取消阻塞、网络活动和垃圾回收。每个 trace 事件都分配了一个时间戳和一个 goroutine ID,允许您查看事件的顺序以及它们之间的关系。

分析 trace 追踪数据

首先,我们将创建一个新的 go 文件,将其命名为 trace.go。若要生成跟踪数据,请导入 runtime/trace 包并在程序开始时调用 trace.Start 。若要停止跟踪收集,请在程序结束时调用 trace.Stop 。下面是它的样子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
package main

import (
"fmt"
"math/rand"
"os"
"runtime/pprof"
"runtime/trace"
)

func main() {
// 创建一个保存 CPU 分析结果的文件
f, err := os.Create("profile.prof")
if err != nil {
panic(err)
}
defer f.Close()

// 开始采集 CPU 性能指标
if err := pprof.StartCPUProfile(f); err != nil {
panic(err)
}
defer pprof.StopCPUProfile()

// 创建一个保存 trace 追踪结果的文件
traceFile, err := os.Create("trace.out")
if err != nil {
panic(err)
}
defer traceFile.Close()

if err := trace.Start(traceFile); err != nil {
panic(err)
}
defer trace.Stop()

// 模拟耗 CPU 的操作
for i := 0; i < 10; i++ {
n := rand.Intn(100)
_ = square(n)
}
}

func square(n int) int {
return n * n
}

运行以下命令以启动程序:

1
go run main.go

要分析跟踪数据,可以使用 go tool trace 命令,后跟跟踪文件的名称:

1
go tool trace trace.out 

这将启动基于 Web 的跟踪数据可视化,您可以使用它来了解程序的运行方式并识别性能问题。

您还可以查看有关各种 goroutine 以及各种进程如何运行的详细信息!Trace 是了解各种流事件、goroutine 分析等等的绝佳工具!

分析和修复性能问题

使用 pproftrace 收集性能数据后,下一步是分析数据并确定可能的性能问题。

要解释 pprof 的输出,首先需要了解可用的各种类型的分析数据。最常见的配置文件类型是 CPU 和内存配置文件,就像前面引用的示例一样。通过分析这些配置文件,可以识别消耗大量资源并可能成为潜在瓶颈的功能。 pprof 还可以生成其他类型的配置文件,例如互斥锁争用和阻塞配置文件,这有助于确定同步和阻塞问题。例如,较高的互斥锁争用率可能表明多个 goroutine 正在争用同一个锁,这可能导致阻塞和性能不佳。

如前所述,跟踪数据包含有关应用程序行为的更全面的数据,例如 goroutines、阻塞操作和网络流量。跟踪数据分析可用于检测延迟源和其他性能问题,例如网络延迟过长或选择了效率低下的算法。

一旦确定了性能问题,有几种方法可以优化性能。一种常见的策略是通过重用对象来减少内存分配,同时减少大型数据结构的使用。通过减少可分配的内存量和垃圾回收量,可以降低 CPU 使用率并提高整体程序性能。

另一种方法是使用异步 I/O 或非阻塞操作来减少阻塞操作,例如文件 I/O 或网络通信。这有助于减少程序等待 I/O 操作完成所花费的时间,并提高整体程序吞吐量。

此外,优化算法和数据结构可以显著提高性能。通过选择更有效的算法和数据结构,可以减少完成操作所需的 CPU 时间,并提高整体程序性能。

总结

优化 Go 应用程序中的性能以确保它们高效且有效地运行非常重要。通过这样做,我们可以改善用户体验,降低运行应用程序的成本,并提高代码的整体质量。我们可以使用 pproftrace 工具来分析 CPU 和内存使用情况,并识别 Go 应用程序中的瓶颈和其他问题。然后,我们可以根据这些工具的输出对代码进行有针对性的改进,例如减少内存分配、最小化阻塞操作和优化算法。分析工具(如 pproftrace )对于识别和解决性能问题至关重要。

基本在所有的编程语言中,都有 map 这种数据结构,Go 语言也不例外。 我们知道 Go 是一门对并发支持得比较好的语言,但是 map 并不支持并发读写。 比如,下面这种写法是错误的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
var m = make(map[int]int)
var wg sync.WaitGroup
wg.Add(2)
// 启动两个协程序同时写入 map
go func() {
for i := 0; i < 100; i++ {
m[i] = i
}
wg.Done()
}()
go func() {
for i := 0; i < 100; i++ {
m[i] = i
}
wg.Done()
}()
wg.Wait()

这样写会报错:

1
fatal error: concurrent map writes

为什么 Go map 不支持并发读写

这跟 map 的实现有关,根本原因是:map 的底层是支持自动扩容的,在添加元素的时候,如果发现容量不够,就会自动扩容。 如果允许扩容和访问操作同时发生,那么访问到的数据就不一定就是我们之前存放进去的了,所以 Go 从设计上就禁止了这种操作。 也就是 fail fast 的原则。

至于具体为什么,我们可以看看 map 在扩容时做了什么操作:

grow

上图来源于我之前写的一篇文章:go map 设计与实现

Go 中 map 的扩容是一个渐进的过程,在我们访问 map 的时候,会对 map 底层实际存储数据的桶进行迁移。

如果支持并发读写,就有可能会导致底层定位到的桶是扩容前的,但是实际上数据已经迁移到了新的桶中,这样就会导致访问到的并不是我们想要的数据。

Go map 设计上的考虑

在 Go 官网的博客上有专门针对 Go 不支持并发读写的说明,大概意思是:

经过长时间讨论,Go 团队认为,多数情况下,我们并不需要从多个 goroutine 来对 map 进行安全访问, map 可能是已经实现了同步的某些较大数据结构或计算的一部分。因此,如果底层再去实现 map 的互斥操作, 就会减慢大多数程序的速度,而只能增加少数程序的安全性。

也就是说,他们认为大多数情况下,map 通常是我们自定义数据结构的一部分,而对这个自定义数据结构的访问时,我们一般已经有了锁去保证并发读写安全了,所以没有必要再在底层的 map 上加锁,从而可以保证大多数程序的速度。

但是从语言层面上来说,我们依然可以自行通过互斥锁来实现 map 的的互斥访问。 仅当对 map 在进行更新的时候,map 的读才是不安全的,但是 map 是支持并发读的。

如何解决这个问题 - 互斥锁

关于这一点,同样可以在 Go 官方博客中找到相关的说明,在 Go map 并发这一节也给了对应的 demo。具体来说就是将一般锁跟 map 关联起来,要读写 map 的时候,得先获取这个锁才能访问,这样就避免了对 map 的并发读写了。这是最典型的一种解决方案,也是最简单的。

下面的结构体定义了一个匿名结构体 counter,这个结构体中包含了一个 sync.RWMutex 互斥锁和一个 map

1
2
3
4
var counter = struct{
sync.RWMutex
m map[string]int
}{m: make(map[string]int)}

读的时候,我们可以使用 RLock 获取读锁,然后访问 m 这个 map

1
2
3
4
counter.RLock()
n := counter.m["some_key"]
counter.RUnlock()
fmt.Println("some_key:", n)

RLock 是读锁,多个 goroutine 可以同时获取读锁,读锁释放之前,其他 goroutine 无法获取写锁。

写的时候,我们可以使用 Lock 获取写锁:

1
2
3
counter.Lock()
counter.m["some_key"]++
counter.Unlock()

Lock 是写锁,只有一个 goroutine 可以获取写锁,并且写锁释放之前,其他 goroutine 无法获取读锁,也无法获取写锁。

另一种解决方法 - sync.Map

除了使用互斥锁,我们也可以使用 Go 语言自带的 sync.Map 来解决这个问题:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
var m sync.Map
var wg sync.WaitGroup
wg.Add(2)
go func() {
for i := 0; i < 100; i++ {
m.Store(i, i)
}
wg.Done()
}()
go func() {
for i := 0; i < 100; i++ {
m.Store(i, i)
}
wg.Done()
}()
wg.Wait()

虽然 sync.Map 可以实现并发的读写,但是底层上依然会有较多的竞态条件,所以性能上并不是最好的,本质上还是操作一个 map, 只是通过一些原子操作 + 自旋锁来实现并发安全的读写。

而且 sync.Map 设计出来的时候是为了应对一些特定的场景的,具体来说有以下两个场景:

  1. 当给定 key 的条目只写入一次但读取多次时,如在只会增长的缓存中。(读多写少)
  2. 当多个 goroutine 读取、写入和覆盖不相交的键集的条目。(不同 goroutine 操作不同的 key)

在这两种情况下,可以获得比用 Mutex + mapRWMutex + map 更好的性能,因为很多的锁操作都变成了原子操作。

具体细节可参考我此前的一篇文章:《深入理解 go sync.Map - 基本原理》

互斥锁、sync.Map 还不是最优的解决方案

使用互斥锁或者 sync.Map 的方式,虽然都可以解决 map 并发读写的问题,但是性能上都不是最优的。

因为它们底层还是会有互斥锁的竞争。这就意味着,在进行写 map 操作时,可能会存在较多的锁竞争,从而导致性能下降。

map 分片

如果我们有了解过 MongoDB,就会知道,MongoDB 中也有分片的概念,当数据量过大时, 单个 MongoDB 实例可能无法存储所有的数据,或者单个实例无法处理过多的读写请求, 这时候就需要将数据分片存储到多个 MongoDB 实例中,也就是按照一定的规则将数据存储到不同的机器上, 然后读写数据的请求也会依据一定规则被路由到对应的机器上。

同样的,如果我们的 map 并发请求过多,那么我们也可以将 map 分片, 将不同的 key 存储到不同的 map 中,这样就可以避免 map 的并发读写了。

我们需要做的是:通过 key 来计算其 hash 值,然后根据 hash 值来决定将 key 存储到哪个 map 中, 同时,我们每一个 map 都需要加上互斥锁,这样就可以保证每一个 map 的并发安全了。

具体如下图:

shard

说明:

  1. 图中的 G0~2 表示 goroutinelock0~2 表示不同的互斥锁,map shard 0~2 表示多个 map 分片。
  2. goroutine 中会根据 key 计算出 hash 值,然后根据 hash 值来决定将 key 存储到哪个 map 分片中,然后获取这个分片对应的锁,然后进行读写操作。

虽然上图画起来是 G1 不会访问到 shard 0 或者 shard 2,但实际上是有可能的,上图只是想说明: 多个 goroutine 可以多个锁来访问多个 map 分片,而不用像之前那样多个 goroutine 都来竞争同一把锁了。 也就减少了锁的竞争和等待

代码实现

具体实现已经有一个开源的库了:orcaman/concurrent-map, 可以在 github 上搜到。

下面是它的部分代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
var SHARD_COUNT = 32


// A "thread" safe map of type string:Anything.
// To avoid lock bottlenecks this map is dived to several (SHARD_COUNT) map shards.
type ConcurrentMap[K comparable, V any] struct {
shards []*ConcurrentMapShared[K, V]
sharding func(key K) uint32
}

// A "thread" safe string to anything map.
type ConcurrentMapShared[K comparable, V any] struct {
items map[K]V
sync.RWMutex // Read Write mutex, guards access to internal map.
}

// GetShard returns shard under given key
func (m ConcurrentMap[K, V]) GetShard(key K) *ConcurrentMapShared[K, V] {
return m.shards[uint(m.sharding(key))%uint(SHARD_COUNT)]
}

说明:

  1. SHARD_COUNT 是分片数量,也就是底层会有多少个 map 分片。
  2. ConcurrentMap 表示一个支持并发读写的 map,它底层包含了多个 map 分片,以及有一个根据 key 计算分片的函数。
  3. ConcurrentMapShared 表示一个 map 分片,也就是上面提到的 map + RWMutex 组合。
  4. GetShard 根据 key 获取对应的 map 分片。

单从代码的角度,它封装了更多的东西,性能相比单纯的 map + RWMutex 自然会差一点, 但是从并发读写的角度来说,它比单纯 map + RWutex 要好很多。 因为它将原本只支持一个协程写的 map 转换为了支持多个协程写操作的 map,一定程度上提高了并发

但是需要注意的是,我们需要频繁写同一个 key 的操作,上面这种分片的方式也不能带来性能上的提升。 分片的方式更适合那些 key 区分度高的、写操作频繁的场景。

总结

最后再简单回顾一下本文所讲内容:

  1. Go 的 map 设计上不支持并发读写,如果我们有并发读写操作会直接 panic
  2. Go 的设计者们认为,多数情况下,我们并不需要从多个 goroutine 来对 map 进行安全访问,所以他们没有在底层实现 map 的互斥操作。
  3. 有两种方法可以解决 map 并发读写的问题:互斥锁、sync.Map。但是 sync.Map 设计上是应对某些特定场景的,并不合适所有场景。
  4. 我们可以通过分片的方式来解决 map 并发读写的问题,这样可以减少锁的竞争,提高并发读写性能。目前已经有现成的开源库可以使用了。

本文应该是本系列文章最后一篇了,前面留下的一些坑可能后面会再补充一下,但不在本系列文章中了。

整体架构

再来回顾一下我们的整体架构:

arch

在我们的 demo 中,包含了以下几种角色:

  1. 客户端:一般是浏览器,用于接收消息;
  2. Hub:消息中心,用于管理所有的客户端连接,以及将消息推送给客户端;
  3. 调用 /send 发送消息的应用:用于将消息发送给 Hub,然后由 Hub 将消息推送给客户端。

然后,每一个 WebSocket 连接都有一个关联的读协程和写协程, 用于读取客户端发送的消息,以及将消息推送给客户端。

目录结构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
├── LICENSE  // 协议
├── Makefile // 一些常用的命令
├── README.md
├── authenticator.go // 认证器
├── authenticator_test.go // 认证器测试
├── bytes.go // 字符串和 []byte 之间转换的辅助方法
├── client.go // WebSocket 客户端
├── go.mod // 项目依赖
├── go.sum // 项目依赖
├── hub.go // 消息中心
├── main.go // 程序入口
├── message // 消息记录器
│   ├── db_logger.go
│   ├── db_logger_test.go
│   ├── log.go
│   └── stdout_logger.go
├── server.go // HTTP 服务
└── server_test.go // HTTP 接口的测试

运行

注:需要 Go 1.20 或以上版本

  1. 下载依赖:

可以使用七牛云的代理加速下载。

1
go mod tidy
  1. 启动 WebSocket 服务端:
1
go run main.go

Hub 代码

最终,我们的 Hub 代码演进成了下面这样:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
// bufferSize 通道缓冲区、map 初始化大小
const bufferSize = 128

// Handler 错误处理函数
type Handler func(log message.Log, err error)

// Hub 维护了所有的客户端连接
type Hub struct {
// 注册请求
register chan *Client
// 取消注册请求
unregister chan *Client
// 记录 uid 跟 client 的对应关系
userClients map[string]*Client
// 互斥锁,保护 userClients 以及 clients 的读写
sync.RWMutex
// 消息记录器
messageLogger message.Logger
// 错误处理器
errorHandler Handler
// 验证器
authenticator Authenticator
// 等待发送的消息数量
pending atomic.Int64
}

// 默认的错误处理器
func defaultErrorHandler(log message.Log, err error) {
res, _ := json.Marshal(log)
fmt.Printf("send message: %s, error: %s\n", string(res), err.Error())
}

func newHub() *Hub {
return &Hub{
register: make(chan *Client),
unregister: make(chan *Client),
userClients: make(map[string]*Client, bufferSize),
RWMutex: sync.RWMutex{},
messageLogger: &message.StdoutMessageLogger{},
errorHandler: defaultErrorHandler,
authenticator: &JWTAuthenticator{},
}
}

// 注册、取消注册请求处理
func (h *Hub) run() {
for {
select {
case client := <-h.register:
h.Lock()
h.userClients[client.uid] = client
h.Unlock()
case client := <-h.unregister:
h.Lock()
close(client.send)
delete(h.userClients, client.uid)
h.Unlock()
}
}
}

// 返回 Hub 的当前的关键指标
func metrics(hub *Hub, w http.ResponseWriter) {
pending := hub.pending.Load()
connections := len(hub.userClients)
_, _ = w.Write([]byte(fmt.Sprintf("# HELP connections 连接数\n# TYPE connections gauge\nconnections %d\n", connections)))
_, _ = w.Write([]byte(fmt.Sprintf("# HELP pending 等待发送的消息数量\n# TYPE pending gauge\npending %d\n", pending)))
}

其中:

  • Hub 中的 registerunregister 通道用于处理客户端的注册和取消注册请求;
  • Hub 中的 userClients 用于记录 uidClient 的对应关系;
  • Hub 中的 messageLogger 用于记录消息;
  • Hub 中的 errorHandler 用于处理错误;
  • Hub 中的 authenticator 用于验证客户端的身份;
  • Hub 中的 pending 用于记录等待发送的消息数量。

目前实现存在的问题:

  • registerunregister 通道被消费的时候需要加锁,这样会导致 registerunregister 变成串行的,性能不好;
  • userClients 也是需要加锁的,这样会导致 userClients 的读写也是串行的,性能不好;

对于这两个问题,前面我们讨论过,一种可行的办法分段 map,然后对每一个 map 都有一个对应的 sync.Mutex 互斥锁来保证其读写的安全。

Client 代码

Client 比较关键的方法是:

  • writePump:负责将消息推送给客户端。
  • serveWs:处理 WebSocket 连接请求。
  • send:处理消息发送请求。

writePump

这个方法会从 send 通道中获取消息,然后推送给客户端。 推送失败会调用 errorHandler 处理错误。 推送成功会将 pending 减一。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
// writePump 负责推送消息给 WebSocket 客户端
//
// 该方法在一个独立的协程中运行,我们保证了每个连接只有一个 writer。
// Client 会从 send 请求中获取消息,然后在这个方法中推送给客户端。
func (c *Client) writePump() {
defer func() {
_ = c.conn.Close()
}()

// 从 send 通道中获取消息,然后推送给客户端
for {
messageLog, ok := <-c.send

// 设置写超时时间
_ = c.conn.SetWriteDeadline(time.Now().Add(writeWait))
// c.send 这个通道已经被关闭了
if !ok {
c.hub.pending.Add(int64(-1 * len(c.send)))
return
}

if err := c.conn.WriteMessage(websocket.TextMessage, StringToBytes(messageLog.Message)); err != nil {
c.hub.errorHandler(messageLog, err)
c.hub.pending.Add(int64(-1 * len(c.send)))
return
}

c.hub.pending.Add(int64(-1))
}
}

serveWs

serveWs 方法会处理 WebSocket 连接请求,然后将其注册到 Hub 中。 在连接的时候会对客户端进行认证,认证失败会断开连接。 最后会启动读写协程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
// serveWs 处理 WebSocket 连接请求
func serveWs(hub *Hub, w http.ResponseWriter, r *http.Request) {
// 升级为 WebSocket 连接
conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
w.WriteHeader(http.StatusBadRequest)
_, _ = w.Write([]byte(fmt.Sprintf("upgrade error: %s", err.Error())))
return
}

// 认证失败的时候,返回错误信息,并断开连接
uid, err := hub.authenticator.Authenticate(r)
if err != nil {
_ = conn.SetWriteDeadline(time.Now().Add(time.Second))
_ = conn.WriteMessage(websocket.TextMessage, []byte(fmt.Sprintf("authenticate error: %s", err.Error())))
_ = conn.Close()
return
}

// 注册 Client
client := &Client{hub: hub, conn: conn, send: make(chan message.Log, bufferSize), uid: uid}
client.conn.SetCloseHandler(closeHandler)
// register 无缓冲,下面这一行会阻塞,直到 hub.run 中的 <-h.register 语句执行
// 这样可以保证 register 成功之后才会启动读写协程
client.hub.register <- client

// 启动读写协程
go client.writePump()
go client.readPump()
}

send

send 是一个 http 接口,用于处理消息发送请求。 它会从 Hub 中获取 uid 对应的 Client,然后将消息发送给客户端。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// send 处理消息发送请求
func send(hub *Hub, w http.ResponseWriter, r *http.Request) {
uid := r.FormValue("uid")
if uid == "" {
w.WriteHeader(http.StatusBadRequest)
_, _ = w.Write([]byte("uid is required"))
return
}

// 从 hub 中获取 uid 关联的 client
hub.RLock()
client, ok := hub.userClients[uid]
hub.RUnlock()
if !ok {
w.WriteHeader(http.StatusBadRequest)
_, _ = w.Write([]byte(fmt.Sprintf("client not found: %s", uid)))
return
}

// 记录消息
messageLog := message.Log{Uid: uid, Message: r.FormValue("message")}
_ = hub.messageLogger.Log(messageLog)

// 发送消息
client.send <- messageLog

// 增加等待发送的消息数量
hub.pending.Add(int64(1))
}

github

完整代码可以在 github 上进行查看:https://github.com/eleven26/go-pusher

在前面的文章中,提到过非功能性需求决定了架构。 今天我们再来考虑一下另外两个非功能性需求:性能和可用性。

前言

关于性能,其实并不是只有我们这个消息推送系统独有的问题。 对于所有的开发者而言,都多多少少会处理过性能相关的问题,比如后端为了减少数据库查询提高并发引入的缓存中间件,如 redis; 又或者如前端一次性渲染大量数据的时候,如果让用户体验更加流畅等。

本文会针对 WebSocket 应用场景下去思考一些可能出现的性能问题以及可行的解决方案。

性能

对于性能,有几个可能导致性能问题的地方:

连接数

连接数过多会导致占用的内存过多,因为对于每一个连接,我们都有两个协程,一个读协程,一个写协程; 同时我们的 Client 结构体中的 send 是一个缓冲通道,它的缓冲区大小也直接影响最终占用的内存大小。

比如,我们目前的创建 Client 实例的代码是下面这样的:

1
client := &Client{hub: hub, conn: conn, send: make(chan Log, 256), uid: uid}

我们在这里直接为 send 分配了 256 的大小,如果 Log 结构体比较大的话, 它占用的内存就会比较大了(因为最终占用内存 = 连接数 * sizeof(Log) * 256)。

在实际中,我们一般没有那么多等待发送的消息,这个其实可以设置为一个非常小的值,比如 16; 设置为一个小的值的负面影响是,当 send 塞满了 16 条 Log 的时候,发送消息的接口会阻塞:

1
2
3
4
5
6
func send(hub *Hub, w http.ResponseWriter, r *http.Request) {
// ... 其他代码
// 如果 send 满了,下面这一行会阻塞
client.send <- messageLog
hub.pending.Add(int64(1))
}

所以这个数值可能需要根据实际场景来选择一个更加合适的值。

代码本身的问题

比如,我们的代码中其实有一个很常见的性能问题,就是 string[]byte 之间直接强转:

1
2
3
4
// writePump 方法里面将 string 转 []byte
if err := c.conn.WriteMessage(websocket.TextMessage, []byte(messageLog.Message)); err != nil {
return
}

至于原因,可以去看看此前的一篇文章《深入理解 go unsafe》 的最后一小节, 简单来说,就是这个转换会产生内存分配,而内存分配会导致一定的性能损耗。而通过 unsafe 就可以实现无损的转换。

除了这个,其他地方也没啥太大的问题了,因为到目前为止,我们的代码还是非常的简单的。

互斥锁

为了保证程序的并发安全,我们在 Hub 中加了一个 sync.Mutex,也就是互斥锁。 在代码中,被 sync.MutexLock 保护的代码,在同一时刻只能有一个协程可以执行。

1
2
3
4
5
6
7
8
9
// 推送消息的接口
func send(hub *Hub, w http.ResponseWriter, r *http.Request) {
// ... 其他代码
// 从 hub 中获取 client
hub.Lock()
client, ok := hub.userClients[uid]
hub.Unlock()
// ... 其他代码
}

对于上面这种只读的操作,也就是没有对 map 进行写操作,我们依然使用了 sync.MutexLock() 来锁定临界区。 这里存在的问题是,其实我们的 hub.userClients 是支持并发读的,只是不能同时读写而已。

所以我们可以考虑将 sync.Mutex 替换为 sync.RWMutex,这样就可以实现并发读了:

1
2
3
4
5
6
7
8
9
// 推送消息的接口
func send(hub *Hub, w http.ResponseWriter, r *http.Request) {
// ... 其他代码
// 从 hub 中获取 client
hub.RLock() // 读锁
client, ok := hub.userClients[uid]
hub.RUnlock() // 释放读锁
// ... 其他代码
}

这样做的好处是,当有多个并发的 send 请求的时候,这些并发的 send 请求并不会相互阻塞; 而使用 sync.Mutex 的时候,并发的 send 请求是会相互阻塞的,也就是会导致 send 变成串行的,这样性能无疑会很差。

除此之外,我们在 Hubrun 方法中也使用了 sync.Mutex

1
2
3
4
5
case client := <-h.register:
h.Lock()
h.clients[client] = true
h.userClients[client.uid] = client
h.Unlock()

也就是说,我们将 Client 注册到 Hub 的操作也是串行的。 对于这种场景,其实也有一种解决方法就是分段 map, 也就是将 clientsuserClients 这两个 map 拆分为多个 map, 然后对于每一个 map 都有一个对应的 sync.Mutex 互斥锁来保证其读写的安全。

但如果要这样做,单单分段还不够,我们的 registerunregister 还是只有一个,对于这个问题, 我们可能需要将 registerunregister 也分段,最后在 run 方法里面起多个协程来进行处理。 这个实现起来就很复杂了。

其他

由于我们的 Hub 中还有 MessageLogger、错误处理、认证等功能, 在实际中,如果我们有将其替换为自己的实现,可能还得考虑自己的实现中可能存在的性能问题:

1
2
3
4
5
type Hub struct {
messageLogger MessageLogger
errorHandler Handler
authenticator Authenticator
}

可用性

这里主要讨论的是集群部署的情况下,应用存在的一些的问题以及可行的解决方案。关于具体部署上的细节不讨论。

要实现高可用的话,我们就得加机器了,毕竟如果只有一台服务器的话,一旦它宕机了,服务就完全挂了。

由于我们的 WebSocket 应用维持着跟客户端的连接,在单机的时候,客户端连接、推送消息都是在一台机器上的。 这种情况下并没有什么问题,因为推送消息的时候,都可以根据 uid 来找到对应的 WebSocket 连接,从而给客户端推送消息。

而在多台机器的情况下,我们的客户端可能跟不同的服务器产生连接,这个时候一个比较关键的问题是: 如何根据 uid 找到对应的 WebSocket 连接所在的机器? 如果我们推送消息的请求到达的机器上并没有消息关联的 WebSocket 连接,那么我们的消息就无法推送给客户端了。

对于这个问题,一个可行的解决方案是,将 uid 和服务器建立起关联,比如,在用户登录的时候, 就给用户返回一个 WebSocket 服务器的地址,客户端拿到这个地址之后,跟这个服务器建立起 WebSocket 连接, 然后其他应用推送消息的时候,也根据同样的算法将推送消息的请求发送到这个 WebSocket 服务器即可。

总结

最后,再简单回顾一下本文的内容:

  • 具体来说,我们的系统中会有下面几个可能的地方会导致产生性能问题:
    • 连接数:一个连接会有两个协程,另外每一个 Client 结构体也会需要一定的缓冲区来缓冲发送给客户端的消息
    • 代码上的性能问题:如 string[]byte 之间转换带来的性能损耗
    • 互斥锁:某些地方可以使用读写锁来提高读的并发量,另外一个办法就是使用分段 map 配合互斥锁
    • 系统本身预留的扩展点中,用户自行实现的代码中可能会存在性能问题
  • 要实现高可用就得将系统部署到多台机器上,这个时候需要在 uid 和服务器之间建立起某种关联,以便推送消息的时候可以成功推送给客户端。

我在上一篇文章中,提到了目前的认证方式存在一些问题,需要替换为一种更简单的认证方式。 但是最后发现,认证这个实在是没有办法简单化,认证本身又是另外一个不小的话题了,因此关于这一点先留个坑。

本文先讨论一下另外一个也比较重要的功能:监控。

为认证预留扩展点

虽然我们暂时不去实现更加完善的认证流程,但是我们依然可以先为其预留一个扩展点, 这样在未来我们要实现认证的时候,就不需要改动太多的代码了。

同样的,我们也可以基于 DIP 原则来实现,我们可以定义一个 Authenticator 接口:

1
2
3
4
type Authenticator interface {
// Authenticate 验证请求是否合法,第一个返回值为用户 id,第二个返回值为错误
Authenticate(r *http.Request) (string, error)
}

然后我们可以在 Hub 结构体中添加一个 authenticator 字段:

1
2
3
4
type Hub struct {
// 验证器
authenticator Authenticator
}

而对于我们目前的这种基于 jwt token 的认证方式,我们可以实现一个 JwtAuthenticator

1
2
3
4
5
6
7
8
9
var _ Authenticator = &JWTAuthenticator{}

type JWTAuthenticator struct {
}

func (J *JWTAuthenticator) Authenticate(r *http.Request) (string, error) {
jwt := NewJwt(r.FormValue("token"))
return jwt.Parse()
}

接着,我们在 newHub 中初始化这个 authenticator

1
2
3
4
5
6
func newHub() *Hub {
return &Hub{
// ... 其他代码 ...
authenticator: &JWTAuthenticator{},
}
}

这样,我们就可以在 serveWs 中使用这个 authenticator 了:

1
2
3
4
5
6
7
8
func serveWs(hub *Hub, w http.ResponseWriter, r *http.Request) {
uid, err := hub.authenticator.Authenticate(r)
if err != nil {
log.Println(fmt.Errorf("jwt parse error: %w", err))
return
}
// ... 其他代码
}

在后面我们实现了更加完善的认证流程之后,我们只需要实现一个新的 Authenticator 即可。

2023 了,应用监控怎么做

发展到今天,我们已经有了很多很好用的监控相关的东西,比如 PrometheusGrafana, 以及一些分布式链路追踪的组件,如 skywalkingjaeger 等。

但是他们各自的应用场景都不太一样,并不存在一个万能的监控工具,因此我们需要根据自己的需求来选择:

  • Prometheus:Prometheus 是一个开源的系统监控和报警工具。主要用于收集、存储和查询系统的监控数据,以便进行性能分析、故障排除和告警。
  • Grafana:Grafana 是一个开源的数据可视化和监控平台,用于创建、查询、分析和可视化时间序列数据。目前比较常见的组合就是 Prometheus + Grafana,通过 Prometheus 收集数据,然后通过 Grafana 展示数据。
  • 分布式链路追踪:常用语分布式系统的调用链路追踪,可以用于分析系统的性能瓶颈,以及分析系统的调用链路。常见的实现有 skywalkingjaeger 等。

在我们这个实例中,我们只需要实现一个简单的监控即可,因此我们可以使用 Prometheus + Grafana 的组合。

Prometheus 基本原理

但在此之前我们最好先了解一下 Prometheus 的工作原理,下面是来自 Prometheus 官网的架构图:

architecture

我们可以从两个角度来看这张图:组件、流程。

  1. 组件
  • Prometheus ServerPrometheus 服务端,主要负责数据的收集、存储、查询等。(上图中间部分)
  • AlertmanagerPrometheus 的告警组件,主要负责告警的发送。(上图右上角)
  • Prometheus web UI:可以在这个界面执行 PromQL,另外 Grafana 可以让我们以一种更直观的方式来查看指标数据(也是使用 PromQL)。(上图右下角)
  • exportersexportersPrometheus 的数据采集组件,主要负责从各个组件中采集数据,然后发送给 Prometheus Server。非常常见的如 node_exporter,也就是服务器基础指标的采集组件。除了 exporters,还有一种常见的数据采集方式是 Pushgateway,也就是将数据推送到 Pushgateway,然后由 Prometheus ServerPushgateway 中拉取数据。(也就是上图左边部分)
  1. 流程
  • 采集数据:也就是从 Pushgateway 或者 exporter 拉取一些指标数据。
  • 存储数据:Prometheus Server 会将采集到的数据存储到本地的 TSDB 中。
  • 查询数据:我们可以通过 web UI 或者 Grafana 来查看数据。

最后,我们可以在 Grafana 中看到如下图表:

grafana

通过这个图,我们就可以很直观的看到我们的系统的一些指标数据了,并且能看到这些指标随着时间的变化趋势。

Grafana 里面的图表都是一个个的 PromQL 查询出来的结果,对于常见的一些监控指标,Grafana 上可以找到很多现有的模板,直接使用即可。

Prometheus 采集的是什么数据

举一个简单的例子:对于一个运行中的系统而言,每一刻它的状态都是不太一样的,比如,可能上一秒 CPU 使用率是 10%,下一秒就变成了 100% 了, 但可能过 1 秒又降低到了 10%。当我们的系统出性能问题的时候,我们就需要去分析这些指标数据,找到问题所在。 比如排查一下出现性能问题的那个时间点,CPU 使用率是不是很高,如果是的话,那么就有可能是 CPU 导致的性能问题。

Prometheus 的作用就是帮助我们采集这些指标数据,然后存储起来,等待某天我们需要分析的时候,再去查询这些数据。 又或者监控到指标有异常的时候,可以通过 Alertmanager 来发送告警。

Prometheus 采集数据频率

Prometheus 采集数据的频率是可以配置的,我们一般配置为 1 分钟采集一次。 也就是说,每隔 1 分钟,Prometheus 才会从 exporter 拉取一次数据,然后存储起来。

应用指标数据采集

对于我们的应用而言,往往也有一些指标可以帮助我们看到应用内部的状态,比如:应用内的线程数、应用占用的内存、应用的 QPS 等等。 但是对于应用指标的监控,并没有一个统一的标准,我们需要根据自己应用的实际情况来决定采集哪些指标。

我们的消息推送系统如何做监控

应用指标

对于我们的消息推送系统而言,目前采集以下这两个重要指即可:

  1. 连接数:可以了解服务器当前负载

连接数我们可以直接通过 len(hub.clients) 来获取,非常简单。

  1. 等待推送的消息数:可以了解服务器能否及时处理消息

我们可以在 Hub 中添加一个 pending atomic.Int64 字段来记录当前等待推送的消息数,然后在 send 方法中进行更新:

1
2
3
4
func send(hub *Hub, w http.ResponseWriter, r *http.Request) {
// ... 其他代码 ...
hub.pending.Add(1)
}

同时在处理完成之后,我们也需要将其减 1,所以 writePump 也需要进行修改:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
func (c *Client) writePump() {
for {
select {
case messageLog, ok := <-c.send:
c.conn.SetWriteDeadline(time.Now().Add(writeWait))
if !ok {
// ...
c.hub.pending.Add(int64(-1 * len(c.send)))
return
}

if err := c.conn.WriteMessage(websocket.TextMessage, []byte(messageLog.Message)); err != nil {
// ...
c.hub.pending.Add(int64(-1 * len(c.send)))
return
}
}
c.hub.pending.Add(int64(-1))
}
}

我们在 writePump 中有三个地方需要对 pending 字段做减法:连接关闭、发送出错、发送成功。

exporter 以及 Grafana 配置

现在我们知道了我们有两个比较关键的指标需要采集,那到底是如何采集的呢?

具体来说,会有以下两步:

  1. 在消息推送系统中添加一个 /metrics 接口

这个接口的作用就是将我们的指标数据暴露出来,以便 Prometheus 采集。 它返回的就是请求时的连接数和等待推送的消息数,返回的格式也有一定要求,但也不复杂,具体来说就是:

  • 一行一个指标
  • 可以返回多个指标,多行即可
  • 每个指标前一行指定其类型(TYPE
  • 每行的格式为:<指标名称>{<标签名称>=<标签值>, ...} <指标值>

下面是一个简单的例子:

1
2
3
4
# HELP http_requests_total The total number of HTTP requests.
# TYPE http_requests_total counter
http_requests_total{method="GET", endpoint="/api"} 100
http_requests_total{method="POST", endpoint="/api"} 50

在这个示例中:

  • http_requests_total 是指标名称
  • {method="GET", endpoint="/api"} 是标签集合,用于唯一标识两个不同的时间序列。
  • 10050 是样本值,表示在特定时间点上的 HTTP 请求总数。

最终,我们得到了一个如下的 /metrics 接口:

1
2
3
4
5
6
func metrics(hub *Hub, w http.ResponseWriter, r *http.Request) {
var pending = hub.pending.Load()
var connections = len(hub.clients)
w.Write([]byte(fmt.Sprintf("# HELP connections 连接数\n# TYPE connections gauge\nconnections %d\n", connections)))
w.Write([]byte(fmt.Sprintf("# HELP pending 等待发送的消息数量\n# TYPE pending gauge\npending %d\n", pending)))
}

不要忘记了在 main 中加上一个入口:

1
2
3
http.HandleFunc("/metrics", func(w http.ResponseWriter, r *http.Request) {
metrics(hub, w, r)
})

最终,这个接口会返回如下的数据:

1
2
3
4
5
6
# HELP connections 连接数
# TYPE connections gauge
connections 0
# HELP pending 等待发送的消息数量
# TYPE pending gauge
pending 0
  1. Prometheus 中配置 exporter

我们需要在 Prometheus 配置文件中加上以下配置:

1
2
3
4
5
scrape_configs:
# 拉取我们的应用指标
- job_name: 'websocket'
static_configs:
- targets: ['192.168.2.107:8181']

注意:这里不需要在后面加上 /metrics,因为 Prometheus 默认就是去拉取 /metrics 接口的。

web UI

然后我们就可以在 Prometheusweb UI 中看到我们的指标数据了。

  1. Grafana 中配置图表

最后,我们可以在 Grafana 中配置一个图表,来展示我们的指标数据:

Grafana

这样,我们就可以看到一个等待发送的消息数量以及连接数的变化了。

总结

最后,再来简单回顾一下本文所讲内容,主要包括以下几个方面:

  • 认证方式是另外一个比较复杂的话题,但是我们依然可以为其预留出一个扩展点,先实现其他功能后再来完善。
  • 目前市面上有很多监控相关的组件,本文使用了 Prometheus 作为例子来演示如何在项目中采集应用的指标数据,以及如何通过 Grafana 来展示这些指标的变化。
  • Prometheus 中包含了 `Prometheus Serverexporters 等组件,其中 Server 是实际存储数据的地方,而 exporters 是用来采集指标数据的程序。
  • Prometheus 采集到的数据,我们可以通过 Grafana 来进行可视化展示,更加的直观。
  • 应用中,也可以暴露一个 /metrics 端口来返回应用当前的一些状态,只要遵循 Prometheus 的规范即可。
0%