通过MongoShake实现MongoDB实例的单向同步
发布日期:2025-01-03 17:23 点击次数:156
通过阿里云自研的MongoShake开源工具,您可以实现MongoDB数据库间的数据同步,该功能可用于数据分析、灾备和多活等业务场景。本文以云数据库MongoDB实例间的数据实时同步为例介绍配置流程。支持的数据源源数据库目标数据库ECS上的自建MongoDB数据库ECS上的自建MongoDB数据库本地自建的MongoDB数据库本地自建的MongoDB数据库阿里云MongoDB实例阿里云MongoDB实例第三方云MongoDB数据库第三方云MongoDB数据库注意事项在全量数据同步完成之前,请勿对源库进行DDL操作,否则可能导致数据不一致。不支持同步admin和local数据库。数据库用户的权限要求同步的数据源所需权限源MongoDB实例readAnyDatabase权限、local库的read权限和mongoshake库的readWrite权限。目标MongoDB实例readWriteAnyDatabase权限或目标库的readWrite权限。准备工作为达到最理想的同步性能,请确保源端MongoDB副本集实例的网络类型为专有网络VPC,如果是经典网络,请切换成专有网络VPC。更多信息,请参见经典网络切换为专有网络。创建作为同步目标端的MongoDB副本集实例,在创建的时候请选择与源端MongoDB副本集实例相同的专有网络VPC,以获取最低的网络延迟。更多信息,请参见创建副本集实例。创建用于运行MongoShake的ECS实例,在创建的时候请选择与源端MongoDB副本集实例相同的专有网络VPC,以获取最低的网络延迟。更多信息,请参见创建ECS实例。将ECS的内网IP地址加入至源端和目标端MongoDB实例的白名单中,并确保ECS可以连接源端和目标端MongoDB实例。 更多信息,请参见修改白名单。操作步骤本操作示例默认以/test/mongoshake目录作为MongoShake的安装目录。登录ECS服务器。执行如下命令下载MongoShake程序并重命名为mongoshake.tar.gz。 wget "-aliyun.cn-hangzhou.oss.aliyun-inc.com/assets/attach/196977/jp_ja/1608863913991/mongo-shake-v2.4.16.tar.gz" -O mongoshake.tar.gz执行如下命令在将MongoShake解压到/test/mongoshake目录中。 tar zxvf mongoshake.tar.gz && mv mongo-shake-v2.4.16 /test/mongoshake && cd /test/mongoshake/mongo-shake-v2.4.16执行vi collector.conf命令,修改MongoShake的配置文件collector.conf,涉及的主要参数说明如下表所示。 参数说明示例值mongo_urls 源端MongoDB实例的ConnectionStringURI格式连接地址,数据库账号为test,所属数据库为admin。 mongo_urls = mongodb://test:****@dds-bp19f409d7512****.mongodb.rds.aliyuncs.com:3717,dds-bp19f409d7512****.mongodb.rds.aliyuncs.com:3717tunnel.address目标端MongoDB实例的ConnectionStringURI格式连接地址,数据库账号为test,所属数据库为admin。tunnel.address = mongodb://test:****@dds-bp19f409d7512****.mongodb.rds.aliyuncs.com:3717,dds-bp19f409d7512****.mongodb.rds.aliyuncs.com:3717sync_mode数据同步的方式,取值: all:执行全量数据同步和增量数据同步。full:仅执行全量数据同步。incr:仅执行增量数据同步。sync_mode = all执行下述命令启动同步任务,并打印日志信息。 ./collector.linux -conf=collector.conf -verbose观察打印的日志信息,当出现如下日志时,即代表全量数据同步已完成,并进入增量数据同步模式。 [09:38:57 CST 2019/06/20] [INFO] (mongoshake/collector.(*ReplicationCoordinator).Run:80) finish full sync, start incr sync with timestamp: fullBeginTs[1560994443], fullFinishTs[1560994737]监控MongoShake状态附件表 1. collector.conf全量参数说明参数目录参数说明示例值无conf.version当前配置文件的版本号,请不要修改该值。conf.version = 4全局配置选项id同步任务的ID,可自定义。主要用于本次任务的日志名称、断点续传(checkpoint)位点信息存储的数据库名称、同步到目的端的数据库名称等。id = mongoshakemaster_quorum高可用选项。当主备MongoShake同时从一个源端同步数据时,主MongoShake中需要设置该参数为true。 master_quorum = falsefull_sync.http_portHTTP端口,开放该端口可通过外网查看MongoShake的当前全量同步状态。full_sync.http_port = 9101incr_sync.http_portHTTP端口,开放该端口可通过外网查看MongoShake的当前增量同步状态。incr_sync.http_port = 9100system_profile_portProfiling端口,用于查看内部堆栈信息。system_profile_port = 9200log.level日志的等级,取值: error:包含错误级别信息的日志。warning:包含警告级别信息的日志。info:反馈当前系统状态的日志。debug:包含调试信息的日志。默认值:info。log.level = infolog.dir日志文件和pid文件的目录,不设置则默认使用当前路径的logs目录。log.dir = ./logs/log.file日志文件的名称,可自定义。 log.file = collector.loglog.flush日志在屏幕上的刷新频率。取值: true:打印每一条日志(对性能有影响)。false:不一定能打印日志,但是保证性能。log.flush = falsesync_mode数据同步的方式,取值: all:执行全量数据同步和增量数据同步。full:仅执行全量数据同步。incr:仅执行增量数据同步。sync_mode = allmongo_urls 源端MongoDB实例的ConnectionStringURI格式连接地址。示例中数据库账号为test,所属数据库为admin。 mongo_urls = mongodb://test:****@dds-bp19f409d7512****.mongodb.rds.aliyuncs.com:3717,dds-bp19f409d7512****.mongodb.rds.aliyuncs.com:3717mongo_cs_url如果源端的MongoDB类型为分片集群实例,需要输入ConfigServer(CS)节点的连接地址。如何申请ConfigServer节点的连接地址请参见申请Shard或ConfigServer节点连接地址。示例中数据库账号为test,所属数据库为admin。mongo_cs_url = mongodb://test:****@dds-bp19f409d7512****-csxxx.mongodb.rds.aliyuncs.com:3717,dds-bp19f409d7512****-csxxx.mongodb.rds.aliyuncs.com:3717/adminmongo_s_url如果源端的MongoDB类型为分片集群实例,需要输入至少一个Mongos节点的连接地址,多个Mongos地址之间以英文逗号(,)分隔。如何申请Mongos节点的连接地址请参见申请Shard或ConfigServer节点连接地址。示例中数据库账号为test,所属数据库为admin。mongos_s_url = mongodb://test:****@s-bp19f409d7512****.mongodb.rds.aliyuncs.com:3717,s-bp19f409d7512****.mongodb.rds.aliyuncs.com:3717/admintunnel同步的通道类型。取值: direct:直接同步到目标MongoDB实例。rpc:通过NET/RPC方式同步。tcp:通过TCP方式同步。file:通过文件传输方式同步。kafka:通过Kafka方式同步。mock:仅用于测试,不写入通道。tunnel = directtunnel.address目标端的连接地址,支持如下地址: 当turnel参数为direct时,请输入目标端MongoDB实例的ConnectionStringURI格式连接地址。当turnel参数为rpc时,请输入目标端实例rpc的接收地址。当turnel参数为tcp时,请输入目标端实例的tcp接收地址。当turnel参数为file时,请输入目标端实例数据的文件路径。当turnel参数为kafka时,请输入kafka的地址,例如topic@brokers1,brokers2当turnel参数为mock时,此参数不填。示例中数据库账号为test,所属数据库为admin。tunnel.address = mongodb://test:****@dds-bp19f409d7512****.mongodb.rds.aliyuncs.com:3717,dds-bp19f409d7512****.mongodb.rds.aliyuncs.com:3717tunnel.message通道数据的类型,仅限tunnel参数为kafka或file时有效。取值: raw:默认的类型,采用聚合的模式进行写入和读取。json:以JSON格式写入kafka,便于用户直接读取。bson:以BSON二进制的格式写入kafka。tunnel.message = rawmongo_connect_modeMongoDB实例的连接模式,仅限tunnel参数为direct时有效。取值: primary:从primary节点中拉取数据。secondaryPreferred:从secondary节点中拉取数据。standalone:从指定的单个节点中拉取数据。mongo_connect_mode = secondaryPreferredfilter.namespace.black指定数据同步的黑名单,这些指定的命名空间不会被同步至目标数据库,多个命名空间用英文分号(;)分隔。 filter.namespace.black = mongodbtest.customer;testdata.test123filter.namespace.white指定数据同步的白名单,只有这些指定的命名空间会被同步至目标数据库,多个命名空间用英文分号(;)分隔。 filter.namespace.white = mongodbtest.customer;test123filter.pass.special.db启用特殊库的同步。正常同步过程中,admin、local、mongoshake、config、system.views等库会被系统过滤掉,您可以在有特殊需求时启用上述库的同步。多个库名用英文分号(;)分隔。filter.pass.special.db = admin;mongoshakefilter.ddl_enable是否开启DDL同步。取值: true:开启false:关闭filter.ddl_enable = falsecheckpoint.storage.url配置Checkpoint存储地址,用于支持断点续传。如不配置,程序将根据实例类型写入如下对应的数据库: MongoDB副本集实例:写入mongoshake库中。MongoDB分片集群实例:写入ConfigServer节点的admin库中。示例中数据库账号为test,所属数据库为admin。checkpoint.storage.url = mongodb://test:****@dds-bp19f409d7512****.mongodb.rds.aliyuncs.com:3717,dds-bp19f409d7512****.mongodb.rds.aliyuncs.com:3717checkpoint.storage.dbCheckpoint存储的数据库名。 checkpoint.storage.db = mongoshakecheckpoint.storage.collectionCheckpoint存储的集合名。在开启主备MongoShake同时从一个源端同步数据时,可以修改该表名以防止表名重复引起冲突。 checkpoint.storage.collection = ckpt_defaultcheckpoint.start_position断点续传的开始位置,如果checkpoint位点已经存在则本参数无效。取值的格式:YYYY-MM-DDTHH:MM:SSZ。checkpoint.start_position = 1970-01-01T00:00:00Ztransform.namespace将源库的库名或集合名重新命名并同步到目的库。如:将源库中的A库.B集合重新命名成C库.D集合并同步到目的库。transform.namespace = fromA.fromB:toC.toD全量数据同步选项full_sync.reader.collection_parallel设置MongoShake单次最多并发拉取多少个集合。full_sync.reader.collection_parallel = 6full_sync.reader.write_document_parallel设置MongoShake对单个集合写入的并发线程数。full_sync.reader.write_document_parallel = 8full_sync.reader.document_batch_size设置MongoShake对目的端单次写入文档的聚合(batch)大小。如:128表示单次聚合128个文档后再写入。full_sync.reader.document_batch_size = 128full_sync.collection_exist_drop设置目的库存在同名集合时的处理方式。取值: true:先删除目标重名集合再同步。 false:如检测到目的库中有重名集合则直接报错退出。full_sync.collection_exist_drop = truefull_sync.create_index完成同步后是否创建索引。取值: foreground:创建前台索引background:创建后台索引none:不创建索引full_sync.create_index = nonefull_sync.executor.insert_on_dup_update目的库中有_id重复字段时是否将INSERT语句更改为UPDATE语句。取值: true:更改false:不更改full_sync.executor.insert_on_dup_update = falsefull_sync.executor.filter.orphan_document源端为分片集群实例时,是否过滤孤立文档(Orphaned document)。取值: true:过滤false:不过滤full_sync.executor.filter.orphan_document = falsefull_sync.executor.majority_enable是否启用目的端的多数写(Majority write)功能。取值: true:启用false:不启用full_sync.executor.majority_enable = false增量数据同步选项incr_sync.mongo_fetch_method配置增量数据拉取方法。取值: oplog:从源库中拉取oplog。change_stream:从源库中拉取change事件(仅支持MongoDB 4.0及以上版本)。默认值:oplogincr_sync.mongo_fetch_method = oplogincr_sync.oplog.gids用于云上集群搭建双向复制。incr_sync.oplog.gids = xxxxxxxxxxxxincr_sync.shard_keyMongoShake内部处理并发的方式。请勿修改该参数。incr_sync.shard_key = collectionincr_sync.worker传输oplog的并发线程数。如果主机性能足够,可以提高线程数。 incr_sync.worker = 8incr_sync.worker.oplog_compressor启用压缩数据功能以减少网络带宽消耗。取值: none:不压缩gzip:以gzip格式压缩zlib:以zlib格式压缩deflate:以deflate格式压缩incr_sync.worker.oplog_compressor = noneincr_sync.target_delay设置源端与目的端的延迟同步。通常源端中的变更会实时同步到目的端,为了防止误操作,您可以通过设置本参数达到延迟同步的目的。如:incr_sync.target_delay = 1800为设置30分钟的延迟。单位:秒。 incr_sync.target_delay = 1800incr_sync.worker.batch_queue_sizeMongoShake内部队列的配置参数。如无特殊情况请勿修改。incr_sync.worker.batch_queue_size = 64incr_sync.adaptive.batching_max_sizeincr_sync.adaptive.batching_max_size = 1024incr_sync.fetcher.buffer_capacityincr_sync.fetcher.buffer_capacity = 256MongoDB同步选项(仅适用于direct模式)incr_sync.executor.upsert当_id(重复字段)或唯一索引不存在时,是否将UPDATE语句更改为INSERT语句。取值: true:更改false:不更改incr_sync.executor.upsert = falseincr_sync.executor.insert_on_dup_update当_id(重复字段)或唯一索引不存在时,是否将INSERT语句更改为UPDATE语句。取值: true:更改false:不更改incr_sync.executor.insert_on_dup_update = falseincr_sync.conflict_write_to同步时如果存在写冲突,是否记录冲突文档。取值: none:不记录db:将冲突日志写入mongoshake_conflictsdk:将冲突日志写入sdkincr_sync.conflict_write_to = noneincr_sync.executor.majority_enable是否在目的端启用多数写(Majority write)。取值: true:启用false:不启用incr_sync.executor.majority_enable = false常见问题请参见MongoShake常见问题。