本文旨在给大家一点关于如何写单元测试的指南,如果能帮助到大家的话那最好不过了。

前言

写代码也有几年了,可能很多人都只是知道有单元测试这个东西,但是自己从来没有写过单元测试。 单元测试好像从来都只是一个可选项,而不是必选项,因为就算没有单元测试,每个公司起码也还有专门的测试人员, 我们写好代码,然后放到测试环境交给测试人员去验证即可。这样看来好像没有单元测试也可以。

但是在走过不少弯路之后发现,即使我们没有办法做到 100% 的单元测试覆盖率,仅仅对一些复杂的功能写上单元测试,也还是可以节省我们大量的时间。 主要原因是如下:

  • 在写单元测试的过程中,就可以发现自己写的代码存在的一些 bug,在部署到测试环境之前,我们已经验证过一遍了。到达测试人员那一环节的 bug 更少了。
  • 存在单元测试的时候,在修复 bug 或者后续增加或者修改功能的时候,就可以在写完代码之后立即验证是否对旧的模块有影响。
  • 单元测试驱动可以驱动我们去写出更好的代码,因为你把代码写得随意一点就会发现这怎么测啊,我自己都看不懂自己写的代码。
  • 使用单元测试,可以让我们的验证更省时间。我们不必把应用运行起来,然后利用现有系统去造数据什么的来验证我们修改的代码。我们只需要把跟当前无关的代码 mock 掉就可以只验证当前在写的代码了。

什么是单元测试

可能也有一部分人会写测试,但是可能写得并不好,比如测试的粒度太大,比如直接针对 http 接口写测试, 但是我们也知道,一个接口背后包含的逻辑可能非常多,这样一来,我们写的测试包含的不确定性也会非常大, 因为这个大接口背后任何一个逻辑的修改都有可能导致我们的测试不通过,这样的 “单元测试” 无疑是非常脆弱的。 如下图这样,RPC 服务端、数据库服务器、文件系统、HTTP 服务端的异常都会导致我们的这种测试失败。 如果我们现在也在写这样的测试,那我们就得好好看看接下来的内容了。

本质上来说,这种测试不是单元测试,而是一种集成测试,单元测试是不包含跟其他组件的交互的,而我们的 http 接口可能会调用数据库,对数据库有强依赖。 这种依赖性也是测试脆弱性的一个来源,在良好的单元测试中,所有依赖都是被 mock(模拟) 掉的,也就是说, 我们的代码中,依然会有数据库访问的代码,但是在运行测试的时候,并不会产生实际的数据库访问操作。 在一个复杂的系统中,可能还会包含 rpc 调用、http 调用等,而这些强依赖性的东西我们都是需要在单元测试中 mock 掉的。

要认识单元测试,首先要明白什么是 “单元”。所谓 “单元” 指的是代码调用的最小单位,实际上指的就是一个功能块(Function)或者方法(Method)。 单元测试指的就是对这些代码调用单元的测试。单元测试是一种白盒测试,就是必须要对单元的代码细节很清楚才能做的测试。 所以说,如果我们代码没写好,测试也没法写,写单元测试可以驱使我们写出更好的代码。

单元测试的编写和执行都是由软件工程师来做的。相对于单元测试,还有集成测试。 集成测试基本都是都是黑盒测试,主要由测试人员根据软件的功能手册来测试,需要有专门的测试环境配合。

单元测试的价值

单从测试的角度来说,单元测试的成本是最低的,速度最快的。 因为单元测试没有任何对外部的依赖,写完直接就可以执行,我们不需要为单元测试准备一整套环境,比如先把服务器各种组件安装好,运行起来,然后把应用启动。

在我们写完代码的时候,点一下运行测试,马上就可以知道我们的代码是否有问题。 因为单元测试不需要依赖这些外部的东西。所以就算我们连服务器都还没准备好,我们也可以对我们的代码进行单元测试来验证代码的正确性。

除此以外,对于软件工程师来说,如果写代码时对自己的代码没有办法快速的验证,也就没有一个反馈,往往会有一种强烈的不安全感。 写的代码越多,不安全感累积的会越多,最后会发觉自己对自己所写的代码完全没有把握。 即使是快速的迭代方式,最少也要一周才能得到测试的反馈。并且很有可能测试的反馈结果会导致自己一周的代码都白写了,全部要推翻重来。 所以测试人员在测试的时候,软件工程师非常焦虑。如果迭代时间更长的话,造成的心理压力会更大。 测试在进行的时候,软件工程师往往会疲于奔命地去修复问题,也容易和测试团队发生冲突,从而产生沟通问题。

当然,这个问题会在持续一段时间后会好转,因为 bug 总会随着时间的推移被一个个修复。 然后我们就可以在一个更加稳定的系统上进行一些新功能的开发。 但是依然无法避免,新开发的功能可能会在某些非常隐秘的地方破坏了旧的逻辑,然后在一段时间后才能发现。 这可能不是我们想看到的结果。

另外,单元测试一旦写好可以长期使用,特别是在回归的时候,可以帮助节省大量的测试时间, 我们可以很容易知道,新的功能、或者对旧代码修改有没有对原有功能有破坏,单元测试可以帮忙发现很多隐藏的问题。

总的来说,单元测试可以给我们带来如下价值:

  • 成本更低,验证速度更快。(不依赖任何实际环境)
  • 减少回归测试的时间。(单元测试可以确保旧的功能没有受到影响)
  • 驱动我们写出更好的代码,设计良好的代码才好写单元测试。
  • 单元测试本质上也是一种文档,它描述了我们写的代码背后的意图。
  • 使得后续重构更安全。(单元测试可以验证重构是否有 bug)
  • 缩短反馈周期、降低缺陷修复成本。(在开发阶段就可以得到反馈,这个时候修复成本是最低的)
  • 保证质量的前提下提升软件交付的速度。(更少的 bug,更快的迭代速度)

单元测试的特征

  • 快速:应该花非常少的时间来运行单元测试。

如果我们有 pull 过一些优秀的开源项目,我们可以运行一下里面的单元测试,我们可能会发现在几秒内就完成了全部代码的单元测试。

  • 独立:单元测试是独立的,可以单独运行。不依赖于其他测试。

独立的一个好处就是可以单独验证某一个逻辑是否正确,如果需要依赖其他测试的话,说明我们的代码还是存在一些设计上的缺陷。 因为这在某种程度上表面,我们的不同逻辑之间存在着强依赖。

  • 可重复:运行单元测试的结果应该保持一致(幂等)

如果我们每次运行的结果都不一样,那我们也无法对程序运行结果进行断言,我们就无法判断运行的结果是否正确。

  • 自检查:测试应该能够在没有任何人工交互的情况下,自动检测测试是否通过。

比如我们不能说跑一下测试,然后去看看数据库有没有写入成功、文件有没有写入成功。 因为这种东西没什么好测的,数据库只要能正确运行那肯定是可以写入的,如果某些异常情况下数据库写入不了,那也不是我们的代码的 bug, 因此我们会 mock 掉数据库访问。而文件读写、RPC 调用之类也是同样的道理。

单元测试测什么

我们上面说了,单元测试是对一个功能块(Function)或者方法(Method)的测试。但不是所有的 “单元” 都需要单元测试的。 既然要做单元测试,我们就要知道要测什么内容。比如下面的代码,需要测试吗?

1
2
3
4
5
6
7
public static Response get(String url) throws IOException {
okhttp3.Request request = new okhttp3.Request.Builder()
.url(url)
.build();

return client.newCall(request).execute();
}

这是一段很常见的 http 调用代码,里面只是根据传递的 url 调用 okhttp 库发起了一个 GET 请求。 假如要单元测试的话,究竟是测试什么?测试那个 url 背后的服务器是否正常运行?测试我本地的网络是否正常?

实际上,这类对外部系统的依赖是不需要测试的,只要能够编译通过,操作系统会保证它的正常执行。 如果它不能正常执行,那也不是我们代码的问题,可能是 url 所在服务器宕机了,或者本地的网络异常了。 但是这跟我们的代码能否正确处理逻辑一点关系都没有。

我们写的业务逻辑不可能说在外部服务器宕机的时候就处理错误了,比如我们的代码里面计算了 1+1,我们断言它等于 2, 然后我们发起了一个 HTTP 请求,但是这个 HTTP 请求异常了,这个时候我也不能说 1+1 不等于 2。 因为我们的这个 1+1=2 的逻辑跟外部的系统没关系。

单元测试测的是我们写的业务逻辑代码。所有跟外部系统的交互都是不需要进行测试的。

如何写单元测试

明确了我们要测试的内容,接下来就得学习一下如何写单元测试了:通过提供预期的输入和预期的结果,与单元的实际运行结果进行比对, 就可以知道单元的工作是否和预期的一致。

所以,写单元测试有三个步骤:

  • 构建输入参数,并预测该输入所产生的输出。
  • 调用要测试的目标方法,获取输出。
  • 检测目标方法的输出是否和预期的输出一致(Assert 断言)。

对同一个目标方法,通过构建各种不同的输入,重复上述步骤,检测各种正常与边界状况和预期是否相符,确保把目标方法的各种可能性都覆盖。

下面是一个简单的例子(PHP):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// 单元测试的目标方法
function add(int $a, int $b): int
{
return $a + $b;
}

// 单元测试
// 测试 add 方法
public function testAdd()
{
// 构建输入
$a = 1;
$b = 1;

// 调用目标方法
$sum = $this->add($a, $b);

// 比对输出与期望的值是否一致。
// 如果不一致的话,单元测试不通过,说明我们的目标方法有错误或者我们的期望值有错误。
$this->assertEquals(2, $sum);
}

我们发现,单元测试写起来好像也没那么难是吧。 当然,在实际工作中的需求大多比这个复杂多了,但是单元测试的步骤其实就上面提到的三个:构建输入、调用被测方法、验证输出。

使用 Mock

单元测试其实并不复杂,复杂的其实是我们的代码。 如果想更好地写好单元测试,我们还必须得了解一下单元测试中的 mock。

mock 是单元测试中帮助我们模拟类方法的一种技术。 我们知道了,单元测试不应该对数据库这些外部组件有依赖,那我们该如何实现才能让单元测试没有外部依赖呢? 答案就是 mock,当我们的代码需要依赖某一个类的时候,我们可以使用 mock 库来生成一个模拟的对象, 在我们的代码的代码需要调用这个对象的某些方法的时候,实际上并不会产生实际的调用。 这么说有点抽象,下面是一个非常典型的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
class Adder
{
public function add($a, $b)
{
return $a + $b;
}
}

class Calculator
{
private $adder;

/**
* @param Adder $adder 代表一个对外部的依赖
*/
public function __construct(Adder $adder)
{
$this->adder = $adder;
}

public function add($a, $b)
{
// 这里只使用了外部依赖,实际中可能包含非常多的逻辑
return $this->adder->add($a, $b);
}
}

// 单元测试
public function testCalculator()
{
// 创建一个模拟的 Adder 对象
$adder = Mockery::mock(Adder::class)->makePartial();
// shouldReceive 表明这个 mock 对象的 add 方法会被调用
// once 表明这个方法只会被调用一次(没有 once 调用表示可以被调用任意次数)
// with 如果调用 mock 对象的时候传递了 1 和 2 两个参数,就会返回 andReturn 中的参数
$adder->shouldReceive('add')->once()->with(1, 2)->andReturn(3);

$c = new Calculator($adder);
$this->assertEquals(3, $c->add(1, 2));

$adder = Mockery::mock(Adder::class)->makePartial();
// 没有指定 with,传递任意参数都会返回 3
$adder->shouldReceive('add')->andReturn(3);
$c = new Calculator($adder);
$this->assertEquals(3, $c->add(2, 3));
}

在所有常见的编程语言中,都会有一个比较成熟的 mock 库,比如:

  • PHP 中的 Mockery(上面的例子用的就是 Mockery)
  • Java 中的 Mockito
  • go 中的 testify 也提供了 mock 的功能

有了 Mock,我们就可以实现隔离掉外部依赖的这一目标。 不管是 RPC、数据库还是读写文件等操作,我们都可以使用一个模拟的对象来模拟实际的操作。 这意味着,不管外部系统怎么变化,我们的单元测试如果运行通过了,说明我们写的代码逻辑上是没有问题的。这样我们的单元测试才更加健壮。

在单元测试的时候,我们通常会将外部依赖以 mock 的形式注入到我们的代码中。 这一点各种语言实现上会有比较大的差异,有时候还跟使用的框架相关:

  • PHP 的 Laravel 可以通过 mock 一个对象,然后绑定到容器中,然后通过 app() 来使用框架提供的依赖注入功能,又或者自己 mock 之后直接 new 一个实例来进行测试。
  • Java 的 Spring Boot 的依赖注入更加的方面,直接通过在类字段上加上 @Mock/@InjectMocks 注解即可实现注入了。

有很多的 mock ?

看完上面的讲述,我们可能会兴致勃勃地想去写单元测试。 在我们开始写单元测试之后,可能会感到非常沮丧,一杯茶一根烟一个测试写一天,我们会发现怎么要 mock 这么多东西。 这个时候,我们可能会开始思考,这种 mock 的做法到底对不对,为什么写起来这么费劲呢?

出现这种情况,往往反映的是我们代码背后设计上存在的问题,如果一个类需要依赖很多其他东西,说明这个类本身太复杂了。 这个时候怎么办?那当然是能跑就行!代码跟人有一个能跑就行。

对于遗留系统的代码我们可能无能为力,但是对于我们新增的代码,我们依然有机会去改进, 在一边写新代码,一边写单元测试的过程中,我们可以去思考怎样写出来的代码是可以写单元测试的。 我们可以去看看关于软件设计方面的一些东西,比如郑晔的《软件设计之美》,个人感觉是比较接地气的。 持续地去编写单元测试可以促使我们写出可重用、可推广的代码,以及改进我们的软件设计。

(未完不续。)

launchd 是什么?

launchd 是 macOS 下一个服务管理工具,用于启动、停止和管理守护进程、应用程序、进程和脚本。 我们可以将 launchd 看作是 mac 下的 systemd 或者是 supervisor,如果我们想要在 mac 下启动守护进程,用 launchd 就可以了。

守护进程是在后台运行的不需要用户输入的程序。比如我们常用的 MySQL,往往是以守护进程的方式启动的。 需要注意的是:虽然本文多次提到了 守护进程,但是准确来说,launchd 可以启动的不仅仅是守护进程,还可以启动应用程序、进程和脚本。

如何写 launchd 配置文件?

launchd 的配置文件是通过一个 plist 文件来定义的(plistproperty list 的缩写),一个典型的格式如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE plist PUBLIC "-//Apple//DTD PLIST 1.0//EN" "http://www.apple.com/DTDs/PropertyList-1.0.dtd">
<plist version="1.0">
<dict>
<!-- Label 可以看作是守护进程的名称,key 是配置的名称,key 的下一行就是它的值,string 标签表示值的类型是字符串 -->
<key>Label</key>
<string>com.example.app</string>
<key>Program</key>
<string>/Users/Me/Scripts/cleanup.sh</string>
<key>RunAtLoad</key>
<true/>
</dict>
</plist>

说明:

  • 配置文件中,除了 dict 里面的那一部分,其他的都是固定的,不需要修改。
  • 三个字段说明:
    • Label:也就是服务的名字,可以随便取,但是不能重复。我们通过 launchctl list 来查看的时候,列出的就是这个名字。上述例子是 com.example.app
    • Program:要启动的程序的路径,需要填写绝对路径。上述例子是 /Users/Me/Scripts/cleanup.sh如果需要加参数的话,需要使用 ProgramArguments 来代替 Program,详情参考下文。
    • RunAtLoad:是否在配置被加载的时候就运行,默认是 false,如果需要在启动的时候就运行,需要设置为 true。上述例子是 true
  • 标签说明:key 就是属性的名称,紧跟着 key 的下一行就是属性的值,属性的值的类型通过其标签反映出来,比如上面的 <string> 表示包裹的是一个字符串类型,而 <true/> 表示是一个布尔类型,而且它的值是 true

mac 中很多配置都是通过 plist 来定义的,

launchd 配置文件放哪里?

macOS 中有两种类型的守护进程,一种是系统级别的(Daemons),一种是用户级别的(Agents),它们的配置文件放的位置是不一样的。 系统级别的守护进程就是不管你用户是谁,都会启动的,而用户级别的守护进程就是只有在对应的用户登录的时候才会启动的(所以会保存在用户主目录下)。

简单来说,就是如果没有用户登录进系统中,那么用户级别的守护进程(Agents)就不会启动。mac 其实跟 linux 一样,都是多用户系统,没有任何用户登录的时候,它依然是在运行的。

下面是 launchd 配置文件的路径:

  • ~/Library/LaunchAgents:用户级别的守护进程配置文件路径。这里保存特定用户的 Agents 配置。(一般情况都是放这里
  • /Library/LaunchAgents:用户级别的守护进程配置文件路径。这里保存所有用户共用的 Agents 配置。
  • /Library/LaunchDaemons:全局的 Daemons 配置。
  • /System/Library/LaunchAgents:所有登录用户共用的 Agents 配置。
  • /System/Library/LaunchDaemons:全局的 Daemons 配置。

可能大家看得有点迷,会有一种想法就是,那我的配置文件应该放哪里?这个问题的答案很简单:如果你的电脑只有你一个人用,那么你就把配置文件放在 ~/Library/LaunchAgents 下面就行了。

如何让 launchd 开机启动我们配置的守护进程?

在我们添加了配置文件之后,还有一件事需要做的就是,修改配置文件的权限,我们可以参考一下上面几个文件夹中文件的权限,然后修改成相同的就可以了。 比如 ~/Library/LaunchAgents 文件夹中的配置文件权限都是 xx:staffxx 是当前登录的用户),那么我们也把我们的配置文件权限修改成 xx:staff 就可以了。

接下来,我们只需要执行 launchctl load ~/Library/LaunchAgents/com.example.app.plist 就可以了。

如何移除守护进程呢?

如果我们想要移除守护进程,只需要执行 launchctl unload ~/Library/LaunchAgents/com.example.app.plist 就可以了。

如果要执行的命令有参数怎么配置?

在很多时候,我们都是需要加上某些参数来启动我们的命令的,要实现这种效果可以使用 ProgramArguments 配置,下面是另外一个例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE plist PUBLIC "-//Apple//DTD PLIST 1.0//EN" "http://www.apple.com/DTDs/PropertyList-1.0.dtd">
<plist version="1.0">
<dict>
<key>KeepAlive</key>
<true/>
<key>Label</key>
<string>my-privoxy</string>
<key>ProgramArguments</key>
<array>
<string>/Users/ruby/Code/proxy/privoxy</string>
<string>--no-daemon</string>
<string>privoxy.config</string>
</array>
<key>StandardErrorPath</key>
<string>/Users/ruby/Library/Logs/my-privoxy.log</string>
<key>StandardOutPath</key>
<string>/Users/ruby/Library/Logs/my-privoxy.log</string>
<key>WorkingDirectory</key>
<string>/Users/ruby/Code/proxy/</string>
</dict>
</plist>

这个例子中,我们添加了更多的元素进去,比如日志、工作目录等。

说明:

  • ProgramArguments 属性的作用是指定要执行的命令以及其参数,它的值是数组类型。
  • StandardErrorPath 配置了错误输出的日志路径。
  • StandardOutPath 配置了标准输出的日志路径。
  • WorkingDirectory 设置了我们程序运行时候所在的工作目录。

launchd 配置项说明

Label - 指定名字

1
2
<key>Label</key>
<string>com.example.app</string>

Program、ProgramArguments - 指定要执行的程序

这两个二选一,如果不需要指定参数,用 Program。如果需要指定参数,那么使用 ProgramArguments

1
2
<key>Program</key>
<string>/path/to/program</string>
1
2
3
4
5
6
7
8
<key>ProgramArguments</key>
<array>
<string>/usr/bin/rsync</string>
<string>--archive</string>
<string>--compress-level=9</string>
<string>/Volumes/Macintosh HD</string>
<string>/Volumes/Backup</string>
</array>

EnvironmentVariables - 环境变量

我们可以为 Program 设置环境变量,比如下面这个例子:

1
2
3
4
5
<key>EnvironmentVariables</key>
<dict>
<key>PATH</key>
<string>/bin:/usr/bin:/usr/local/bin</string>
</dict>

这样我们就可以在程序运行的时候读取到这些环境变量。

StandardInPath、StandardOutPath、StandardErrorPath - 重定向输入输出

1
2
3
4
5
6
<key>StandardInPath</key>
<string>/tmp/test.stdin</string>
<key>StandardOutPath</key>
<string>/tmp/test.stdout</string>
<key>StandardErrorPath</key>
<string>/tmp/test.stderr</string>

具体含义:

  • StandardInPath 标准输入的路径。
  • StandardOutPath 标准输出的路径。
  • StandardErrorPath 标准错误输出的路径。

大多时候我们可能不需要,但是如果我们的服务跑不起来,加上 StandardErrorPathStandardOutPath 我们就可以看到错误信息了。

WorkingDirectory - 指定工作目录

1
2
<key>WorkingDirectory</key>
<string>/tmp</string>

这样我们可以在程序运行的时候,直接使用相对路径。

RunAtLoad、StartInterval、StartCalendarInterval - 指定什么时候启动

这三个属性我们可以在配置中选择一个来配置:

  • RunAtLoad:如果设置为 true,那么 Program 会在系统启动的时候执行。(对于 Daemons 来说就是系统启动的时候启动,对于 Agents 来说就是用户登录的时候启动)
  • StartInterval:指定启动的时间间隔,单位是秒。也就是每隔多少秒执行一次。
  • StartCalendarInterval:指定启动的时间,可以指定每天的某个时间启动。(类似定时任务),可以指定多个时间。
1
2
<key>RunAtLoad</key>
<true/>

上面两行的作用是,在系统启动或者用户登进的时候执行命令。

1
2
<key>StartInterval</key>
<integer>3600</integer>

上面两行的作用是,每隔 3600 秒执行一次。

1
2
3
4
5
6
7
<key>StartCalendarInterval</key>
<dict>
<key>Hour</key>
<integer>3</integer>
<key>Minute</key>
<integer>0</integer>
</dict>

上面的例子中,我们指定了每天的 3 点执行一次。

还有些不太常用的选项:StartOnMountWatchPathsQueueDirectories,通过这几个配置也可以指定命令执行的时机,本文不做介绍。

KeepAlive - 指定是否保持运行

这个默认其实是 true,我们可以测试一下:kill 掉我们 launchd 启动的进程,我们会发现那个进程马上又会被启动。

这个选项中,我们可以配置一些额外的条件来让 launchd 知道什么时候需要重启进程:

  • SuccessfulExit 如果上一次退出是正常退出,那么就在进程退出的时候启动它。
1
2
3
4
5
<key>KeepAlive</key>
<dict>
<key>SuccessfulExit</key>
<true/>
</dict>
  • Crashed 如果上一次退出是异常退出,那么就在进程退出的时候启动它。
1
2
3
4
5
<key>KeepAlive</key>
<dict>
<key>Crashed</key>
<true/>
</dict>
  • NetworkState 如果网络连接断开,那么就在网络连接恢复的时候启动它。
1
2
3
4
5
<key>KeepAlive</key>
<dict>
<key>Crashed</key>
<true/>
</dict>

KeepAlive 的其他不常用选项:PathStateOtherJobEnabledAfterInitialDemand,本文不做介绍。

UserName、GroupName - 指定运行的用户和用户组

我们可以指定以什么用户、用户组来运行这个命令:

1
2
3
4
5
6
<key>UserName</key>
<string>nobody</string>
<key>GroupName</key>
<string>nobody</string>
<key>InitGroups</key>
<true/>

RootDirectory - 指定根目录

这允许我们在一个 jail root 中执行我们的命令。

1
2
<key>RootDirectory</key>
<string>/var/jail</string>

AbandonProcessGroup - 进程被终止的时候是否终止其子进程

当我们给 launchd 启动的进程发送 SIGTERM 信号的时候,这个 SIGTERM 信号也会同时被发送给它的子进程。 我们可以将 AbandonProcessGroup 设置为 true 来禁止这种行为:

1
2
<key>AbandonProcessGroup</key>
<true/>

ExitTimeOut - 优雅终止

在我们停止 launchd 启动的进程的时候,会先发送一个 SIGTERM 信号,我们的进程可以在接收到这个信号后做一些清理操作。 直到 ExitTimeOut 秒后,如果进程还没退出,那么就会发送一个 SIGKILL 信号来强行终止进程的运行:

1
2
<key>ExitTimeOut</key>
<integer>30</integer>

ThrottleInterval - 命令调用的时间间隔

可与 KeepAlive 配合使用,在进程异常退出之后,间隔 ThrottleInterval 秒后再尝试启动。

launchd 常用操作

launchctl list - 列出所有 launchd 管理的服务

1
2
➜ launchctl list | grep my-ss-local
89201 0 my-ss-local

输出的第一列是进程 id,如果是 0 说明没有在运行状态。第二列的 0 表示的是进程上一次的退出状态码,0 一般表示成功。第三列表示的是我们在 plist 配置文件中配置的 Label 的值。

加载一个 plist(服务/job)

我们可以通过下面的命令加载一个 plist

1
launchctl load ~/Library/LaunchAgents/com.example.app.plist

移除一个 plist(服务/job)

我们可以通过下面的命令来移除 launchd 配置:

1
launchctl unload ~/Library/LaunchAgents/com.example.app.plist

启动一个 job

下面的 com.example.app 是我们在 plist 中配置的 Label 的值:

1
launchctl start com.example.app

停止一个 job

下面的 com.example.app 是我们在 plist 中配置的 Label 的值:

1
launchctl stop com.example.app

一些实例

下面是个人使用中的一些配置文件,供大家参考。下面的例子涵盖了常用的一些配置,我们复制改改就可以用了。

frpc

文件 ~/Library/LaunchAgents/frp.plist

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE plist PUBLIC "-//Apple//DTD PLIST 1.0//EN" "http://www.apple.com/DTDs/PropertyList-1.0.dtd">
<plist version="1.0">
<dict>
<key>KeepAlive</key>
<true/>
<key>Label</key>
<string>frpc</string>
<key>ProgramArguments</key>
<array>
<string>/usr/local/bin/frpc</string>
<string>-c</string>
<string>/etc/frpc.ini</string>
</array>
</dict>
</plist>

这个配置文件的作用是,以守护进程的方式来启动 /usr/local/bin/frpc -c /etc/frpc.ini 这个命令。启用方式为:

1
launchctl load ~/Library/LaunchAgents/frp.plist

我们可以通过 ps 来查看进程是否成功启动。

ss-local

文件 ~/Library/LaunchAgents/ss-local.plist

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE plist PUBLIC "-//Apple//DTD PLIST 1.0//EN" "http://www.apple.com/DTDs/PropertyList-1.0.dtd">
<plist version="1.0">
<dict>
<key>KeepAlive</key>
<true/>
<key>Label</key>
<string>my-ss-local</string>
<key>ProgramArguments</key>
<array>
<string>/Users/ruby/Code/proxy/ss-local</string>
<string>-c</string>
<string>ss-local-config.json</string>
</array>
<key>StandardErrorPath</key>
<string>/Users/ruby/Library/Logs/my-ss-local.log</string>
<key>StandardOutPath</key>
<string>/Users/ruby/Library/Logs/my-ss-local.log</string>
<key>WorkingDirectory</key>
<string>/Users/ruby/Code/proxy/</string>
</dict>
</plist>

这个配置文件的作用是,以守护进程的方式来启动 /Users/ruby/Code/proxy/ss-local -c ss-local-config.json 这个命令, 并且将标准错误输出和标准输出都重定向到 /Users/ruby/Library/Logs/my-ss-local.log 文件中。 ss-local 进程的工作目录是 /Users/ruby/Code/proxy/

启用方式为:

1
launchctl load ~/Library/LaunchAgents/ss-local.plist

homebrew 的 service - MySQL

文件 ~/Library/LaunchAgents/homebrew.mxcl.mysql@5.7.plist

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE plist PUBLIC "-//Apple//DTD PLIST 1.0//EN" "http://www.apple.com/DTDs/PropertyList-1.0.dtd">
<plist version="1.0">
<dict>
<key>KeepAlive</key>
<true/>
<key>Label</key>
<string>homebrew.mxcl.mysql@5.7</string>
<key>ProgramArguments</key>
<array>
<string>/usr/local/opt/mysql@5.7/bin/mysqld_safe</string>
<string>--datadir=/usr/local/var/mysql</string>
</array>
<key>RunAtLoad</key>
<true/>
<key>WorkingDirectory</key>
<string>/usr/local/var/mysql</string>
</dict>
</plist>

我们在使用 mac 的时候最常用的 homebrew 也是通过 launchd 来管理服务的。我们可以通过 brew services list 来查看当前启动的服务。 brew 管理的服务的配置文件也会被放在 ~/Library/LaunchAgents 目录中。

上面的配置文件中,做了以下配置:

  • KeepAlive - 设置为 true,那么当进程退出的时候,launchd 会自动重启这个进程。
  • Label - 这个配置文件的唯一标识。我们除了通过 brew services list 查看到 MySQL 服务,还可以通过 launchctl list | grep mysql 来看到这个进程的状态。
  • ProgramArguments - 指定要执行的命令。这里的完整命令是 /usr/local/opt/mysql@5.7/bin/mysqld_safe --datadir=/usr/local/var/mysql
  • RunAtLoad - 设置为 true,那么当用户进入系统的时候,会自动启动这个进程。
  • WorkingDirectory - 指定 MySQL 进程的工作目录。

总结

  • mac 下可以使用 launchd 来配置一些守护进程、定时任务等。类似的工具有 systemdsupervisor 等,但是 launchd 是 mac 自带的,不用安装其他依赖就能使用。
  • launchd 配置文件类型为 plist,也就是 property list,用户级别的配置文件一般存放在 ~/Library/LaunchAgents 目录中。
  • launchd 配置的几个关键属性:
    • Label - 用来标识这个 job 的唯一标识符。
    • Program - 指定要执行的命令。(如果有参数,我们需要使用 ProgramArguments 来代替 Program
    • StandardErrorPathStandardOutPath - 指定标准错误输出和标准输出的路径。
    • WorkingDirectory - 指定命令执行的工作目录。
  • launchd 可以通过 launchctl 命令来管理:
    • launchctl list - 列出所有 launchd 管理的服务。
    • launchctl load - 加载一个 plist
    • launchctl unload - 移除一个 plist
    • launchctl start - 启动一个 job
    • launchctl stop - 停止一个 job

launchd 的使用还是挺简单的,命令也就那么几个,所以如果想在你的 mac 中启动一些守护进程的话,可以尝试一下。

gitlab 可能大家很常用,CI、CD 也应该早有耳闻,但是可能还没有去真正地了解过,这篇文章就是我对 gitlab CI、CD 的一些理解,以及踩过的一些坑,希望能帮助到大家。

什么是 CI、CD

CI(Continuous Integration)持续集成,CD(Continuous Deployment)持续部署(也包含了持续交付的意思)。

CI 指的是一种开发过程的的自动化流程,在我们提交代码的时候,一般会做以下操作:

  • lint 检查,检查代码是否符合规范
  • 自动运行测试,检查代码是否能通过测试

这个过程我们可以称之为 CI,也就是持续集成,这个过程是自动化的,也就是说我们不需要手动去执行这些操作,只需要提交代码,这些操作就会自动执行。

CD 指的是在我们 CI 流程通过之后,将代码自动发布到服务器的过程,这个过程也是自动化的。 在有了前面 CI 的一些操作之后,说明我们的代码是可以安全发布到服务器的,所以就可以进行发布的操作。

为什么要使用 CI、CD

实际上,就算没有 CI、CD 的这些花里胡哨的概念,对于一些重复的操作,我们也会尽量想办法会让它们可以自动化实现的,只不过可能效率上没有这么高,但是也是可以的。

CI、CD 相比其他方式的优势在于:

  • 一次配置,多次使用:我们需要做的所有操作都通过配置固定下来了,每次提交代码我们都可以执行相同的操作。
  • 可观测性:我们可以通过 CI、CD 的日志来查看每次操作的执行情况,而且每一次的 CI、CD 执行的日志都会保留下来,这样我们就可以很方便地查看每一次操作的执行情况。
  • 自动化:我们不需要手动去执行 CI、CD 的操作,只需要提交代码,CI、CD 就会自动执行。
  • 少量配置:一般的代码托管平台都会提供 CI、CD 的功能,我们只需要简单的配置一下就可以使用了。同时其实不同平台的 CI、CD 配置也是有很多相似之处的,所以我们只需要学习一种配置方式,就可以在不同平台上使用了。

gitlab CI、CD

在开始之前,我们可以通过下图来了解一下 CI、CD 的整体流程:

gitlab_1
  1. 在开发人员提交代码之后,会触发 gitlab 的 CI 流水线。也就是上图的 CI PIPELINE,也就是中间的部分。
  2. 在 CI 流水线中,我们可以配置多个任务。比如上图的 buildunit testintegration tests 等,也就是构建、单元测试、集成测试等。
  3. 在 CI 流水线都通过之后,会触发 CD 流水线。也就是上图的 CD PIPELINE,也就是右边的部分。
  4. 在 CD 流水线中,我们可以配置多个任务。比如上图的 stagingproduction 等,也就是部署到测试环境、部署到生产环境等。

在 CD 流程结束之后,我们就可以在服务器上看到我们的代码了。

gitlab CI、CD 中的一些基本概念

在开始之前,我们先来了解一下 gitlab CI、CD 中的一些基本概念:

  • pipeline:流水线,也就是 CI、CD 的整个流程,包含了多个 stage,每个 stage 又包含了多个 job
  • stage: 一个阶段,一个阶段中可以包含多个任务(job),这些任务会并行执行,但是下一个 stagejob 只有在上一个 stagejob 执行通过之后才会执行。
  • job:一个任务,这是 CI、CD 中最基本的概念,也是最小的执行单元。一个 stage 中可以包含多个 job,同时这些 job 会并行执行。
  • runner:执行器,也就是执行 job 的机器,runner 跟 gitlab 是分离的,runner 需要我们自己去安装,然后注册到 gitlab 上(不需要跟 gitlab 在同一个服务器上,这样有个好处就是可以很方便实现多个机器来同时处理 gitlab 的 CI、CD 的任务)。
  • tag: runnerjob 都需要指定标签,job 可以指定一个或多个标签(必须指定,否则 job 不会被执行),这样 job 就只会在指定标签的 runner 上执行。
  • cache: 缓存,可以缓存一些文件,这样下次流水线执行的时候就不需要重新下载了,可以提高执行效率。
  • artifacts: 这代表这构建过程中所产生的一些文件,比如打包好的文件,这些文件可以在下一个 stage 中使用,也可以在 pipeline 执行结束之后下载下来。
  • variables:变量,可以在 pipeline 中定义一些变量,这些变量可以在 pipeline 的所有 stagejob 中使用。
  • services:服务,可以在 pipeline 中启动一些服务,比如 mysqlredis 等,这样我们就可以在 pipeline 中使用这些服务了(常常用在测试的时候模拟一个服务)。
  • script: 脚本,可以在 job 中定义一些脚本,这些脚本会在 job 执行的时候执行。

CI、CD 的工作模型

我们以下面的配置为例子,简单说明一下 pipelinestagejob 的工作模型,以及 cacheartifacts 的作用:

ci 配置文件(也就是一个 pipeline 的所有任务):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
# 定义一个 pipeline 的所有阶段,一个 pipeline 可以包含多个 stage,每个 stage 又包含多个 job。
# stage 的顺序是按照数组的顺序来执行的,也就是说 stage1 会先执行,然后才会执行 stage2。
stages:
- stage1 # stage 的名称
- stage2

# 定义一个 job,一个 job 就是一个任务,也是最小的执行单元。
job1:
stage: stage1 # 指定这个 job 所属的 stage,这个 job 只会在 stage1 执行。
script: # 指定这个 job 的脚本,这个脚本会在 job 执行的时候执行。
- echo "hello world" > "test.txt"
tags: # 指定这个 job 所属的 runner 的标签,这个 job 只会在标签为 tag1 的 runner 上执行。
- tag1
# cache 可以在当前 pipeline 后续的 job 中使用,也可以在后续的 pipeline 中使用。
cache: # 指定这个 job 的缓存,这个缓存会在 job 执行结束之后保存起来,下次执行的时候会先从缓存中读取,如果没有缓存,就会重新下载。
key: $CI_COMMIT_REF_SLUG # 缓存的 key(也可以是文件名列表,那样对应的)
paths: # 缓存的路径
- node_modules/
artifacts: # 指定这个 job 的构建产物,这个构建产物会在 job 执行结束之后保存起来。可以在下一个 stage 中使用,也可以在 pipeline 执行结束之后下载下来。
paths:
- test.txt

job2:
stage: stage1
script:
- cat test.txt
tags:
- tag1
cache:
key: $CI_COMMIT_REF_SLUG
paths:
- node_modules/
# 指定这个 job 的缓存策略,只会读取缓存,不会写入缓存。默认是既读取又写入,在 job 开始的时候读取,在 job 结束的时候写入。
# 但是实际上,只有在安装依赖的时候是需要写入缓存的,其他 job 都使用 pull 即可。
policy: pull


# job3 和 job4 都属于 stage2,所以 job3 和 job4 会并行执行。
# job3 和 job4 都指定了 tag2 标签,所以 job3 和 job4 只会在标签为 tag2 的 runner 上执行。
# 同时,在 job1 中,我们指定了 test.txt 作为构建产物,所以 job3 和 job4 都可以使用 test.txt 这个文件。
job3:
stage: stage2
script:
- cat test.txt
tags:
- tag1
cache:
key: $CI_COMMIT_REF_SLUG
paths:
- node_modules/
policy: pull

job4:
stage: stage2
script:
- cat test.txt
tags:
- tag1
cache:
key: $CI_COMMIT_REF_SLUG
paths:
- node_modules/
policy: pull

上面的配置文件的 pipeline 执行过程可以用下面的图来表示:

gitlab_2

说明:

  1. 上面的图有两个 pipeline 被执行了,但是 pipeline2 没有全部画出来
  2. 其中,在 pipeline 1 中,stage1 中的 job 会先被执行,然后才会执行 stage2 中的 job
  3. stage1 中的 job1job2 是可以并行执行的,这也就是 stage 的本质上的含义,表示了一个阶段中不同的任务,比如我们做测试的时候,可以同时对不同模块做测试。
  4. job1job2 都指定了 tag1 标签,所以 job1job2 只会在标签为 tag1runner 上执行。
  5. job1 中,我们创建了一个 test.txt 文件,这个文件会作为 stage1 的构建产物,它可以在 stage2 中被使用,也就是 job3job4 都可以读取到这个文件。一种实际的场景是,前端部署的时候,build 之后会生成可以部署的静态文件,这些静态文件就会被保留到部署相关的 stage 中。需要注意的是,artifacts 只会在当前 pipeline 后续的 stage 中共享,不会在 pipeline 之间共享。
  6. 同时,在 job1 中,我们也指定了 cache,这个 cache 会在 job1 执行结束之后保存起来,不同于 artifactscache 是可以在不同的 pipeline 之间共享的。一种很常见的使用场景就是我们代码的依赖,比如 node_modules 文件夹,它可以加快后续 pipeline 的执行流程,因为避免了重复的依赖安装。

需要特别注意的是:cache 是跨流水线共享的,而 artifacts 只会在当前流水线的后续 stage 共享。

gitlab runner 和 executor

gitlab runner 在 CI/CD 中是一个非常重要的东西,因为我们写的 CI/CD 的配置就是在 runner 上运行的,如果我们想要执行 CI/CD 任务,我们必须先安装配置 gitlab-runner

其中 runner 是一台执行 CI/CD 脚本的机器(也就是安装了 gitlab-runner 的机器)。这个机器可以部署在 gitlab 服务器以外的任意一台电脑上,当然也可以跟 gitlab 在同一台服务器。

而每一个 runner 会对应一种特定的 executorexecutor 就是我们执行 CI/CD 里面 script 的环境。比如如果我们指定了 executor 类型为 docker,那么我们 CI/CD 脚本里面的 script 将会在一个独立的 docker 容器中执行。

简单来说,runner 是执行 CI/CD 脚本的机器,这个机器上有不同类型的 executor,一个 executor 代表着一个不同类型的命令行终端,最常见的是 shelldocker,当然也支持 widnows 的 powershell

我们可以通过下图来了解一下 gitlab 是怎么跟 runner 配合的:

gitlab 是通过 tags 来找到运行脚本的 runner 的,如果 jobtagsrunnertags 匹配了,就可以将那个 job 放到 runner 上处理。

gitlab_3

说明:

  • 我们在两台机器上安装了 gitlab-runner,它们的 IP 是 192.168.2.123192.168.2.234
  • test-jobtags 中包含了 api 这个 tag,而 runner1tags 也包含了 api 这个 tag,因此 test-job 会被 runner1 执行。
  • 同理,npm-install 这个 job 会被 runner3 处理。

从上图我们可以看到,其实不同的 runner 是有可能位于不同的机器上的。

其他一些在个人实践中的一些经验

gitlab 的 CI、CD 是一个很庞大的话题,同时很多内容可能比较少用,所以本文只是介绍个人在实践中用到的一些内容,其他的东西如果有需要,可以自行查阅官方文档。

指定特定分支才会执行的 job

这个算是基本操作了,我们可以通过 only 来指定特定分支才会执行的 job,也有其他方法可以实现,比如 rules,具体请参考官方文档。

1
2
3
4
5
deploy-job:
stage: deploy
# 当前的这个 job 只会在 master 分支代码更新的时候会执行
only:
- "master"

不同 job 之间的依赖

这个也是基本操作,我们可以通过 needs 来指定不同 job 之间的依赖关系,比如 job1 依赖 job2,那么 job1 就会在 job2 执行完毕之后才会执行。

1
2
3
4
job1:
stage: deploy
needs:
- job2

指定执行 job 的 runner

我们可以通过 tags 来指定 job 执行的 runner,比如我们可以指定 job 只能在 api 标签的 runner 上执行。

1
2
3
4
build-job:
stage: build
tags:
- api

如果我们没有标签为 apirunner,那么这个 job 就会一直不会被执行,所以需要确保我们配置的 tag 有对应的 runner

指定 job 的 docker image

注意:这个只在我们的 runnerexecutordocker 的时候才会生效。也就是我们的 runner 是一个 docker 容器。

有时候,我们需要执行一些特定命令,但是我们全局的 docker 镜像里面没有,可能只需要一个特定的 docker 镜像,这个时候我们可以通过 image 来指定 jobdocker 镜像。

1
2
3
4
5
6
7
8
9
deploy-job:
stage: deploy
tags:
- api
# 指定 runner 的 docker image
image: eleven26/rsync:1.3.0
script:
# 下面这个命令只在上面指定的 docker 镜像中存在
- rsync . root@example.com:/home/www/foo

为我们的集成测试指定一个 service

在我们的 CI 流程中,可能会有一些集成测试需要使用到一些服务,比如我们的 mysql,这个时候我们可以通过 services 来指定我们需要的服务。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
test_rabbitmq:
# 这会启动一个 rabbitmq 3.8 的 docker 容器,我们的 job 就可以使用这个容器了。
# 我们的 job 可以连接到一个 rabbitmq 的服务,然后进行测试。
# 需要注意的是,这个容器只会在当前 job 执行的时候存在,执行完毕之后就会被删除。所以产生的数据不会被保留。
services:
- rabbitmq:3.8
stage: test
only:
- master
tags:
- go
script:
# 下面的测试命令会连接到上面启动的 rabbitmq 服务
- "go test -v -cover ./pkg/rabbitmq"

复用 yaml 配置片段

yaml 中,有一种机制可以让我们复用 yaml 配置片段,比如:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# 发布代码的 job
.deploy-job: &release-job
tags:
- api
image: eleven26/rsync:1.3.0
script:
- rsync . root@example.com:/home/www/foo

deploy-release:
<<: *release-job
stage: deploy
only:
- "release"

deploy-master:
<<: *release-job
stage: deploy
only:
- "master"

上面的代码中,我们定义了一个 release-job 的配置片段,然后在 deploy-releasedeploy-master 中,我们都引用了这个配置片段,这样我们就可以复用这个配置片段了。 等同于下面的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
# 发布代码的 job
.deploy-job: &release-job
tags:
- api
image: eleven26/rsync:1.3.0
script:
- rsync . root@example.com:/home/www/foo

deploy-release:
tags:
- api
image: eleven26/rsync:1.3.0
script:
- rsync . root@example.com:/home/www/foo
stage: deploy
only:
- "release"

deploy-master:
tags:
- api
image: eleven26/rsync:1.3.0
script:
- rsync . root@example.com:/home/www/foo
stage: deploy
only:
- "master"

yaml 的术语中,这一种机制叫做 anchor

cache vs artifacts

初次使用的人,可能会对这个东西有点迷惑,因为它们好像都是缓存,但是实际上,它们的用途是不一样的。

  • cache 是用来缓存依赖的,比如 node_modules 文件夹,它可以加快后续 pipeline 的执行流程,因为避免了重复的依赖安装。
  • artifacts 是用来缓存构建产物的,比如 build 之后生成的静态文件,它可以在后续的 stage 中使用。表示的是单个 pipeline 中的不同 stage 之间的共享

指定 artifacts 的过期时间

我们可以通过 expire_in 来指定 artifacts 的过期时间,比如:

1
2
3
4
5
6
7
8
9
10
11
job1:
stage: build
only:
- "release"
image: eleven26/apidoc:1.0.0
tags:
- api
artifacts:
paths:
- public
expire_in: 1 hour

因为我们的 artifacts 有时候只是生成一些需要部署到服务器的东西,然后在下一个 stage 使用,所以是不需要长期保留的。所以我们可以通过 expire_in 来指定一个比较短的 artifacts 的过期时间。

cache 只 pull 不 push

gitlab CI 的 cache 有一个 policy 属性,它的值默认是 pull-push,也就是在 job 开始执行的时候会拉取缓存,在 job 执行结束的时候会将缓存指定文件夹的内容上传到 gitlab 中。

但是在实际使用中,我们其实只需要在安装依赖的时候上传这些缓存,其他时候都只是读取缓存的。所以我们在安装依赖的 job 中使用默认的 policy,而在后续的 job 中,我们可以通过 policy: pull 来指定只拉取缓存,不上传缓存。

1
2
3
4
5
6
7
8
9
10
11
12
job:
tags:
- api
image: eleven26/rsync:1.3.0
cache:
key:
files:
- composer.json
- composer.lock
paths:
- "vendor/"
policy: pull # 只拉取 vendor,在 job 执行完毕之后不上传 vendor

cache 的 key 使用文件

这一个特性是非常有用的,在现代软件工程的实践中,往往通过 *.lock 文件来记录我们使用的额依赖的具体版本,以保证在不同环境中使用的时候保持一致的行为。

所以,相应的,我们的缓存也可以在 *.lock 这类文件发生变化的时候,重新生成缓存。上面的例子就使用了这种机制。

script 中使用多行命令

script 中,我们可以使用多行命令,比如:

1
2
3
4
5
6
7
job:
script:
# 我们可以通过下面这种方式来写多行的 shell 命令,也就是以一个竖线开始,然后换行
- |
if [ "$release_host" != "" ]; then
host=$release_host
fi

CD - 如何同步代码到服务器

如果我们的项目需要部署到服务器上,那么我们还需要做一些额外的操作,比如同步代码到服务器上。 如果我们的 gitlab 是通过容器执行的(也就是说 gitlab 是通过 docker 启动的),或者我们的 runner 的 executor 是 docker,那么有一种比较常见的方法是通过 ssh 私钥来进行部署。

我们可以通过以下流程来实现:

  1. 新建一对 ssh key,比如 id_rsaid_rsa.pub
  2. id_rsa.pub 的内容添加到服务器的 authorized_keys 文件中。
  3. id_rsa 上传到 gitlab 中(在项目的 CI/CD 配置中,配置一个变量,变量名为 PRIVATE_KEY,内容为 id_rsa 的内容,类型为 file)。
  4. 在我们的 ci 配置文件中,添加如下配置即可:
1
2
3
4
5
6
7
8
9
10
11
12
before_script:
- chmod 600 $PRIVATE_KEY

deploy:
stage: deploy
image: eleven26/rsync:1.3.0
script:
# $user 是 ssh 的用户
# $host 是 ssh 的主机
# $port 是 ssh 的端口
# $PRIVATE_KEY 是我们在 gitlab 中配置的私钥
- rsync -az -e "ssh -o StrictHostKeyChecking=no -p $port -i $PRIVATE_KEY" --delete --exclude='.git' . $user@$host:/home/www

这里的 rsync 命令中,我们使用了 -o StrictHostKeyChecking=no 参数,这是为了避免每次都需要手动输入 yes 来确认服务器的指纹。

安全最佳实践:

  • 为每一个 project 配置 ssh key 变量,如果是全局变量的话,其他 project 可以在未授权的情况下,访问到这个私钥,这是非常危险的。
  • 使用单独的仓库来保存 ci 配置文件,防止其他人未经授权就修改 ci 配置文件,这也是非常危险的。

必须严格遵循以上两步,否则会造成严重的安全问题。因为拿到了私钥,就等于拿到了我们的服务器密码。

ERROR: Job failed: exit code xx 解决方案

我们在使用的时候可能会经常遇到这种错误(在 job 执行的输出里面),如果运气好,在输出里面也有一些额外的错误信息, 这种是最好处理的,它已经告诉你错误原因了。还有一种非常坑爹的情况是:job 失败了,只有一个非 0 的退出状态码,但是没有任何的报错信息,这种情况就比较难处理(更加坑爹的是,偶尔出现这种失败)。

job script 的执行流程

如果我们理解了 gitab CI/CD 中 job 的执行原理,那么这个问题其实就很好解决了,jobscript 执行流程如下:

  1. 拿到 script 中第一条命令,然后执行。
  2. 检查上一步的退出状态码,如果状态码为 0,继续执行下一条命令。否则,job 直接失败,然后显示信息 ERROR: Job failed: exit code <xx>,最后的 <xx> 就是上一条命令的非 0 的那个退出状态码。
  3. 按以上两个步骤来一条条执行 script 中的命令。

如果使用的是 bash shell,我们可以通过 echo $? 来获取上一条命令的退出状态码。状态码方面的约定都是:0 表示成功,非 0 表示不成功。

解决方法

知道了 job 的执行原理之后,问题就很好解决了,我们只需要在 job 执行日志中找到最后那一条命令即可:

  1. 先看这个命令是否有执行失败相关的错误输出信息,如果有,那么解决对应错误即可。
  2. 如果这个执行失败的命令,一点输出都没有。那么我们可以深入了解一下这个命令的退出状态码什么时候等于我们 job 的状态码,然后再对症下药。

一个实例

下面是一个 job 日志的最后几行,但是不包含具体的错误信息:

1
2
3
$ if (( $need_restart_queue == 1 )); then ssh $user@$host "supervisorctl restart xx"; fi
Cleaning up project directory and file based variables
ERROR: Job failed: exit code 1, no message

第一行是执行的命令,这个命令中,通过 ssh 执行了一条远程命令,然后退出。第二行是 job 失败后做清理操作输出的日志,最后一行输出 job 失败的错误码。

就是这个错误,困扰了我几天,因为它是偶尔失败的。

在这个例子中,比上面说到的要复杂一点,这里通过了 ssh 来执行远程命令,如果通过 ssh 执行远程命令,那么 ssh 命令的退出状态码就是执行的那个远程命令的退出状态码。 明确了这一点,我们就可以把问题定位在那个远程命令 supervisorctl restart xx 上,也就是说我们的失败是因为这个命令导致的。

后面排查发现,supervisorctl 命令本身就有一定几率失败,针对这种情况,有两种解决方案:

  1. 重试,可以给 job 指定重试次数,可以是 0~2,也就是说 gitlab 的 job 最多可以重试 2 次。
  2. 忽略这个错误,使用其他解决方案。(我们可以在 ssh 命令后面加上 || true 来忽略,加上这个,命令退出状态码一定是 0 了)

我是采取了后面那一种解决方法,因为服务器上还有一个定时任务来检测对应的进程,如果进程不存在,则会使用 supervisorctl start xx 来启动对应的服务。

总结

最后,总结一下本文中一些比较关键的内容:

  • gitlab 中的一些基本概念:
    • pipeline:代表了一次 CI 的执行过程,它包含了多个 stage
    • stage:代表了一组 job 的集合,stage 会按照顺序执行。
    • job:代表了一个具体的任务,比如 buildtestdeploy 等。
  • 一个 stage 中的多个 job 是可以并行执行的。但是下一个 stagejob 必须要等到上一个 stage 的所有 job 都执行完毕之后才会执行。
  • cacheartifacts 的区别:
    • cache 是用来缓存依赖的,比如 node_modules 文件夹,它可以加快后续 pipeline 的执行流程,因为避免了重复的依赖安装。
    • artifacts 是用来缓存构建产物的,比如 build 之后生成的静态文件,它可以在后续的 stage 中使用。表示的是单个 pipeline 中的不同 stage 之间的共享
  • cache 在安装依赖的 job 中才需要使用默认的 policy,也就是 pull-push,在其他不需要安装依赖的 job 中使用 pull 就可以了,不需要上传缓存。
  • cachekey 可以指定多个文件,这样在指定的文件变动的时候,缓存会失效,这往往用在依赖相关的文件中。
  • 可以使用 services 关键字来指定需要启动的服务,比如 mysqlredis 等,在 job 中可以连接到这些 services,从而方便进行测试。
  • 可以使用 yamlanchor 机制来复用一些配置片段,可以少写很多重复的配置。
  • 一个 job 必须运行在某个 runner 上,jobrunner 的关联是通过 tag 来指定的。

在上一篇文章《深入理解 go Mutex》中, 我们已经对 go Mutex 的实现原理有了一个大致的了解,也知道了 Mutex 可以实现并发读写的安全。 今天,我们再来看看另外一种锁,RWMutex,有时候,其实我们读数据的频率要远远高于写数据的频率, 而且不同协程应该可以同时读取的,这个时候,RWMutex 就派上用场了。

RWMutex 的实现原理和 Mutex 类似,只是在 Mutex 的基础上,区分了读锁和写锁:

  • 读锁:只要没有写锁,就可以获取读锁,多个协程可以同时获取读锁(可以并行读)。
  • 写锁:只能有一个协程获取写锁,其他协程想获取读锁或写锁都只能等待。

下面就让我们来深入了解一下 RWMutex 的基本使用和实现原理等内容。

RWMutex 的整体模型

正如 RWMutex 的命名那样,它是区分了读锁和写锁的锁,所以我们可以从读和写两个方面来看 RWMutex 的模型。

下文中的 reader 指的是进行读操作的 goroutine,writer 指的是进行写操作的 goroutine。

读操作模型

我们可以用下图来表示 RWMutex 的读操作模型:

rwmutex_1

上图使用了 w.Lock,是因为 RWMutex 的实现中,写锁是使用 Mutex 来实现的。

说明:

  • 读操作的时候可以同时有多个 goroutine 持有 RLock,然后进入临界区。(也就是可以并行读),上图的 G1G2G3 就是同时持有 RLock 的几个 goroutine。
  • 在读操作的时候,如果有 goroutine 持有 RLock,那么其他 goroutine (不管是读还是写)就只能等待,直到所有持有 RLock 的 goroutine 释放锁。
  • 也就是上图的 G4 需要等待 G1G2G3 释放锁之后才能进入临界区。
  • 最后,因为 G5G6 这两个协程获取锁的时机比 G4 晚,所以它们会在 G4 释放锁之后才能进入临界区。

写操作模型

我们可以用下图来表示 RWMutex 的写操作模型:

rwmutex_2

说明:

  • 写操作的时候只能有一个 goroutine 持有 Lock,然后进入临界区,释放写锁之前,所有其他的 goroutine 都只能等待。
  • 上图的 G1~G5 表示的是按时间顺序先后获取锁的几个 goroutine。
  • 上面几个 goroutine 获取锁的过程是:
    • G1 获取写锁,进入临界区。然后 G2G3G4G5 都在等待。
    • G1 释放写锁之后,G2G3 可以同时获取读锁,进入临界区。然后 G3G4G5 都在等待。
    • G2G3 可以同时获取读锁,进入临界区。然后 G4G5 都在等待。
    • G2G3 释放读锁之后,G4 获取写锁,进入临界区。然后 G5 在等待。
    • 最后,G4 释放写锁,G5 获取读锁,进入临界区。

基本用法

RWMutex 中包含了以下的方法:

  • Lock:获取写锁,如果有其他 goroutine 持有读锁或写锁,那么就会阻塞等待。
  • Unlock:释放写锁。
  • RLock:获取读锁,如果有其他 goroutine 持有写锁,那么就会阻塞等待。
  • RUnlock:释放读锁。

其他不常用的方法:

  • RLocker:返回一个读锁,该锁包含了 RLockRUnlock 方法,可以用来获取读锁和释放读锁。
  • TryLock: 尝试获取写锁,如果获取成功,返回 true,否则返回 false。不会阻塞等待。
  • TryRLock: 尝试获取读锁,如果获取成功,返回 true,否则返回 false。不会阻塞等待。

一个简单的例子

我们可以通过下面的例子来看一下 RWMutex 的基本用法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
package mutex

import (
"sync"
"testing"
)

var config map[string]string
var mu sync.RWMutex

func TestRWMutex(t *testing.T) {
config = make(map[string]string)

// 启动 10 个 goroutine 来写
var wg1 sync.WaitGroup
wg1.Add(10)
for i := 0; i < 10; i++ {
go func() {
set("foo", "bar")
wg1.Done()
}()
}

// 启动 100 个 goroutine 来读
var wg2 sync.WaitGroup
wg2.Add(100)
for i := 0; i < 100; i++ {
go func() {
get("foo")
wg2.Done()
}()
}

wg1.Wait()
wg2.Wait()
}

// 获取配置
func get(key string) string {
// 获取读锁,可以多个 goroutine 并发读取
mu.RLock()
defer mu.RUnlock()

if v, ok := config[key]; ok {
return v
}

return ""
}

// 设置配置
func set(key, val string) {
// 获取写锁
mu.Lock()
defer mu.Unlock()

config[key] = val
}

上面的例子中,我们启动了 10 个 goroutine 来写配置,启动了 100 个 goroutine 来读配置。 这跟我们现实开发中的场景是一样的,很多时候其实是读多写少的。 如果我们在读的时候也使用互斥锁,那么就会导致读的性能非常差,因为读操作一般都不会有副作用的,但是如果使用互斥锁,那么就只能一个一个的读了。

而如果我们使用 RWMutex,那么就可以同时有多个 goroutine 来读取配置,这样就可以大大提高读的性能。 因为我们进行读操作的时候,可以多个 goroutine 并发读取,这样就可以大大提高读的性能。

RWMutex 使用的注意事项

《深入理解 go Mutex》中,我们已经讲过了 Mutex 的使用注意事项, 其实 RWMutex 的使用注意事项也是差不多的:

  • 不要忘记释放锁,不管是读锁还是写锁。
  • Lock 之后,没有释放锁之前,不能再次使用 Lock
  • Unlock 之前,必须已经调用了 Lock,否则会 panic
  • 在第一次使用 RWMutex 之后,不能复制,因为这样一来 RWMutex 的状态也会被复制。这个可以使用 go vet 来检查。

源码剖析

RWMutex 的一些实现原理跟 Mutex 是一样的,比如阻塞的时候使用信号量等,在 Mutex 那一篇中已经有讲解了,这里不再赘述。 这里就 RWMutex 的实现原理进行一些简单的剖析。

RWMutex 结构体

RWMutex 的结构体定义如下:

1
2
3
4
5
6
7
type RWMutex struct {
w Mutex // 互斥锁,用于保护读写锁的状态
writerSem uint32 // writer 信号量
readerSem uint32 // reader 信号量
readerCount atomic.Int32 // 所有 reader 数量
readerWait atomic.Int32 // writer 等待完成的 reader 数量
}

各字段含义:

  • w:互斥锁,用于保护读写锁的状态。RWMutex 的写锁是互斥锁,所以直接使用 Mutex 就可以了。
  • writerSem:writer 信号量,用于实现写锁的阻塞等待。
  • readerSem:reader 信号量,用于实现读锁的阻塞等待。
  • readerCount:所有 reader 数量(包括已经获取读锁的和正在等待获取读锁的 reader)。
  • readerWait:writer 等待完成的 reader 数量(也就是获取写锁的时刻,已经获取到读锁的 reader 数量)。

因为要区分读锁和写锁,所以在 RWMutex 中,我们需要两个信号量,一个用于实现写锁的阻塞等待,一个用于实现读锁的阻塞等待。 我们需要特别注意的是 readerCountreaderWait 这两个字段,我们可能会比较好奇,为什么有了 readerCount 这个字段, 还需要 readerWait 这个字段呢?

这是因为,我们在尝试获取写锁的时候,可能会有多个 reader 正在使用读锁,这时候我们需要知道有多少个 reader 正在使用读锁, 等待这些 reader 释放读锁之后,就获取写锁了,而 readerWait 这个字段就是用来记录这个数量的。 在 Lock 中获取写锁的时候,如果观测到 readerWait 不为 0 则会阻塞等待,直到 readerWait 为 0 之后才会真正获取写锁,然后才可以进行写操作。

读锁源码剖析

获取读锁的方法如下:

1
2
3
4
5
6
7
// 获取读锁
func (rw *RWMutex) RLock() {
if rw.readerCount.Add(1) < 0 {
// 有 writer 在使用锁,阻塞等待 writer 完成
runtime_SemacquireRWMutexR(&rw.readerSem, false, 0)
}
}

读锁的实现很简单,先将 readerCount 加 1,如果加 1 之后的值小于 0,说明有 writer 正在使用锁,那么就需要阻塞等待 writer 完成。

释放读锁的方法如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// 释放读锁
func (rw *RWMutex) RUnlock() {
// readerCount 减 1,如果 readerCount 小于 0 说明有 writer 在等待
if r := rw.readerCount.Add(-1); r < 0 {
// 有 writer 在等待,唤醒 writer
rw.rUnlockSlow(r)
}
}

// 唤醒 writer
func (rw *RWMutex) rUnlockSlow(r int32) {
// 未 Lock 就 Unlock,panic
if r+1 == 0 || r+1 == -rwmutexMaxReaders {
fatal("sync: RUnlock of unlocked RWMutex")
}
// readerWait 减 1,返回值是新的 readerWait 值
if rw.readerWait.Add(-1) == 0 {
// 最后一个 reader 唤醒 writer
runtime_Semrelease(&rw.writerSem, false, 1)
}
}

读锁的实现总结:

  • 获取读锁的时候,会将 readerCount 加 1
  • 如果正在获取读锁的时候,发现 readerCount 小于 0,说明有 writer 正在使用锁,那么就需要阻塞等待 writer 完成。
  • 释放读锁的时候,会将 readerCount 减 1
  • 如果 readerCount 减 1 之后小于 0,说明有 writer 正在等待,那么就需要唤醒 writer。
  • 唤醒 writer 的时候,会将 readerWait 减 1,如果 readerWait 减 1 之后为 0,说明 writer 获取锁的时候存在的 reader 都已经释放了读锁,可以获取写锁了。

·rwmutexMaxReaders算是一个特殊的标识,在获取写锁的时候会将readerCount的值减去rwmutexMaxReaders, 所以在其他地方可以根据readerCount` 是否小于 0 来判断是否有 writer 正在使用锁。

写锁源码剖析

获取写锁的方法如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
// 获取写锁
func (rw *RWMutex) Lock() {
// 首先,解决与其他写入者的竞争。
rw.w.Lock()
// 向读者宣布有一个待处理的写入。
// r 就是当前还没有完成的读操作,等这部分读操作完成之后才可以获取写锁。
r := rw.readerCount.Add(-rwmutexMaxReaders) + rwmutexMaxReaders
// 等待活跃的 reader
if r != 0 && rw.readerWait.Add(r) != 0 {
// 阻塞,等待最后一个 reader 唤醒
runtime_SemacquireRWMutex(&rw.writerSem, false, 0)
}
}

释放写锁的方法如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// 释放写锁
func (rw *RWMutex) Unlock() {
// 向 readers 宣布没有活动的 writer。
r := rw.readerCount.Add(rwmutexMaxReaders)
if r >= rwmutexMaxReaders { // r >= 0 并且 < rwmutexMaxReaders 才是正常的(r 是持有写锁期间尝试获取读锁的 reader 数量)
fatal("sync: Unlock of unlocked RWMutex")
}
// 如果有 reader 在等待写锁释放,那么唤醒这些 reader。
for i := 0; i < int(r); i++ {
runtime_Semrelease(&rw.readerSem, false, 0)
}
// 允许其他的 writer 继续进行。
rw.w.Unlock()
}

写锁的实现总结:

  • 获取写锁的时候,会将 readerCount 减去 rwmutexMaxReaders,这样就可以区分读锁和写锁了。
  • 如果 readerCount 减去 rwmutexMaxReaders 之后不为 0,说明有 reader 正在使用读锁,那么就需要阻塞等待这些 reader 释放读锁。
  • 释放写锁的时候,会将 readerCount 加上 rwmutexMaxReaders
  • 如果 readerCount 加上 rwmutexMaxReaders 之后大于 0,说明有 reader 正在等待写锁释放,那么就需要唤醒这些 reader。

TryRLock 和 TryLock

TryRLockTryLock 的实现都很简单,都是尝试获取读锁或者写锁,如果获取不到就返回 false,获取到了就返回 true,这两个方法不会阻塞等待。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
// TryRLock 尝试锁定 rw 以进行读取,并报告是否成功。
func (rw *RWMutex) TryRLock() bool {
for {
c := rw.readerCount.Load()
// 有 goroutine 持有写锁
if c < 0 {
return false
}
// 尝试获取读锁
if rw.readerCount.CompareAndSwap(c, c+1) {
return true
}
}
}

// TryLock 尝试锁定 rw 以进行写入,并报告是否成功。
func (rw *RWMutex) TryLock() bool {
// 写锁被占用
if !rw.w.TryLock() {
return false
}
// 读锁被占用
if !rw.readerCount.CompareAndSwap(0, -rwmutexMaxReaders) {
// 释放写锁
rw.w.Unlock()
return false
}
// 成功获取到锁
return true
}

总结

RWMutex 使用起来比较简单,相比 Mutex 而言,它区分了读锁和写锁,可以提高并发性能。最后,总结一下本文内容:

  • RWMutex 有两种锁:读锁和写锁。
  • 读锁可以被多个 goroutine 同时持有,写锁只能被一个 goroutine 持有。也就是可以并发读,但只能互斥写。
  • 写锁被占用的时候,其他的读和写操作都会被阻塞。读锁被占用的时候,其他的写操作会被阻塞,但是读操作不会被阻塞。除非读操作发生在一个新的写操作之后。
  • RWMutex 包含以下几个方法:
    • Lock:获取写锁,如果有其他的写锁或者读锁被占用,那么就会阻塞等待。
    • Unlock:释放写锁。
    • RLock:获取读锁,如果写锁被占用,那么就会阻塞等待。
    • RUnlock:释放读锁。
  • 也包含了两个非阻塞的方法:
    • TryLock:尝试获取写锁,如果获取不到就返回 false,获取到了就返回 true
    • TryRLock:尝试获取读锁,如果获取不到就返回 false,获取到了就返回 true
  • RWMutex 使用的注意事项跟 Mutex 差不多:
    • 使用之后不能复制
    • Unlock 之前需要有 Lock 调用,否则 panicRUnlock 之前需要有 RLock 调用,否则 panic
    • 不要忘记使用 UnlockRUnlock 释放锁。
  • RWMutex 的实现:
    • 写锁还是使用 Mutex 来实现。
    • 获取读锁和写锁的时候,如果获取不到都会阻塞等待,直到被唤醒。
    • 获取写锁的时候,会将 readerCount 减去 rwmutexMaxReaders,这样就可以直到有写锁被占用。释放写锁的时候,会将 readerCount 加上 rwmutexMaxReaders
    • 获取写锁的时候,如果还有读操作未完成,那么这一次获取写锁只会等待这部分未完成的读操作完成。所有后续的操作只能等待这一次写锁释放。

在我们的日常开发中,总会有时候需要对一些变量做并发读写,比如 web 应用在同时接到多个请求之后, 需要对一些资源做初始化,而这些资源可能是只需要初始化一次的,而不是每一个 http 请求都初始化, 在这种情况下,我们需要限制只能一个协程来做初始化的操作,比如初始化数据库连接等, 这个时候,我们就需要有一种机制,可以限制只有一个协程来执行这些初始化的代码。 在 go 语言中,我们可以使用互斥锁(Mutex)来实现这种功能。

互斥锁的定义

这里引用一下维基百科的定义:

互斥锁(Mutual exclusion,缩写 Mutex)是一种用于多线程编程中,防止两个线程同时对同一公共资源 (比如全局变量)进行读写的机制。该目的通过将代码切片成一个一个的临界区域(critical section)达成。 临街区域指的是一块对公共资源进行访问的代码,并非一种机制或是算法。

互斥,顾名思义,也就是只有一个线程能持有锁。当然,在 go 中,是只有一个协程能持有锁。

下面是一个简单的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
var sum int // 和
var mu sync.Mutex // 互斥锁

// add 将 sum 加 1
func add() {
// 获取锁,只能有一个协程获取到锁,
// 其他协程需要阻塞等待锁释放才能获取到锁。
mu.Lock()
// 临界区域
sum++
mu.Unlock()
}

func TestMutex(t *testing.T) {
// 启动 1000 个协程
var wg sync.WaitGroup
wg.Add(1000)

for i := 0; i < 1000; i++ {
go func() {
// 每个协程里面调用 add()
add()
wg.Done()
}()
}

// 等待所有协程执行完毕
wg.Wait()
// 最终 sum 的值应该是 1000
assert.Equal(t, 1000, sum)
}

上面的例子中,我们定义了一个全局变量 sum,用于存储和,然后定义了一个互斥锁 mu, 在 add() 函数中,我们使用 mu.Lock() 来加锁,然后对 sum 进行加 1 操作, 最后使用 mu.Unlock() 来解锁,这样就保证了在任意时刻,只有一个协程能够对 sum 进行加 1 操作, 从而保证了在并发执行 add() 操作的时候 sum 的值是正确的。

上面这个例子,在我之前的文章中已经作为例子出现过很多次了,这里不再赘述了。

go Mutex 的基本用法

Mutex 我们一般只会用到它的两个方法:

  • Lock:获取互斥锁。(只会有一个协程可以获取到锁,通常用在临界区开始的地方。)
  • Unlock: 释放互斥锁。(释放获取到的锁,通常用在临界区结束的地方。)

Mutex 的模型可以用下图表示:

mutex_1

说明:

  • 同一时刻只能有一个协程获取到 Mutex 的使用权,其他协程需要排队等待(也就是上图的 G1->G2->Gn)。
  • 拥有锁的协程从临界区退出的时候需要使用 Unlock 来释放锁,这个时候等待队列的下一个协程可以获取到锁(实际实现比这里说的复杂很多,后面会细说),从而进入临界区。
  • 等待的协程会在 Lock 调用处阻塞,Unlock 的时候会使得一个等待的协程解除阻塞的状态,得以继续执行。

上面提到的这几点也是 Mutex 的基本原理。

互斥锁使用的两个例子

了解了 go Mutex 基本原理之后,让我们再来看看 Mutex 的一些使用的例子。

gin Context 中的 Set 方法

一个很常见的场景就是,并发对 map 进行读写,熟悉 go 的朋友应该知道,go 中的 map 是不支持并发读写的, 如果我们对 map 进行并发读写会导致 panic

而在 ginContext 结构体中,也有一个 map 类型的字段 Keys,用来在上下文间传递键值对数据, 所以在通过 Set 来设置键值对的时候需要使用 c.mu.Lock() 来先获取互斥锁,然后再对 Keys 做设置。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// Set is used to store a new key/value pair exclusively for this context.
// It also lazy initializes c.Keys if it was not used previously.
func (c *Context) Set(key string, value any) {
// 获取锁
c.mu.Lock()
// 如果 Keys 还没初始化,则进行初始化
if c.Keys == nil {
c.Keys = make(map[string]any)
}

// 设置键值对
c.Keys[key] = value
// 释放锁
c.mu.Unlock()
}

同样的,对 Keys 做读操作的时候也需要使用互斥锁:

1
2
3
4
5
6
7
8
9
10
11
// Get returns the value for the given key, ie: (value, true).
// If the value does not exist it returns (nil, false)
func (c *Context) Get(key string) (value any, exists bool) {
// 获取锁
c.mu.RLock()
// 读取 key
value, exists = c.Keys[key]
// 释放锁
c.mu.RUnlock()
return
}

可能会有人觉得奇怪,为什么从 map 中读也还需要锁。这是因为,如果读的时候没有锁保护, 那么就有可能在 Set 设置的过程中,同时也在进行读操作,这样就会 panic 了。

这个例子想要说明的是,像 map 这种数据结构本身就不支持并发读写,我们这种情况下只有使用 Mutex 了。

sync.Pool 中的 pinSlow 方法

sync.Pool 的实现中,有一个全局变量记录了进程内所有的 sync.Pool 对象,那就是 allPools 变量, 另外有一个锁 allPoolsMu 用来保护对 allPools 的读写操作:

1
2
3
4
5
6
7
8
9
10
11
12
13
var (
// 保护 allPools 和 oldPools 的互斥锁。
allPoolsMu Mutex

// allPools is the set of pools that have non-empty primary
// caches. Protected by either 1) allPoolsMu and pinning or 2)
// STW.
allPools []*Pool

// oldPools is the set of pools that may have non-empty victim
// caches. Protected by STW.
oldPools []*Pool
)

pinSlow 方法中会在 allPoolsMu 的保护下对 allPools 做读写操作:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
func (p *Pool) pinSlow() (*poolLocal, int) {
// Retry under the mutex.
// Can not lock the mutex while pinned.
runtime_procUnpin()
allPoolsMu.Lock() // 获取锁
defer allPoolsMu.Unlock() // 函数返回的时候释放锁
pid := runtime_procPin()
// poolCleanup won't be called while we are pinned.
s := p.localSize
l := p.local
if uintptr(pid) < s {
return indexLocal(l, pid), pid
}
if p.local == nil {
allPools = append(allPools, p) // 全局变量修改
}
// If GOMAXPROCS changes between GCs, we re-allocate the array and lose the old one.
size := runtime.GOMAXPROCS(0)
local := make([]poolLocal, size)
atomic.StorePointer(&p.local, unsafe.Pointer(&local[0])) // store-release
runtime_StoreReluintptr(&p.localSize, uintptr(size)) // store-release
return &local[pid], pid
}

这个例子主要是为了说明使用 mu 的另外一种非常常见的场景:并发读写全局变量

互斥锁使用的注意事项

互斥锁如果使用不当,可能会导致死锁或者出现 panic 的情况,下面是一些常见的错误:

  1. 忘记使用 Unlock 释放锁。
  2. Lock 之后还没 Unlock 之前又使用 Lock 获取锁。也就是重复上锁,go 中的 Mutex 不可重入。
  3. 死锁:位于临界区内不同的两个协程都想获取对方持有的不同的锁。
  4. 还没 Lock 之前就 Unlock。这会导致 panic,因为这是没有任何意义的。
  5. 复制 Mutex,比如将 Mutex 作为参数传递。

对于第 1 点,我们往往可以使用 defer 关键字来做释放锁的操作。第 2 点不太好发现,只能在开发的时候多加注意。 第 3 点我们在使用锁的时候可以考虑尽量避免在临界区内再去使用别的锁。 最后,Mutex 是不可以复制的,这个可以在编译之前通过 go vet 来做检查。

为什么 Mutex 不能被复制呢?因为 Mutex 中包含了锁的状态,如果复制了,那么这个状态也会被复制, 如果在复制前进行 Lock,复制后进行 Unlock,那就意味着 LockUnlock 操作的其实是两个不同的状态, 这样显然是不行的,是释放不了锁的。

虽然不可以复制,但是我们可以通过传递指针类型的参数来传递 Mutex

互斥锁锁定的是什么?

在前一篇文章中,我们提到过,原子操作本质上是变量级的互斥锁。而互斥锁本身锁定的又是什么呢? 其实互斥锁本质上是一个信号量,它通过获取释放信号量,最终使得协程获得某一个代码块的执行权力。

也就是说,互斥锁,锁定的是一块代码块。

我们以 go-zero 里面的 collection/fifo.go 为例子说明一下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// Take takes the first element out of q if not empty.
func (q *Queue) Take() (any, bool) {
// 获取互斥锁(只能有一个协程获取到锁)
q.lock.Lock()
// 函数返回的时候释放互斥锁(获取到锁的协程释放锁之后,其他协程才能进行抢占锁)
defer q.lock.Unlock()

// 下面的代码只有抢占到(也就是互斥锁锁定的代码块)
if q.count == 0 {
return nil, false
}

element := q.elements[q.head]
q.head = (q.head + 1) % len(q.elements)
q.count--

return element, true
}

除了锁定代码块的这一个作用,有另外一个比较关键的地方也是我们不能忽视的, 那就是 互斥锁并不保证临界区内操作的变量不能被其他协程访问。 互斥锁只能保证一段代码只能一个协程执行,但是对于临界区内涉及的共享资源, 你在临界区外也依然是可以对其进行读写的。

我们以上面的代码说明一下:在上面的 Take 函数中,我们对 q.headq.count 都进行了操作, 虽然这些操作代码位于临界区内,但是临界区并不保证持有锁期间其他协程不会在临界区外去修改 q.headq.count

下面就是一个非常典型的错误的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
import (
"fmt"
"sync"
"testing"
)

var mu sync.Mutex
var sum int

// 在锁的保护下对 sum 做读写操作
func test() {
mu.Lock()
sum++
mu.Unlock()
}

func TestMutex(t *testing.T) {
var wg sync.WaitGroup
wg.Add(1000)

for i := 0; i < 500; i++ {
go func() {
test()
wg.Done()
}()

// 位于临界区外,也依然是可以对 sum 做读写操作的。
sum++
}

wg.Wait()

fmt.Println(sum)
}

靠谱的做法是,对于有共享资源的读写的操作都使用 Mutex 保护起来。

当然,如果我们只有一个变量,那么可能使用原子操作就足够了。

互斥锁实现原理

互斥锁的实现有以下几个关键的地方:

  • 信号量:这是操作系统中的同步对象。
  • 等待队列:获取不到互斥锁的协程,会放入到一个先入先出队列的队列尾部。这样信号量释放的时候,可以依次对它们唤醒。
  • 原子操作:互斥锁的实现中,使用了一个字段来记录了几种不同的状态,使用原子操作可以保证几种状态可以一次性变更完成。

我们先来看看 Mutex结构体定义:

1
2
3
4
type Mutex struct {
state int32 // 状态字段
sema uint32 // 信号量
}

其中 state 字段记录了四种不同的信息:

mutex_2

这四种不同信息在源码中定义了不同的常量:

1
2
3
4
5
6
7
8
const (
mutexLocked = 1 << iota // 表示有 goroutine 拥有锁
mutexWoken // 唤醒(就是第 2 位)
mutexStarving // 饥饿(第 3 位)
mutexWaiterShift = iota // 表示第 4 位开始,表示等待者的数量

starvationThresholdNs = 1e6 // 1ms 进入饥饿模式的等待时间阈值
)

sema 的含义比较简单,就是一个用作不同 goroutine 同步的信号量。

信号量

go 的 Mutex 是基于信号量来实现的,那信号量又是什么呢?

维基百科:信号量是一个同步对象,用于保持在 0 至指定最大值之间的一个计数值。当线程完成一次对该 semaphore 对象的等待(wait)时,该计数值减一;当线程完成一次对 semaphore 对象的释放(release)时,计数值加一。

上面这个解释有点难懂,通俗地说,就是一个数字,调用 wait 的时候,这个数字减去 1,调用 release 的时候,这个数字加上 1。 (还有一个隐含的逻辑是,如果这个数小于 0,那么调用 wait 的时候会阻塞,直到它大于 0。)

对应到 go 的 Mutex 中,有两个操作信号量的函数:

  • runtime_Semrelease: 自动递增信号量并通知等待的 goroutine。
  • runtime_SemacquireMutex: 是一直等到信号量大于 0,然后自动递减。

我们注意到了,其实 runtime_SemacquireMutex 是有一个前提条件的,那就是等到信号量大于 0。 其实信号量的两个操作 P/V 就是一个加 1 一个减 1,所以在实际使用的时候,也是需要一个获取锁的操作对应一个释放锁的操作, 否则,其他协程都无法获取到锁,因为信号量一直不满足。

等待队列

go 中如果已经有 goroutine 持有互斥锁,那么其他的协程会放入一个 FIFO 队列中,如下图:

mutex_3

说明:

  • G1 表示持有互斥锁的 goroutine,G2...Gn 表示一个 goroutine 的等待队列,这是一个先入先出的队列。
  • G1 先持有锁,得以进入临界区,其他想抢占锁的 goroutine 阻塞在 Lock 调用处。
  • G1 在使用完锁后,会使用 Unlock 来释放锁,本质上是释放了信号量,然后会唤醒 FIFO 队列头部的 goroutine
  • G2FIFO 队列中移除,进入临界区。G2 使用完锁之后也会使用 Unlock 来释放锁。

上面只是一个大概模型,在实际实现中,比这个复杂很多倍,下面会继续深入讲解。

原子操作

go 的 Mutex 实现中,state 字段是一个 32 位的整数,不同的位记录了四种不同信息,在这种情况下, 只需要通过原子操作就可以保证一次性实现对四种不同状态信息的更改,而不需要更多额外的同步机制。

但是毋庸置疑,这种实现会大大降低代码的可读性,因为通过一个整数来记录不同的信息, 就意味着,需要通过各种位运算来实现对这个整数不同位的修改,比如将上锁的操作:

1
new |= mutexLocked

当然,这只是 Mutex 实现中最简单的一种位运算了。下面以 state 记录的四种不同信息为维度来具体讲解一下:

  • mutexLocked:这是 state 的最低位,1 表示锁被占用,0 表示锁没有被占用。
    • new := mutexLocked 新状态为上锁状态
  • mutexWoken: 这是表示是否有协程被唤醒了的状态
    • new = (old - 1<<mutexWaiterShift) | mutexWoken 等待者数量减去 1 的同时,设置唤醒标识
    • new &^= mutexWoken 清除唤醒标识
  • mutexStarving:饥饿模式的标识
    • new |= mutexStarving 设置饥饿标识
  • 等待者数量:state >> mutexWaiterShift 就是等待者的数量,也就是上面提到的 FIFO 队列中 goroutine 的数量
    • new += 1 << mutexWaiterShift 等待者数量加 1
    • delta := int32(mutexLocked - 1<<mutexWaiterShift) 上锁的同时,将等待者数量减 1

这里并没有涵盖 Mutex 中所有的位运算,其他操作在下文讲解源码实现的时候会提到。

在上面做了这一系列的位运算之后,我们会得到一个新的 state 状态,假设名为 new,那么我们就可以通过 CAS 操作来将 Mutexstate 字段更新:

1
atomic.CompareAndSwapInt32(&m.state, old, new)

通过上面这个原子操作,我们就可以一次性地更新 Mutexstate 字段,也就是一次性更新了四种状态信息。

这种通过一个整数记录不同状态的写法在 sync 包其他的一些地方也有用到,比如 WaitGroup 中的 state 字段。

最后,对于这种操作,我们需要注意的是,因为我们在执行 CAS 前后是没有其他什么锁或者其他的保护机制的, 这也就意味着上面的这个 CAS 操作是有可能会失败的,那如果失败了怎么办呢?

如果失败了,也就意味着肯定有另外一个 goroutine 率先执行了 CAS 操作并且成功了,将 state 修改为了一个新的值。 这个时候,其实我们前面做的一系列位运算得到的结果实际上已经不对了,在这种情况下,我们需要获取最新的 state,然后再次计算得到一个新的 state

所以我们会在源码里面看到 CAS 操作是写在 for 循环里面的。

Mutex 的公平性

在前面,我们提到 goroutien 获取不到锁的时候,会进入一个 FIFO 队列的队列尾,在实际实现中,其实没有那么简单, 为了获得更好的性能,在实现的时候会尽量先让运行状态的 goroutine 获得锁,当然如果队列中的 goroutine 等待太久(大于 1ms), 那么就会先让队列中的 goroutine 获得锁。

下面是文档中的说明:

Mutex 可以处于两种操作模式:正常模式和饥饿模式。在正常模式下,等待者按照FIFO(先进先出)的顺序排队,但是被唤醒的等待者不拥有互斥锁,会与新到达的 Goroutine 竞争所有权。新到达的 Goroutine 有优势——它们已经在 CPU 上运行,数量可能很多,因此被唤醒的等待者有很大的机会失去锁。在这种情况下,它将排在等待队列的前面。如果等待者未能在1毫秒内获取到互斥锁,则将互斥锁切换到饥饿模式。 在饥饿模式下,互斥锁的所有权直接从解锁 Goroutine 移交给队列前面的等待者。新到达的 Goroutine 即使看起来未被锁定,也不会尝试获取互斥锁,也不会尝试自旋。相反,它们会将自己排队在等待队列的末尾。如果等待者获得互斥锁的所有权并发现(1)它是队列中的最后一个等待者,或者(2)它等待时间少于1毫秒,则将互斥锁切换回正常模式。 正常模式的性能要优于饥饿模式,因为 Goroutine 可以连续多次获取互斥锁,即使有被阻塞的等待者。饥饿模式很重要,可以防止尾部延迟的病态情况。

简单总结:

  • Mutex 有两种模式:正常模式、饥饿模式。
  • 正常模式下:
    • 被唤醒的 goroutine 和正在运行的 goroutine 竞争锁。这样可以运行中的协程有机会先获取到锁,从而避免了协程切换的开销。性能更好。
  • 饥饿模式下:
    • 优先让队列中的 goroutine 获得锁,并且直接放弃时间片,让给队列中的 goroutine,运行中的 goroutine 想获取锁要到队尾排队。更加公平。

Mutex 源码剖析

Mutex 本身的源码其实很少,但是复杂程度是非常高的,所以第一次看的时候可能会非常懵逼,但是不妨碍我们去了解它的大概实现原理。

Mutex 中主要有两个方法,LockUnlock,使用起来非常的简单,但是它的实现可不简单。下面我们就来深入了解一下它的实现。

Lock

Lock 方法的实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
// Lock 获取锁。
// 如果锁已在使用中,则调用 goroutine 将阻塞,直到互斥量可用。
func (m *Mutex) Lock() {
// Fast path: grab unlocked mutex.
// 上锁成功则直接返回
if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
return
}

// Slow path (outlined so that the fast path can be inlined)
// 没有上锁成功,这个时候需要做的事情就有点多了。
m.lockSlow()
}

Lock 方法中,第一次获取锁的时候是非常简单的,一个简单的原子操作设置一下 mutexLocked 标识就完成了。 但是如果这个原子操作失败了,表示有其他 goroutine 先获取到了锁,这个时候就需要调用 lockSlow 来做一些额外的操作了:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
// 获取 mutex 锁
func (m *Mutex) lockSlow() {
var waitStartTime int64 // 当前协程开始等待的时间
starving := false // 当前协程是否是饥饿模式
awoke := false // 唤醒标志(是否当前协程就是被唤醒的协程)
iter := 0 // 自旋次数(超过一定次数如果还没能获得锁,就进入等待)
old := m.state // 旧的状态,每次 for 循环会重新获取当前的状态字段

for {
// 自旋:目的是让正在运行中的 goroutine 尽快获取到锁。
// 两种情况不会自旋:
// 1. 饥饿模式:在饥饿模式下,锁会直接交给等待队列中的 goroutine,所以不会自旋。
// 2. 锁被释放了:另外如果运行到这里的时候,发现锁已经被释放了,也就不需要自旋了。
if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
// 设置 mutexWoken 标识
// 如果自旋是有意义的,则会进入到这里,尝试设置 mutexWoken 标识。
// 设置成功在持有锁的 goroutine 获取锁的时候不会唤醒等待队列中的 goroutine,下一个获取锁的就是当前 goroutine。
if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
// 各个判断的含义:
// !awoke 已经被唤醒过一次了,说明当前协程是被从等待队列中唤醒的协程/又或者已经成功设置 mutexWoken 标识了,不需要再唤醒了。
// old&mutexWoken == 0 如果不等于 0 说明有 goroutine 被唤醒了,不会尝试设置 mutexWoken 标识
// old>>mutexWaiterShift != 0 如果等待队列为空,当前 goroutine 就是下一个抢占锁的 goroutine
// 前面的判断都通过了,才会进行 CAS 操作尝试设置 mutexWoken 标识
awoke = true
}
runtime_doSpin() // 自旋
iter++ // 自旋次数 +1(超过一定次数会停止自旋)
old = m.state // 再次获取锁的最新状态,之后会检查是否锁被释放了
continue // 继续下一次检查
}

new := old
// 饥饿模式下,新到达的 goroutines 必须排队。
// 不是饥饿状态,直接竞争锁。
if old&mutexStarving == 0 {
new |= mutexLocked
}
// 进入等待队列的两种情况:
// 1. 锁依然被占用。
// 2. 进入了饥饿模式。
if old&(mutexLocked|mutexStarving) != 0 {
new += 1 << mutexWaiterShift // 等待者数量 +1
}
// 已经等待超过了 1ms,且锁被其他协程占用,则进入饥饿模式
if starving && old&mutexLocked != 0 {
new |= mutexStarving
}
// 唤醒之后,需要重置唤醒标志。
// 不管有没有获取到锁,都是要清除这个标识的:
// 获取到锁肯定要清除,如果获取到锁,需要让其他运行中的 goroutine 来抢占锁;
// 如果没有获取到锁,goroutine 会阻塞,这个时候是需要持有锁的 goroutine 来唤醒的,如果有 mutexWoken 标识,持有锁的 goroutine 唤醒不了。
if awoke {
if new&mutexWoken == 0 {
throw("sync: inconsistent mutex state")
}
new &^= mutexWoken // 重置唤醒标志
}

// 成功设置新状态
if atomic.CompareAndSwapInt32(&m.state, old, new) {
// 原来锁的状态已释放,并且不是饥饿状态,正常请求到了锁,返回
if old&(mutexLocked|mutexStarving) == 0 { // 这意味着当前的 goroutine 成功获取了锁
break
}

// 如果已经被唤醒过,会被加入到等待队列头。
queueLifo := waitStartTime != 0
if waitStartTime == 0 {
waitStartTime = runtime_nanotime()
}
// 阻塞等待
// queueLifo 为 true,表示加入到队列头。否则,加入到队列尾。
// (首次加入队列加入到队尾,不是首次加入则加入队头,这样等待最久的 goroutine 优先能够获取到锁。)
runtime_SemacquireMutex(&m.sema, queueLifo, 1)
// 从等待队列中唤醒,检查锁是否应该进入饥饿模式。
starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs

// 获取当前的锁最新状态
old = m.state
// 如果锁已经处于饥饿状态,直接抢到锁,返回。
// 饥饿模式下,被唤醒的协程可以直接获取到锁。
// 新来的 goroutine 都需要进入队列等待。
if old&mutexStarving != 0 {
// 如果这个 goroutine 被唤醒并且 Mutex 处于饥饿模式,P 的所有权已经移交给我们,
// 但 Mutex 处于不一致的状态:mutexLocked 未设置,我们仍然被视为等待者。修复这个问题。
if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
throw("sync: inconsistent mutex state")
}
// 加锁,并且减少等待者数量。
// 实际上是两步操作合成了一步:
// 1. m.state = m.state + 1 (获取锁)
// 2. m.state = m.state - 1<<mutexWaiterShift(waiter - 1)
delta := int32(mutexLocked - 1<<mutexWaiterShift)
// 清除饥饿状态的两种情况:
// 1. 如果不需要进入饥饿模式(当前被唤醒的 goroutine 的等待时间小于 1ms)
// 2. 原来的等待者数量为 1,说明是最后一个被唤醒的 goroutine。
if !starving || old>>mutexWaiterShift == 1 {
// 退出饥饿模式
delta -= mutexStarving
}
// 原子操作,设置新状态。
atomic.AddInt32(&m.state, delta)
break
}
// 设置唤醒标记,重新抢占锁(会与那些运行中的 goroutine 一起竞争锁)
awoke = true
iter = 0
} else {
// CAS 更新状态失败,获取最新状态,然后重试
old = m.state
}
}
}

我们可以看到,lockSlow 的处理非常的复杂,又要考虑让运行中的 goroutine 尽快获取到锁,又要考虑不能让等待队列中的 goroutine 等待太久。

代码中注释很多,再简单总结一下其中的流程:

  1. 为了让循环中的 goroutine 可以先获取到锁,会先让 goroutine 自旋等待锁的释放,这是因为运行中的 goroutine 正在占用 CPU,让它先获取到锁可以避免一些不必要的协程切换,从而获得更好的性能。
  2. 自旋完毕之后,会尝试获取锁,同时也要根据旧的锁状态来更新锁的不同状态信息,比如是否进入饥饿模式等。
  3. 计算得到一个新的 state 后,会进行 CAS 操作尝试更新 state 状态。
  4. CAS 失败会重试上面的流程。
  5. CAS 成功之后会做如下操作:
  • 判断当前是否已经获取到锁,如果是,则返回,Lock 成功了。
  • 会判断当前的 goroutine 是否是已经被唤醒过,如果是,会将当前 goroutine 加入到等待队列头部。
  • 调用 runtime_SemacquireMutex,进入阻塞状态,等待下一次唤醒。
  • 唤醒之后,判断是否需要进入饥饿模式。
  • 最后,如果已经是饥饿模式,当前 goroutine 直接获取到锁,退出循环,否则,再进行下一次抢占锁的循环中。

具体流程我们可以参考一下下面的流程图:

mutex_4

图中有一些矩形方框描述了 unlockSlow 的关键流程。

Unlock

Unlock 方法的实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
// Unlock 释放互斥锁。
// 如果 m 在进入 Unlock 时未被锁定,则会出现运行时错误。
func (m *Mutex) Unlock() {
// Fast path: drop lock bit.
// unlock 成功
// unLock 操作实际上是将 state 减去 1。
new := atomic.AddInt32(&m.state, -mutexLocked)
if new != 0 { // 等待队列为空的时候直接返回了
// 唤醒一个等待锁的 goroutine
m.unlockSlow(new)
}
}

Unlock 做了两件事:

  1. 释放当前 goroutine 持有的互斥锁:也就是将 state 减去 1
  2. 唤醒等待队列中的下一个 goroutine

如果只有一个 goroutine 在使用锁,只需要简单地释放锁就可以了。 但是如果有其他的 goroutine 在阻塞等待,那么持有互斥锁的 goroutine 就有义务去唤醒下一个 goroutine。

唤醒的流程相对复杂一些:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
// unlockSlow 唤醒下一个等待锁的协程。
func (m *Mutex) unlockSlow(new int32) {
// 如果未加锁,则会抛出错误。
if (new+mutexLocked)&mutexLocked == 0 {
fatal("sync: unlock of unlocked mutex")
}

// 下面的操作是唤醒一个在等待锁的协程。
// 存在两种情况:
// 1. 正常模式:
// a. 不需要唤醒:没有等待者、锁已经被抢占、有其他运行中的协程在尝试获取锁、已经进入了饥饿模式
// b. 需要唤醒:其他情况
// 2. 饥饿模式:唤醒等待队列头部的那个协程
if new&mutexStarving == 0 {
// 不是饥饿模式
old := new
// 自旋
for {
// 下面几种情况不需要唤醒:
// 1. 没有等待者了(没得唤醒)
// 2. 锁已经被占用(只能有一个 goroutine 持有锁)
// 3. 有其他运行中的协程已经被唤醒(运行中的 goroutine 通过自旋先抢占到了锁)
// 4. 饥饿模式(饥饿模式下,所有新的 goroutine 都要排队,饥饿模式会直接唤醒等待队列头部的 gorutine)
if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
return
}
// 获取到唤醒等待者的权力,开始唤醒一个等待者。
// 下面这一行实际上是两个操作:
// 1. waiter 数量 - 1
// 2. 设置 mutexWoken 标志
new = (old - 1<<mutexWaiterShift) | mutexWoken
if atomic.CompareAndSwapInt32(&m.state, old, new) {
// 正常模式下唤醒了一个 goroutine
//(第二个参数为 false,表示当前的 goroutine 在释放信号量后还会继续执行直到用完时间片)
runtime_Semrelease(&m.sema, false, 1)
return
}
// 唤醒失败,进行下一次尝试。
old = m.state
}
} else {
// 饥饿模式:将互斥锁的所有权移交给下一个等待者,并放弃我们的时间片,以便下一个等待者可以立即开始运行。
// 注意:如果“mutexLocked”未设置,等待者在唤醒后会将其设置。
// 但是,如果设置了“mutexStarving”,则仍然认为互斥锁已被锁定,因此新到来的goroutine不会获取它。
//
// 当前的 goroutine 放弃 CPU 时间片,让给阻塞在 sema 的 goroutine。
runtime_Semrelease(&m.sema, true, 1)
}
}

unlockSlow 逻辑相比 lockSlow 要简单许多,我们可以再结合下面的流程图来阅读上面的源码:

mutex_5

runtime_Semrelease 第二个参数的含义

细心的朋友可能注意到了,在 unlockSlow 的实现中,有两处地方调用了 runtime_Semrelease 这个方法, 这个方法的作用是释放一个信号量,这样可以让阻塞在信号量上的 goroutine 得以继续执行。 它的第一个参数我们都知道,是信号量,而第二个参数 truefalse 分别传递了一次, 那么 truefalse 分别有什么作用呢?

答案是,设置为 true 的时候,当前的 goroutine 会直接放弃自己的时间片, 将 P 的使用权交给 Mutex 等待队列中的第一个 goroutine, 这样的目的是,让 Mutex 等待队列中的 goroutine 可以尽快地获取到锁。

总结

互斥锁在并发编程中也算是非常常见的一种操作了,使用互斥锁可以限制只有一个 goroutine 可以进入临界区, 这对于并发修改全局变量、初始化等情况非常好用。最后,再总结一下本文所讲述的内容:

  • 互斥锁是一种用于多线程编程中,防止两个线程同时对同一公共资源进行读写的机制。go 中的互斥锁实现是 sync.Mutex
  • Mutex 的操作只有两个:
    • Lock 获取锁,同一时刻只能有一个 goroutine 可以获取到锁,其他 goroutine 会先通过自旋抢占锁,抢不到则阻塞等待。
    • Unlock 释放锁,释放锁之前必须有 goroutine 持有锁。释放锁之后,会唤醒等待队列中的下一个 goroutine。
  • Mutex 常见的使用场景有两个:
    • 并发读写 map:如 ginContextKeys 属性的读写。
    • 并发读写全局变量:如 sync.Pool 中对 allPools 的读写。
  • 使用 Mutex 需要注意以下几点:
    • 不要忘记使用 Unlock 释放锁
    • Lock 之后,没有释放锁之前,不能再次使用 Lock
    • 注意不同 goroutine 竞争不同锁的情况,需要考虑一下是否有可能会死锁
    • Unlock 之前,必须已经调用了 Lock,否则会 panic
    • 在第一次使用 Mutex 之后,不能复制,因为这样一来 Mutex 的状态也会被复制。这个可以使用 go vet 来检查。
  • 互斥锁可以保护一块代码块只能有一个 goroutine 执行,但是不保证临界区内操作的变量不被其他 goroutine 做并发读写操作。
  • go 的 Mutex 基于以下技术实现:
    • 信号量:这是操作系统层面的同步机制
    • 队列:在 goroutine 获取不到锁的时候,会将这些 goroutine 放入一个 FIFO 队列中,下次唤醒会唤醒队列头的 goroutine
    • 原子操作:state 字段记录了四种不同的信息,通过原子操作就可以保证数据的完整性
  • go Mutex 的公平性:
    • 正在运行的 goroutine 如果需要锁的话,尽量让它先获取到锁,可以避免不必要的协程上下文切换。会和被唤醒的 goroutine 一起竞争锁。
    • 但是如果等待队列中的 goroutine 超过了 1ms 还没有获取到锁,那么会进入饥饿模式
  • go Mutex 的两种模式:
    • 正常模式:运行中的 goroutine 有一定机会比等待队列中的 goroutine 先获取到锁,这种模式有更好的性能。
    • 饥饿模式:所有后来的 goroutine 都直接进入等待队列,会依次从等待队列头唤醒 goroutine。可以有效避免尾延迟。
  • 饥饿模式下,Unlock 的时候会直接将当前 goroutine 所在 P 的使用权交给等待队列头部的 goroutine,放弃原本属于自己的时间片。
0%