在数据迁移的时候,我们往往会有大量数据需要更新,如果使用 Eloquent 的
save
方法,那么每次更新都会执行一次 SQL
语句,这样会导致每一次更新都会有一次网络请求,数据量大的时候会非常慢。
MongoDB 本身提供了批量更新的方法,也就是使用
MongoDB\Driver\BulkWrite
这个类来实现批量更新,通过它我们就可以实现批量的更新操作了。
实现流程
以下是使用 MongoDB\Driver\BulkWrite
实现批量更新的流程:
1. 创建一个 BulkWrite
实例
1
| $bulk = new MongoDB\Driver\BulkWrite(['ordered' => true]);
|
其中 ordered
参数表示是否按照插入的顺序执行操作,默认为
true
。有序模式按操作顺序执行,失败则中断;无序模式允许并行执行,失败不影响后续操作。
若需严格保证插入顺序且不允许部分失败,使用 ordered:
true;若需提高性能且允许部分失败,使用 ordered: false。
2. 添加更新操作
我们可以使用 BulkWrite
的 update
方法来添加更新操作,如下所示:
1 2 3 4 5 6 7 8 9 10 11 12
| foreach ($documents as $document) { $bulk->update( ['_id' => new ObjectId($document['_id'])], ['$set' => $this->prepareDocument($document)] ); }
private function prepareDocument(array $document): array { unset($document['_id']); return $document; }
|
其中,update
的第一个参数是查询条件,第二个参数是更新的数据。
在这个例子中,我们遍历了 $documents
数组,然后使用
update
方法来更新数据。
3. 执行批量更新
最后,我们可以使用 MongoDB\Driver\Manager
的
executeBulkWrite
方法来执行批量更新:
1 2 3
| $manager = new Manager('mongodb://localhost:27017'); $writeConcern = new WriteConcern(WriteConcern::MAJORITY, 30000); $manager->executeBulkWrite('db0.user', $bulk, $writeConcern);
|
其中,executeBulkWrite
方法的第一个参数是数据库和集合名称,第二个参数是 BulkWrite
实例,第三个参数是 WriteConcern
。
Manager
是 MongoDB
的连接管理器,WriteConcern
是写入关注级别,MAJORITY
表示大多数节点写入成功即可,30000
表示超时时间(也就是
30s)。
完整实现
如果我们使用了 Laravel 的 jenssegers/mongodb
扩展,那么我们可以直接使用如下代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102
| <?php use Jenssegers\Mongodb\Eloquent\Model; use MongoDB\BSON\ObjectId; use MongoDB\Driver\BulkWrite; use MongoDB\Driver\Manager; use MongoDB\Driver\WriteConcern; use MongoDB\Driver\WriteResult;
class MongoBulkWriter {
private $model;
public function __construct(Model $model) { $this->model = $model; }
public static function write(Model $model, array $documents = []): WriteResult { $instance = new static($model); return $instance->doWrite($documents); }
private function doWrite(array $documents = []): WriteResult { assert(!empty($documents), '更新数据不能为空'); assert(!empty($this->username()), 'MongoDB 用户名不能为空');
$bulk = new BulkWrite(['ordered' => true]);
foreach ($documents as $document) { assert(isset($document['_id']), '数据主键不能为空');
$bulk->update( ['_id' => new ObjectId($document['_id'])], ['$set' => $this->prepareDocument($document)] ); }
$manager = new Manager($this->connectionUri(), ['username' => $this->username(), 'password' => $this->password()]); $writeConcern = new WriteConcern(WriteConcern::MAJORITY, 30000); return $manager->executeBulkWrite($this->namespace(), $bulk, $writeConcern); }
private function namespace(): string { $database = $this->model->getConnection()->getConfig()['database'];
return $database . '.' . $this->model->getTable(); }
private function connectionUri(): string { $connection = $this->model->getConnection();
$config = $connection->getConfig();
$mongoDbHost = $config['host']; $mongoDbPort = $config['port'];
$database = $config['database'];
return 'mongodb://' . $mongoDbHost . ':' . $mongoDbPort . '/' . $database; }
private function username() { return $this->model->getConnection()->getConfig()['username']; }
private function password() { return $this->model->getConnection()->getConfig()['password']; }
private function prepareDocument(array $document): array { unset($document['_id']); return $document; } }
|
关键说明:我们可以直接通过 MongoDB
的模型来获取其关联的数据库配置信息,然后用这些信息来建立起 MongoDB
连接,最后通过这个连接的 executeBulkWrite
方法来执行批量更新。