问题
生产环境使用了 horizon 来处理队列数据,但是在 supervisor
自动重启的过程中,发现有时候会有一些消费者进程在部署的时候无法正常被
kill 掉,导致一些旧的代码消费队列数据,从而引起报错。
原因是可能在重启的时候,那个消费者恰好在处理长时间的任务,导致无法被
kill 掉。但无法肯定是这个原因。
解决方法
每分钟 kill 掉那些不是当前 horizon 子孙进程的消费者(S 状态才会 kill
掉,保证正常业务不受影响)。
解决思路
- 通过
shell_exec
来执行
ps
、pstree
等命令查看当前的进程信息,从而找出那些异常的消费者进程 id。
- 判断这些异常进程是否没有在处理任务。
- kill 掉 S 状态的异常消费者进程。
注意:shell_exec
里面使用 ps aux
可能无法获取正常在终端运行的结果,需要使用 ps -efww
才可以获取完整的输出。
实现代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217
| <?php
namespace App\Console\Commands;
use Illuminate\Console\Command; use Illuminate\Support\Facades\Log;
class ClearInvalidQueueConsumer extends Command { protected $signature = 'clear-invalid-queue-consumer';
protected $description = '清除无效的队列消费者进程';
public function handle() { $noParentPids = $this->noParentPids(); $sleepingProcessPids = $this->getSleepingProcessPids($noParentPids);
if ($sleepingProcessPids) { Log::info('kill 掉无效的队列进程:', $sleepingProcessPids); $this->killInvalidProcess($sleepingProcessPids); } }
public function noParentPids(): array { $pids = $this->getValidPids();
$allPids = $this->getAllQueueProcessPids();
return array_diff($allPids, $pids); }
private function killInvalidProcess(array $pids) { foreach ($pids as $pid) { if (!posix_kill($pid, SIGTERM)) { Log::error("kill ${pid} error!"); }
if ($error = posix_get_last_error()) { Log::error('kill process error: ' . posix_strerror($error)); }
} }
public function getSleepingProcessPids(array $pids): array { $result = [];
foreach ($pids as $pid) { if ($this->isSleeping($pid)) { $result[] = $pid; } }
return $result; }
public function isSleeping($pid): bool { $res = $this->grepPidStatus($pid);
if ($res) { $status = $this->statusOfLine($res); if ($status == 'S') { return true; } }
return false; }
public function grepPidStatus($pid) { return shell_exec("ps -efww | grep php | grep ${pid} | grep -v grep"); }
public function getAllQueueProcessPids(): array { $res = $this->horizonProcesses(); $allPids = [];
foreach (explode("\n", $res) as $line) { if (!$line) { continue; }
$allPids[] = $this->pidOfLine($line); }
return $allPids; }
public function horizonProcesses() { return shell_exec('ps -efww | grep horizon | grep -v grep'); }
public function getValidPids(): array { $horizonPid = $this->horizonProcessPid(); if (!$horizonPid) { return []; }
$res = $this->pstree($horizonPid);
preg_match_all("/php,\d+/", $res, $matches);
$pids = []; if ($matches) { foreach ($matches[0] as $match) { $pids[] = substr($match, 4); } }
return $pids; }
public function pstree($horizonPid) { return shell_exec("pstree -a {$horizonPid} -p"); }
public function horizonProcessPid() { $res = $this->horizonPidLine();
if ($res) { return $this->pidOfLine($res); }
return 0; }
public function horizonPidLine() { return shell_exec("ps -efww | grep -v grep | grep '/usr/lnmp/php/bin/php /home/www/api/default/current/artisan horizon'"); }
public function statusOfLine(string $line) { return $this->fields($line)[7]; }
public function pidOfLine(string $line) { return $this->fields($line)[1]; }
public function fields(string $line): array { $line = trim($line);
$fields = explode(' ', $line);
return array_values(array_filter($fields)); } }
|