Laravel Horizon 定时 kill 掉旧的消费者进程

问题

生产环境使用了 horizon 来处理队列数据,但是在 supervisor 自动重启的过程中,发现有时候会有一些消费者进程在部署的时候无法正常被 kill 掉,导致一些旧的代码消费队列数据,从而引起报错。

原因是可能在重启的时候,那个消费者恰好在处理长时间的任务,导致无法被 kill 掉。但无法肯定是这个原因。

解决方法

每分钟 kill 掉那些不是当前 horizon 子孙进程的消费者(S 状态才会 kill 掉,保证正常业务不受影响)。

解决思路

  1. 通过 shell_exec 来执行 pspstree 等命令查看当前的进程信息,从而找出那些异常的消费者进程 id。
  2. 判断这些异常进程是否没有在处理任务。
  3. kill 掉 S 状态的异常消费者进程。

注意:shell_exec 里面使用 ps aux 可能无法获取正常在终端运行的结果,需要使用 ps -efww 才可以获取完整的输出。

实现代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
<?php

namespace App\Console\Commands;

use Illuminate\Console\Command;
use Illuminate\Support\Facades\Log;

class ClearInvalidQueueConsumer extends Command
{
protected $signature = 'clear-invalid-queue-consumer';

protected $description = '清除无效的队列消费者进程';

public function handle()
{
$noParentPids = $this->noParentPids();
$sleepingProcessPids = $this->getSleepingProcessPids($noParentPids);

if ($sleepingProcessPids) {
Log::info('kill 掉无效的队列进程:', $sleepingProcessPids);
$this->killInvalidProcess($sleepingProcessPids);
}
}

public function noParentPids(): array
{
$pids = $this->getValidPids();

$allPids = $this->getAllQueueProcessPids();

return array_diff($allPids, $pids);
}

/**
* kill 掉无效的进程
*
* @param array $pids
*/
private function killInvalidProcess(array $pids)
{
foreach ($pids as $pid) {
if (!posix_kill($pid, SIGTERM)) {
Log::error("kill ${pid} error!");
}

if ($error = posix_get_last_error()) {
Log::error('kill process error: ' . posix_strerror($error));
}
// system("kill -9 {$pid}");
}
}

/**
* 从 pid 数组里面筛选出睡眠状态的进程
*
* @param array $pids
*
* @return array
*/
public function getSleepingProcessPids(array $pids): array
{
$result = [];

foreach ($pids as $pid) {
if ($this->isSleeping($pid)) {
$result[] = $pid;
}
}

return $result;
}

/**
* 进程是否是睡眠状态
*
* @param $pid
*
* @return bool
*/
public function isSleeping($pid): bool
{
$res = $this->grepPidStatus($pid);

if ($res) {
$status = $this->statusOfLine($res);
if ($status == 'S') {
return true;
}
}

return false;
}

public function grepPidStatus($pid)
{
return shell_exec("ps -efww | grep php | grep ${pid} | grep -v grep");
}

/**
* 获取当前所有的队列进程
*
* @return array
*/
public function getAllQueueProcessPids(): array
{
$res = $this->horizonProcesses();
$allPids = [];

foreach (explode("\n", $res) as $line) {
if (!$line) {
continue;
}

$allPids[] = $this->pidOfLine($line);
}

return $allPids;
}

public function horizonProcesses()
{
return shell_exec('ps -efww | grep horizon | grep -v grep');
}

/**
* 当前有效的进程 id 数组
*
* @return array
*/
public function getValidPids(): array
{
$horizonPid = $this->horizonProcessPid();
if (!$horizonPid) {
return [];
}

$res = $this->pstree($horizonPid);

preg_match_all("/php,\d+/", $res, $matches);

$pids = [];
if ($matches) {
foreach ($matches[0] as $match) {
$pids[] = substr($match, 4);
}
}

return $pids;
}

public function pstree($horizonPid)
{
return shell_exec("pstree -a {$horizonPid} -p");
}

/**
* 获取 horizon 启动进程 id
*
* @return int|mixed
*/
public function horizonProcessPid()
{
$res = $this->horizonPidLine();

if ($res) {
return $this->pidOfLine($res);
}

return 0;
}

public function horizonPidLine()
{
return shell_exec("ps -efww | grep -v grep | grep '/usr/lnmp/php/bin/php /home/www/api/default/current/artisan horizon'");
}

/**
* 从 ps aux 的输出行里面获取进程状态 S -> 睡眠
*
* @param string $line
*
* @return mixed
*/
public function statusOfLine(string $line)
{
return $this->fields($line)[7];
}

/**
* 从 ps aux 的输出行里面获取 pid
*
* @param string $line
*
* @return mixed
*/
public function pidOfLine(string $line)
{
return $this->fields($line)[1];
}

/**
* ps aux 输出行里面空格分隔的单词列表
*
* @param string $line
*
* @return array
*/
public function fields(string $line): array
{
$line = trim($line);

$fields = explode(' ', $line);

return array_values(array_filter($fields));
}
}