0%

索引单个文档

我们可以使用一个 PUT 请求来索引单个文档:

1
2
3
4
PUT /customer/_doc/1
{
"name": "John Doe"
}
  • 使用 PUT 方法
  • customer: 索引名称
  • 1: 唯一的文档 ID
  • body: 文档内容,可以包含一个或多个键值对

如果索引不存在,则 Elasticsearch 会新建一个,然后存储该文档。

因为这是一个新的文档,所以响应结果显示文档的版本为 1:

1
2
3
4
5
6
7
8
9
10
11
12
13
{
"_index" : "customer",
"_id" : "1",
"_version" : 1,
"result" : "created",
"_shards" : {
"total" : 2,
"successful" : 2,
"failed" : 0
},
"_seq_no" : 26,
"_primary_term" : 4
}

获取单个文档

我们可以通过一个 GET 请求来获取刚刚保存进索引的文档:

1
GET /customer/_doc/1

这里的 1 是文档唯一 ID。

响应结果如下,_source 是原始文档内容:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
{
"_index": "customer",
"_type": "_doc",
"_id": "1",
"_version": 2,
"_seq_no": 1,
"_primary_term": 1,
"found": true,
"_source": {
"h": [
"health",
"index",
"docs.count"
]
}
}

使用 bulk API 批量索引文档

如果你有很多要索引的文档,则可以使用 bulk API 批量提交。使用批量处理批处理文档操作比单独提交请求要快得多,因为它可以最大程度地减少网络往返次数。

导入测试数据

https://www.elastic.co/guide/en/elasticsearch/reference/master/getting-started-index.html#getting-started-index

1、获取下载链接

命令行直接下载会非常慢,这个只有依靠代理去解决了,我们可以从 vagrant box add 的命令输出中拿到 box 的下载链接, 然后去浏览器通过代理来下载。

2、下载之后,新建一个 metadata.json 文件,内容如下:

1
2
3
4
5
6
7
8
9
10
{
"name": "laravel/homestead",
"versions": [{
"version": "9.5.0",
"providers": [{
"name": "virtualbox",
"url": "file://C:/Users/ruby/Downloads/homestead.box"
}]
}]
}

这里的 C:/Users/ruby/Downloads/homestead.box 是下载的 box 保存的路径。

这里的版本号是我们 homestead up 的时候获取到的版本。

3、最后一步

1
homestead up

环境:RabbitMQ + Lumen 5.5,消费者处理逻辑是在处理完消息之后 ack。

Worker 源码路径 。

今天突然发现一个 qa 环境的一个队列消息累积了几百万,最终发现是因为消费消息的 Worker 进程没有处理完消息就退出了处理,而且没有任何的记录。看源码发现 worker 里面有个 kill 函数,里面执行了 exit() 函数,我们都知道,这个函数是退出进程的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/**
* Kill the process.
*
* @param int $status
* @return void
*/
public function kill($status = 0)
{
$this->events->dispatch(new Events\WorkerStopping);

if (extension_loaded('posix')) {
posix_kill(getmypid(), SIGKILL);
}

exit($status);
}

退出其实问题不大,但是这个退出的逻辑并没有标记这个 job 失败,这就导致下一次 Worker 进程启动的时候,继续拿到这个消息处理,然后处理到一定时候,又退出,如此无限循环。

Worker exit 的原因

我们可以看看 Worker 里面所有 exit 的调用,其实有两个地方:

  1. stopIfNecessary,kill 和 stop 函数最终都是 exit。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
/**
* Stop the process if necessary.
*
* @param \Illuminate\Queue\WorkerOptions $options
* @param int $lastRestart
*/
protected function stopIfNecessary(WorkerOptions $options, $lastRestart)
{
if ($this->shouldQuit) {
$this->kill();
}

if ($this->memoryExceeded($options->memory)) {
$this->stop(12);
} elseif ($this->queueShouldRestart($lastRestart)) {
$this->stop();
}
}

shouldQuit 是在一些连接断开的时候被设置为 true,又或者用户发送了 SIGTERM 信号给 Worker。

  1. registerTimeoutHandler,超时处理。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
/**
* Register the worker timeout handler (PHP 7.1+).
*
* @param \Illuminate\Contracts\Queue\Job|null $job
* @param \Illuminate\Queue\WorkerOptions $options
* @return void
*/
protected function registerTimeoutHandler($job, WorkerOptions $options)
{
if ($this->supportsAsyncSignals()) {
// We will register a signal handler for the alarm signal so that we can kill this
// process if it is running too long because it has frozen. This uses the async
// signals supported in recent versions of PHP to accomplish it conveniently.
pcntl_signal(SIGALRM, function () {
$this->kill(1);
});

pcntl_alarm(
max($this->timeoutForJob($job, $options), 0)
);
}
}

所以总结一下有以下几个原因:

  • 数据库连接或其他连接断开
  • Worker 接收到 SIGTERM 信号
  • Worker 处理完一个 Job 之后发现 Worker 占用的内存超出了指定的内存
  • 用户执行了队列重启命令
  • Worker 执行时间超出了 timeout

exit 的影响

1、如果使用的是 redis

如果这个消息需要消费者处理的时间大于指定的 timeout,会导致消息没处理完就丢失。

2、如果使用的是 RabbitMQ,并且关闭了 AutoAck

Worker 进程没处理完就退出,然后消息还在队列中,下次启动 Worker 的时候继续消费这个消息,导致无限的重复消费。

解决方案

  1. 评估一下 Job 的最大运行时间,设置一个合适的 timeout,这个是必须的。
  2. 可以监听 WorkerStopping 事件,记录 Worker 异常退出的日志,但是需要注意的是,正常退出也会 fire 这个事件。所以有可能没有办法根据 log 来判断是否是异常退出(超时)。
  3. 这是 5.8 以下版本的 bug,我们可以升级到 5.8 以上的版本,在新版本中超时也会记录为失败,而不是单纯地退出。

这是个 bug

我们通过上面的代码也可以发现,其实 Worker 的超时回调其实并没有多少实际的处理,dispatch 一个 WorkerStopping 事件然后就 exit 了。但是我们有可能并没有监听这个事件,这就导致了 Worker 存在 timeout 过短的问题难以被及时发现。

其实这个 5.8 版本以下的 bug,在 5.8 以上的版本中这个已经修复了,超时的时候,Job 会被标记为超时,超过重试次数就被记录为失败的 job。

源码可在 https://github.com/laravel/framework/blob/5.8/src/Illuminate/Queue/Worker.php#L137 查看。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
/**
* Register the worker timeout handler.
*
* @param \Illuminate\Contracts\Queue\Job|null $job
* @param \Illuminate\Queue\WorkerOptions $options
* @return void
*/
protected function registerTimeoutHandler($job, WorkerOptions $options)
{
// We will register a signal handler for the alarm signal so that we can kill this
// process if it is running too long because it has frozen. This uses the async
// signals supported in recent versions of PHP to accomplish it conveniently.
pcntl_signal(SIGALRM, function () use ($job, $options) {
if ($job) {
$this->markJobAsFailedIfWillExceedMaxAttempts(
$job->getConnectionName(), $job, (int) $options->maxTries, $this->maxAttemptsExceededException($job)
);
}

$this->kill(1);
});

pcntl_alarm(
max($this->timeoutForJob($job, $options), 0)
);
}

其他问题

  1. Job 里面写死了 timeout 属性,会以这个 timeout 为准。

在调试过程中,发现明明 php artisan queue:work --timeout= 这里设置的 timeout 足够大了,但是 Worker 还是和原来一样退出了。在 RabbitMQ 的控制台发现消息里面有记录 Job 的 timeout 属性,然后 Worker 里面在判断到如果 job 里面有 timeout 属性的时候,就不会再使用命令行传递的 timeout。

1
2
3
4
5
6
7
8
9
10
11
/**
* Get the appropriate timeout for the given job.
*
* @param \Illuminate\Contracts\Queue\Job|null $job
* @param \Illuminate\Queue\WorkerOptions $options
* @return int
*/
protected function timeoutForJob($job, WorkerOptions $options)
{
return $job && ! is_null($job->timeout()) ? $job->timeout() : $options->timeout;
}

2020-05-15 更新:Laravel 5.5 以及以上版本本身已经有一个方法实现了下面要实现的功能,有一个方法是 chunkById(),这个就是记录上次的最大 id,然后下次查询的时候从这个 id 开始查询。

有时候我们可能需要查询某一个表的全部数据做一些处理,这个时候有一个可行的方法就是直接调用模型的 get() 方法,又或者调用 chunk() 方法。

但是这两种方案在处理大表的时候都不好,首先是 get(),会导致 PHP 占用内存过大,而 chunk() 方法实际上就是一个分页的封装,最终的查询语句是 LIMIT offset, count;

chunk() 也是个人之前一直使用的方法,但是在表越来越大之后,发现有比较严重的性能问题,越到后面的页查询就越慢。

为什么 MySQL 分页慢?

我们可以 explain 一下分页 sql,我们会发现扫描行数等于 limit offset, count 里面的 offset,这和 MySQL 的分页机制有关:

MySQL 在执行 limit offset, count 语句的时候,需要把第一条数据到 offset 的那一条数据扫描一遍。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
mysql> EXPLAIN SELECT * FROM users ORDER BY id DESC LIMIT 10000, 20\G
*************************** 1. row ***************************
id: 1
select_type: SIMPLE
table: users
partitions: NULL
type: index
possible_keys: NULL
key: PRIMARY
key_len: 4
ref: NULL
rows: 10020 # 扫描了 10020 行
filtered: 100.00
Extra: NULL
1 row in set, 1 warning (0.03 sec)

LIMIT 10000, 20 意味着 MySQL 已经读取了 10020 条数据,并且丢弃了前 10000 行,然后返回接下来的 20 行。

参考链接: Efficient Pagination Using MySQL

使用 chunk 有什么问题?

现在我们知道了,MySQL 在分页的时候是从第一条数到 offset 的,也就是说,我们的 offset 越往后,需要扫描的行就越多。

在我们需要遍历全表数据的这种场景下,MySQL 就需要不断地扫描之前扫描过的数据,这样会导致重复扫描非常多。

我们都知道扫描一遍只需要 O(n) 的时间,但是由于 MySQL 的这种机制加上 chunk,会直接导致时间复杂度增加为 O(n²) ,在我们数据量越多的时候,速度下降得就越快。

解决方法

1. 记录上一次的最大 id(推荐使用)

在 MySQL 中的 InnoDB 引擎,主键索引字段是一个聚簇索引,存在 B+ 树的叶子节点层,是有序的。

我们可以利用这个特点,将上一次的最大 id (主键)记录下来,假设是 lastId,然后下一次查询的时候,加上 where id > lastId,这个时候我们的 limit 语句也要改一下,改成 limit count,就可以了,因为我们告诉了 MySQL offset 是什么。这样 MySQL 就不用做一些重复的扫描操作了。

代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// 我们需要按 id 顺序去遍历
$builder = app(\Modules\Product\Models\Sku::class)->orderBy('id');

$lastId = 0;
while (true) {
/** @var \Illuminate\Database\Eloquent\Collection $skus */
$skus = app(\Modules\Product\Models\Sku::class)
->where('id', '>', $lastId)
->orderBy('id')
->limit(100)
->get();

if ($skus->count() > 0) {
$lastId = $skus->max('id');
}

// 最后一页了
if ($skus->count() < 100) {
break;
}
}

测试结果:在 33w 表的情况下,chunk() 需要 390s,而按上述方法只需要 22s。

2. 利用 MYSQL_ATTR_USE_BUFFERED_QUERY

在 PDO 里面有一个常量 MYSQL_ATTR_USE_BUFFERED_QUERY,是用来告诉 MySQL 是否使用查询缓存的。

Laravel 里面提供了一个 cursor() 方法,但是实际查询的时候是先获取所有结果再往下处理的,并不是预期那样获取一条之后返回。可参考 Using Cursor on large number of results causing memory issues。这个方法想要做的事情的确和我们的想法契合,但是由于 PDO 的 MYSQL_ATTR_USE_BUFFERED_QUERY 默认值为 true。所以导致实际表现并不是我们想要的。

但是 Laravel 也提供了方法让我们去手动设置这个属性:

1
2
3
4
5
6
7
8
$builder = app(\Modules\Product\Models\Sku::class);
// 获取底层 pdo 对象,然后设置 \PDO::MYSQL_ATTR_USE_BUFFERED_QUERY 为 false
$builder->getConnection()
->getPdo()
->setAttribute(\PDO::MYSQL_ATTR_USE_BUFFERED_QUERY, false);

foreach ($builder->cursor() as $item) {
}

这种解决方法有一定的问题,可参考上面提到的那个 laravel 的 issue。

总结

Laravel 扫描全表的时候可以记录上次 get() 的最大 id,下次从这个 id 起扫描,又或者利用 pdo 的 MYSQL_ATTR_USE_BUFFERED_QUERY 属性来单条获取。

STL 中关联容器内部的元素是排序的。STL 中的许多算法也涉及排序、查找。这些容器和算法都需要对元素进行比较,有的比较是否相等,有的比较元素大小。

在 STL 中,默认情况下,比较大小是通过 < 运算符进行的,和 > 运算符无关。在 STL 中提到 "大"、"小" 的概念时,以下三个说法是等价的:

  • x 比 y 小

  • 表达式 x < y 为真

  • y 比 x 大

一定要注意,y 比 x 大意味着 x < y 为真,而不是 y > x 为真。y > x 的结果如何并不重要,甚至 y > x 是没定义的都没有关系。

在 STL 中,x 和 y 相等也往往不等价于 x == y 为真。对于在未排序的区间上进行的算法,如排序查找算法 find,查找过程中比较两个元素是否相等用的是 == 运算符;但是对于在排好序的区间上进行查找、合并等操作的算法(如折半查找算法 binary_search,关联容器自身的成员函数 find)来说,x 和 y 相等是与 x < y 和 y < x 同时为假 等价的,与 == 运算符无关。看上去 x < y 和 y < x 同时为假就应该和 x == y 为真等价,其实不然。例如下面的 class A:

1
2
3
4
5
6
class A
{
int v;
public:
bool operator<(const A& a) const { return false; }
};

可以看到,对任意两个类 A 的对象 x、y,x < y 和 y < x 都是为假的。也就是说,对 STL 的关联容器和许多算法来说,任意两个类 A 都是相等的,这与 == 运算符的行为无关。

综上所述,使用 STL 中的关联容器和许多算法时,往往需要对 < 运算符进行适当的重载,使得这些容器和算法可以用 < 运算符对所操作的元素进行比较。最好将 < 运算符重载为全局函数,因为在重载为成员函数时,在有些编译器上会出错(由其 STL 源代码的写法导致。)