Laravel Jenssegers MongoDB 批量更新实现

在数据迁移的时候,我们往往会有大量数据需要更新,如果使用 Eloquent 的 save 方法,那么每次更新都会执行一次 SQL 语句,这样会导致每一次更新都会有一次网络请求,数据量大的时候会非常慢。

MongoDB 本身提供了批量更新的方法,也就是使用 MongoDB\Driver\BulkWrite 这个类来实现批量更新,通过它我们就可以实现批量的更新操作了。

实现流程

以下是使用 MongoDB\Driver\BulkWrite 实现批量更新的流程:

1. 创建一个 BulkWrite 实例

1
$bulk = new MongoDB\Driver\BulkWrite(['ordered' => true]);

其中 ordered 参数表示是否按照插入的顺序执行操作,默认为 true。有序模式按操作顺序执行,失败则中断;无序模式允许并行执行,失败不影响后续操作。

若需严格保证插入顺序且不允许部分失败,使用 ordered: true;若需提高性能且允许部分失败,使用 ordered: false。

2. 添加更新操作

我们可以使用 BulkWriteupdate 方法来添加更新操作,如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
foreach ($documents as $document) {
$bulk->update(
['_id' => new ObjectId($document['_id'])],
['$set' => $this->prepareDocument($document)]
);
}

private function prepareDocument(array $document): array
{
unset($document['_id']); // 移除主键
return $document;
}

其中,update 的第一个参数是查询条件,第二个参数是更新的数据。

在这个例子中,我们遍历了 $documents 数组,然后使用 update 方法来更新数据。

3. 执行批量更新

最后,我们可以使用 MongoDB\Driver\ManagerexecuteBulkWrite 方法来执行批量更新:

1
2
3
$manager      = new Manager('mongodb://localhost:27017');
$writeConcern = new WriteConcern(WriteConcern::MAJORITY, 30000);
$manager->executeBulkWrite('db0.user', $bulk, $writeConcern);

其中,executeBulkWrite 方法的第一个参数是数据库和集合名称,第二个参数是 BulkWrite 实例,第三个参数是 WriteConcern

Manager 是 MongoDB 的连接管理器,WriteConcern 是写入关注级别,MAJORITY 表示大多数节点写入成功即可,30000 表示超时时间(也就是 30s)。

完整实现

如果我们使用了 Laravel 的 jenssegers/mongodb 扩展,那么我们可以直接使用如下代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
<?php
use Jenssegers\Mongodb\Eloquent\Model;
use MongoDB\BSON\ObjectId;
use MongoDB\Driver\BulkWrite;
use MongoDB\Driver\Manager;
use MongoDB\Driver\WriteConcern;
use MongoDB\Driver\WriteResult;

class MongoBulkWriter
{
/**
* MongoDB 的模型
*
* @var Model
*/
private $model;

public function __construct(Model $model)
{
$this->model = $model;
}

/**
* mongo 批量更新
*
* @param Model $model MongoDB 的模型
* @param array $documents 更新的 mongodb 数据,每个文档必须包含 _id 字段,以及需要更新的字段
*
* @return WriteResult
*/
public static function write(Model $model, array $documents = []): WriteResult
{
$instance = new static($model);
return $instance->doWrite($documents);
}

/**
* mongo 批量更新
*
* @param array $documents 更新的mongodb数据
*
* @return WriteResult
*/
private function doWrite(array $documents = []): WriteResult
{
assert(!empty($documents), '更新数据不能为空');
assert(!empty($this->username()), 'MongoDB 用户名不能为空');

$bulk = new BulkWrite(['ordered' => true]);

foreach ($documents as $document) {
assert(isset($document['_id']), '数据主键不能为空');

$bulk->update(
['_id' => new ObjectId($document['_id'])],
['$set' => $this->prepareDocument($document)]
);
}

// 包含特殊字符的密码的时候,需要通过第二个参数传递用户名和密码
$manager = new Manager($this->connectionUri(), ['username' => $this->username(), 'password' => $this->password()]);
$writeConcern = new WriteConcern(WriteConcern::MAJORITY, 30000);
return $manager->executeBulkWrite($this->namespace(), $bulk, $writeConcern);
}

private function namespace(): string
{
$database = $this->model->getConnection()->getConfig()['database'];

return $database . '.' . $this->model->getTable();
}

private function connectionUri(): string
{
$connection = $this->model->getConnection();

$config = $connection->getConfig();

$mongoDbHost = $config['host'];
$mongoDbPort = $config['port'];

$database = $config['database'];

return 'mongodb://' . $mongoDbHost . ':' . $mongoDbPort . '/' . $database;
}

private function username()
{
return $this->model->getConnection()->getConfig()['username'];
}

private function password()
{
return $this->model->getConnection()->getConfig()['password'];
}

private function prepareDocument(array $document): array
{
unset($document['_id']); // 移除主键
return $document;
}
}

关键说明:我们可以直接通过 MongoDB 的模型来获取其关联的数据库配置信息,然后用这些信息来建立起 MongoDB 连接,最后通过这个连接的 executeBulkWrite 方法来执行批量更新。