且构网

分享程序员开发的那些事...
且构网 - 分享程序员编程开发的那些事

利用MongoDB的SplitVector命令实现并发数据迁移

更新时间:2021-12-05 12:37:21

背景

数据迁移是数据库运维中一个很常见的场景。数据迁移分为全量和增量。为了追求速度,通常我们会采用并发的方式对数据进行全量迁移。在全量导出数据时,通常都会选择做到记录级的并发,因此通常会涉及到对需要导出的某个表(集合)按照并发度进行切分(分区)的过程。现有常用做法是通过若干个skip加limit来找到一些分区点,然后就可以并发同时导出多个分区。事实上MongoDB还有一个SplitVector命令特别适合用来做集合的分区。本文将介绍一下如何利用这个命令来对集合做分区,实现并发数据迁移。

命令简介

SplitVector命令原是在sharding中chunk分裂时需要用的一个内部命令,是mongos在准备分裂某个chunk前发给这个chunk所在shard以计算分裂点(SplitPoint)时使用的。但是这个命令也可以用于普通的副本集,我们可以把副本集中的集合看作一个唯一的chunk,利用这个命令来为这个chunk计算分裂点,从而达到为某个集合进行分区的目的。

SplitVector命令的使用在官方文档中没有介绍,只说明了其实一个内部命令,但是使用命令的Help却可以看到:

db.runCommand({splitVector:"test.test", help:1})
{
 "help" : "help for: splitVector Internal command.\nexamples:\n  { splitVector : \"blog.post\" , keyPattern:{x:1} , min:{x:10} , max:{x:20}, maxChunkSize:200 }\n  maxChunkSize unit in MBs\n  May optionally specify 'maxSplitPoints' and 'maxChunkObjects' to avoid traversing the whole chunk\n  \n  { splitVector : \"blog.post\" , keyPattern:{x:1} , min:{x:10} , max:{x:20}, force: true }\n  'force' will produce one split point even if data is small; defaults to false\nNOTE: This command may take a while to run",
 "lockType" : 0,
 "ok" : 1
}

从帮助文档中可以大致看到,这个命令大致是这么使用的:

db.runCommand({splitVector:"blog.post", keyPattern:{x:1}, min{x:10}, max:{x:20}, maxChunkSize:200})

接下来介绍一下各个参数及其含义。

字段 类型 描述
splitVector string splitVector的操作对象集合名
keyPattern document chunk分裂使用的分区键,必须拥有索引,在sharding中就是shard key,在副本集中通常就指定成主键_id索引
min document 可选参数,分区数据集的最小值,如果没有指定,那么使用MinKey
max document 可选参数,分区数据集的最大值,如果没有指定,那么使用MaxKey
maxChunkSize integer 可选参数,和『force』参数二者必须指定一个。分区后每个chunk的最大大小
maxSplitPoints integer 可选参数,分裂点个数上限
maxChunkObjects integer 可选参数,分区后每个chunk最大包含的记录数,默认为250000
force boolean 可选参数,和『maxChunkSize』参数二者必须指定一个。默认情况下如果当前chunk的数据大小小于maxChunkSize则不会进行分裂。如果指定了『force』为true,那么会强制在当前chunk的中位点进行分裂,返回一个分裂点。默认为false。

这么多参数到底怎么用呢?我怎么知道出来的结果是怎样的?没有更详细的文档,只有啃源码了。​

原理

SplitVector的原理是遍历指定的『keyPattern』索引,根据指定的『maxChunkSize』找到满足以下条件的n个分裂点:分裂后的每个新的chunk的大小约为『maxChunkSize』的一半。如果集合当前大小比『maxChunkSize』小或者集合记录数为空,那么返回一个空的分裂点集合。如果指定了『force: true』,那么会忽略传入的『maxChunkSize』参数,并强制在集合的中位点进行分片,这时候只会产生一个分裂点。
在寻找分裂点时首先会根据集合的平均文档大小计算一个分裂后每个chunk所包含的文档数:

​​keyCount = maxChunkSize / (2 * avgObjSize)

​​如果指定了『maxChunkObjects』参数,并且『maxChunkObjects』比keyCount小,会使用『maxChunkObjects』作为keyCount。接下来就是遍历索引,每遍历keyCount个key,就得到一个分裂点(第keyCount+1个key),直到达到『maxSplitPoints』(若有指定)或遍历结束。因此其实每个partition会包含keyCount个key,最终得到的分裂点个数:

​​partitionCount = keyTotalCount / (keyCount + 1)
splitPointCount = partitionCount - 1

​​其中keyTotalCount为索引的key总数。

使用案例

知道了原理后,就知道如何去传参数了,如果要精确控制得到的分裂点个数(以便控制并发数),这里可以给出一个公式及推导过程。现在我们有以下公式:

​1. splitPointCount = partitionCount - 1
2. splitPointCount = keyTotalCount / (keyCount + 1)
3. keyCount = maxChunkSize / (2 * avgObjSize)

由上述公式可以推导出

maxChunkSize = (keyTotalCount / partitionCount - 1) * 2 * avgObjSize

由于所有集合都有_id字段上的唯一索引,并且每个文档都有_id字段,因此我们可以直接利用集合文档的个数docCount作为索引key的个数。文档个数和avgObjSize都可以通过collStats命令得到。注意参数中的『maxChunkSize』是以MB为单位的,最终传到命令的时候需要转换一下,并且在服务端中事实上会将『maxChunkSize』做个向下取整,因此最终计算出来的keyCount可能比我们设想的要小,这样就会导致最终得到的分裂点个数比我们想要的多。为了达到我们的需求,***加上『maxSplitPoints』这个可选参数对分裂点进行限制,这样我们允许最后一个分区比其他分区包含更多的文档数。

接下来举个具体的例子,假设现在需要将某个集合分成10个分区以支持10个并发同时对外导出数据,这个集合共有10240条文档,avgObjSize是1024,那么根据上述公式可以计算得到:

​maxChunkSize = (10240 / 10 - 1) * 2 * 1024 = 2MB

这样我们执行如下命令:

db.runCommand({splitVector:"test.test", keyPattern:{_id:1}, maxChunkSize:2, maxSplitPoints:9})

​这样就会只得到9个分裂点。