Mongo CDC
a. Prepare dependencies
flink-sql-connector-mongodb-cdc-*.jar
Only Flink CDC 2.4+ is supported.
b. Synchronizing tables
By using MongoDBSyncTableAction in a Flink DataStream job, or directly through flink run, you can synchronize one MongoDB collection into one Paimon table.
<FLINK_HOME>/bin/flink run \
/path/to/paimon-flink-action-0.7.0-incubating.jar \
mongodb_sync_table \
--warehouse <warehouse-path> \
--database <database-name> \
--table <table-name> \
[--partition_keys <partition_keys>] \
[--computed_column <'column-name=expr-name(args[, ...])'> [--computed_column ...]] \
[--mongodb_conf <mongodb-cdc-source-conf> [--mongodb_conf <mongodb-cdc-source-conf> ...]] \
[--catalog_conf <paimon-catalog-conf> [--catalog_conf <paimon-catalog-conf> ...]] \
[--table_conf <paimon-table-sink-conf> [--table_conf <paimon-table-sink-conf> ...]]
Configuration | Description |
---|---|
--warehouse | The path to Paimon warehouse. |
--database | The database name in Paimon catalog. |
--table | The Paimon table name. |
--partition_keys | The partition keys for Paimon table. If there are multiple partition keys, connect them with comma, for example "dt,hh,mm". |
--computed_column | The definitions of computed columns. The argument field is from the MongoDB collection field name. See here for a complete list of configurations. |
--mongodb_conf | The configuration for Flink CDC MongoDB sources. Each configuration should be specified in the format "key=value". hosts, username, password, database and collection are required configurations, others are optional. See its document for a complete list of configurations. |
--catalog_conf | The configuration for Paimon catalog. Each configuration should be specified in the format "key=value". See here for a complete list of catalog configurations. |
--table_conf | The configuration for Paimon table sink. Each configuration should be specified in the format "key=value". See here for a complete list of table configurations. |
Note:
- mongodb_conf introduces the schema.start.mode parameter on top of the MongoDB CDC source, offering two modes: dynamic (the default) and specified. In dynamic mode, MongoDB schema information is parsed dynamically (top-level fields only). In specified mode, synchronization is driven by configuring field.name to specify the fields to synchronize, and parser.path to specify the JSON parsing path of each of those fields. The difference between the two: specified mode requires explicitly identifying the fields to be used, and builds the mapping table from those fields; dynamic mode, on the other hand, ensures that Paimon and MongoDB always stay consistent on the top-level fields, with no need to single out specific fields, but nested fields then require further processing of the table.
- mongodb_conf introduces the default.id.generation parameter as an enhancement to the MongoDB CDC source configuration. default.id.generation offers two behaviors. When set to true, the MongoDB CDC source follows the default _id generation strategy, which strips the outer $oid nesting to provide a more direct identifier; this mode simplifies the _id representation, making it more straightforward and user-friendly. When set to false, the MongoDB CDC source retains the original _id structure without any extra processing; this mode gives users the flexibility to work with the raw _id format provided by MongoDB, preserving any nested elements such as $oid. The choice between the two comes down to preference: the former yields a cleaner, simplified _id, the latter a direct representation of MongoDB's _id structure.
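As an illustration of the two behaviors, here is a minimal Python sketch. This is not the CDC source's actual code: simplify_id is an invented helper and the ObjectId is a sample value.

```python
def simplify_id(document):
    """Mimic default.id.generation=true: strip the outer $oid nesting from
    _id so the identifier becomes a plain string. With
    default.id.generation=false the document is left untouched."""
    _id = document.get("_id")
    if isinstance(_id, dict) and set(_id) == {"$oid"}:
        return {**document, "_id": _id["$oid"]}
    return document

raw = {"_id": {"$oid": "507f1f77bcf86cd799439011"}, "name": "alice"}
```

With the default (true), the record arrives with _id as the bare hex string; with false, the nested {"$oid": ...} structure survives as-is.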
Operator | Description |
---|---|
$ | The root element to query. This starts all path expressions. |
@ | The current node being processed by a filter predicate. |
* | Wildcard. Available anywhere a name or numeric are required. |
.. | Deep scan. Available anywhere a name is required. |
. | Dot-notated child. |
['{name}' (, '{name}')] | Bracket-notated child or children. |
[{number} (, {number})] | Array index or indexes. |
[start:end] | Array slice operator. |
[?({expression})] | Filter expression. Expression must evaluate to a boolean value. |
Functions can be invoked at the tail end of a path: the input to a function is the output of the path expression, and the function's output is determined by the function itself.
Function | Description | Output type |
---|---|---|
min() | Provides the min value of an array of numbers. | Double |
max() | Provides the max value of an array of numbers. | Double |
avg() | Provides the average value of an array of numbers. | Double |
stddev() | Provides the standard deviation value of an array of numbers. | Double |
length() | Provides the length of an array. | Integer |
sum() | Provides the sum value of an array of numbers. | Double |
keys() | Provides the property keys (an alternative for terminal tilde ~). | Set |
concat(X) | Provides a concatenated version of the path output with a new item. | Like input |
append(X) | Adds an item to the json path output array. | Like input |
first() | Provides the first item of an array. | Depends on the array |
last() | Provides the last item of an array. | Depends on the array |
index(X) | Provides the item of an array at index X; if X is negative, counts from the end. | Depends on the array |
Path examples:
{
  "store": {
    "book": [
      { "category": "reference", "author": "Nigel Rees", "title": "Sayings of the Century", "price": 8.95 },
      { "category": "fiction", "author": "Evelyn Waugh", "title": "Sword of Honour", "price": 12.99 },
      { "category": "fiction", "author": "Herman Melville", "title": "Moby Dick", "isbn": "0-553-21311-3", "price": 8.99 },
      { "category": "fiction", "author": "J. R. R. Tolkien", "title": "The Lord of the Rings", "isbn": "0-395-19395-8", "price": 22.99 }
    ],
    "bicycle": { "color": "red", "price": 19.95 }
  },
  "expensive": 10
}
JsonPath | Result |
---|---|
$.store.book[*].author | The authors of all books. |
$..author | All authors. |
$.store.* | All things, both books and bicycles. |
$.store..price | The price of everything. |
$..book[2] | The third book. |
$..book[-2] | The second to last book. |
$..book[0,1] | The first two books. |
$..book[:2] | All books from index 0 (inclusive) until index 2 (exclusive). |
$..book[1:2] | All books from index 1 (inclusive) until index 2 (exclusive). |
$..book[-2:] | The last two books. |
$..book[2:] | All books from index 2 (inclusive) to the last. |
$..book[?(@.isbn)] | All books with an ISBN number. |
$.store.book[?(@.price < 10)] | All books in the store cheaper than 10. |
$..book[?(@.price <= $['expensive'])] | All books in the store that are not "expensive". |
$..book[?(@.author =~ /.*REES/i)] | All books matching the regex (ignore case). |
$..* | Everything. |
$..book.length() | The number of books. |
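For intuition, here is a rough pure-Python sketch of how a small subset of these operators (dot child, [*], numeric index, and .. deep scan) resolves against a document. Paimon's parser.path is evaluated by a full JsonPath implementation, so treat this only as an illustration.

```python
import re

def _deep_scan(node, key, out):
    # Collect every value stored under `key` anywhere below `node`.
    if isinstance(node, dict):
        for k, v in node.items():
            if k == key:
                out.append(v)
            _deep_scan(v, key, out)
    elif isinstance(node, list):
        for item in node:
            _deep_scan(item, key, out)

def jsonpath(doc, path):
    # Supported tokens only: .name, ..name (deep scan), [*], [n].
    tokens = re.findall(r"\.\.\w+|\.\w+|\[\*\]|\[-?\d+\]", path.lstrip("$"))
    nodes = [doc]
    for tok in tokens:
        if tok.startswith(".."):                    # deep scan
            nxt = []
            for n in nodes:
                _deep_scan(n, tok[2:], nxt)
        elif tok.startswith("."):                   # dot-notated child
            nxt = [n[tok[1:]] for n in nodes
                   if isinstance(n, dict) and tok[1:] in n]
        elif tok == "[*]":                          # wildcard over arrays
            nxt = [item for n in nodes if isinstance(n, list) for item in n]
        else:                                       # numeric array index
            nxt = [n[int(tok[1:-1])] for n in nodes if isinstance(n, list)]
        nodes = nxt
    return nodes

store = {"store": {"book": [
    {"author": "Nigel Rees", "title": "Sayings of the Century", "price": 8.95},
    {"author": "Evelyn Waugh", "title": "Sword of Honour", "price": 12.99},
], "bicycle": {"color": "red", "price": 19.95}}, "expensive": 10}
```

For example, jsonpath(store, "$.store.book[*].author") yields the authors of all books, mirroring the first row of the table above.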
The synchronized table must have _id as its primary key. Because MongoDB's change events are recorded before the message is applied as an update, they can only be converted into Flink's UPSERT changelog stream, and an upsert stream requires a unique key; therefore _id must be declared as the primary key. Declaring other columns as primary keys is not feasible, because delete operations contain only the _id and the shard key, not the other keys and values.
MongoDB Change Streams are designed to return simple JSON documents without any data type definitions. This is because MongoDB is a document-oriented database, and one of its core features is the dynamic schema: documents can contain different fields, and field data types can be flexible. The absence of data type definitions in Change Streams preserves this flexibility and extensibility. For this reason, when synchronizing MongoDB to Paimon, we set the data type of all fields to string, which works around the inability to obtain data types.
If the specified Paimon table does not exist, this action will automatically create it, and its schema will be derived from the MongoDB collection.
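The consequence of the all-strings rule can be sketched in a few lines of Python (derive_schema and the sample document are illustrative, not Paimon's actual API):

```python
def derive_schema(sample_document):
    # Every top-level MongoDB field maps to STRING, because change
    # streams carry no type information for the field values.
    return {field: "STRING" for field in sample_document}

doc = {"_id": "1", "name": "alice", "age": 30, "tags": ["a", "b"]}
```

Even numeric or array-valued fields such as age and tags end up as STRING columns in the derived table.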
Example 1: synchronize a collection into one Paimon table
<FLINK_HOME>/bin/flink run \
/path/to/paimon-flink-action-0.7.0-incubating.jar \
mongodb_sync_table \
--warehouse hdfs:///path/to/warehouse \
--database test_db \
--table test_table \
--partition_keys pt \
--computed_column '_year=year(age)' \
--mongodb_conf hosts=127.0.0.1:27017 \
--mongodb_conf username=root \
--mongodb_conf password=123456 \
--mongodb_conf database=source_db \
--mongodb_conf collection=source_table1 \
--catalog_conf metastore=hive \
--catalog_conf uri=thrift://hive-metastore:9083 \
--table_conf bucket=4 \
--table_conf changelog-producer=input \
--table_conf sink.parallelism=4
Example 2: synchronize a collection into a Paimon table according to the specified field mapping.
<FLINK_HOME>/bin/flink run \
/path/to/paimon-flink-action-0.7.0-incubating.jar \
mongodb_sync_table \
--warehouse hdfs:///path/to/warehouse \
--database test_db \
--table test_table \
--partition_keys pt \
--mongodb_conf hosts=127.0.0.1:27017 \
--mongodb_conf username=root \
--mongodb_conf password=123456 \
--mongodb_conf database=source_db \
--mongodb_conf collection=source_table1 \
--mongodb_conf schema.start.mode=specified \
--mongodb_conf field.name=_id,name,description \
--mongodb_conf parser.path=$._id,$.name,$.description \
--catalog_conf metastore=hive \
--catalog_conf uri=thrift://hive-metastore:9083 \
--table_conf bucket=4 \
--table_conf changelog-producer=input \
--table_conf sink.parallelism=4
c. Synchronizing databases
By using MongoDBSyncDatabaseAction in a Flink DataStream job, or directly through flink run, you can synchronize an entire MongoDB database into one Paimon database.
<FLINK_HOME>/bin/flink run \
/path/to/paimon-flink-action-0.7.0-incubating.jar \
mongodb_sync_database \
--warehouse <warehouse-path> \
--database <database-name> \
[--table_prefix <paimon-table-prefix>] \
[--table_suffix <paimon-table-suffix>] \
[--including_tables <mongodb-table-name|name-regular-expr>] \
[--excluding_tables <mongodb-table-name|name-regular-expr>] \
[--mongodb_conf <mongodb-cdc-source-conf> [--mongodb_conf <mongodb-cdc-source-conf> ...]] \
[--catalog_conf <paimon-catalog-conf> [--catalog_conf <paimon-catalog-conf> ...]] \
[--table_conf <paimon-table-sink-conf> [--table_conf <paimon-table-sink-conf> ...]]
Configuration | Description |
---|---|
--warehouse | The path to Paimon warehouse. |
--database | The database name in Paimon catalog. |
--table_prefix | The prefix of all Paimon tables to be synchronized. For example, if you want all synchronized tables to have "ods_" as prefix, you can specify "--table_prefix ods_". |
--table_suffix | The suffix of all Paimon tables to be synchronized. The usage is same as "--table_prefix". |
--including_tables | It is used to specify which source tables are to be synchronized. You must use '|' to separate multiple tables. Because '|' is a special character, quoting is required, for example: 'a|b|c'. Regular expression is supported; for example, specifying "--including_tables test|paimon.*" means to synchronize the table 'test' and all tables starting with 'paimon'. |
--excluding_tables | It is used to specify which source tables are not to be synchronized. The usage is same as "--including_tables". "--excluding_tables" has higher priority than "--including_tables" if you specified both. |
--mongodb_conf | The configuration for Flink CDC MongoDB sources. Each configuration should be specified in the format "key=value". hosts, username, password and database are required configurations, others are optional. See its document for a complete list of configurations. |
--catalog_conf | The configuration for Paimon catalog. Each configuration should be specified in the format "key=value". See here for a complete list of catalog configurations. |
--table_conf | The configuration for Paimon table sink. Each configuration should be specified in the format "key=value". See here for a complete list of table configurations. |
All collections to be synchronized must have _id as their primary key.
For each MongoDB collection to be synchronized, if the corresponding Paimon table does not exist, it will be created automatically, and its schema will be derived from all the specified MongoDB collections.
If a Paimon table already exists, its schema will be compared against the schemas of all the specified MongoDB collections. Any MongoDB tables created after the job starts will be included automatically.
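The filtering semantics of --including_tables/--excluding_tables described above can be sketched as follows (a simplification that assumes the patterns are matched against the whole table name; should_sync is an invented name, not Paimon's API):

```python
import re

def should_sync(table_name, including=".*", excluding=None):
    # --excluding_tables takes priority over --including_tables
    # when both patterns match the same table name.
    if excluding and re.fullmatch(excluding, table_name):
        return False
    return re.fullmatch(including, table_name) is not None
```

With including="test|paimon.*", the table 'test' and every table starting with 'paimon' are synchronized, unless an excluding pattern also matches them.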
Example 1: synchronize the entire database
<FLINK_HOME>/bin/flink run \
/path/to/paimon-flink-action-0.7.0-incubating.jar \
mongodb_sync_database \
--warehouse hdfs:///path/to/warehouse \
--database test_db \
--mongodb_conf hosts=127.0.0.1:27017 \
--mongodb_conf username=root \
--mongodb_conf password=123456 \
--mongodb_conf database=source_db \
--catalog_conf metastore=hive \
--catalog_conf uri=thrift://hive-metastore:9083 \
--table_conf bucket=4 \
--table_conf changelog-producer=input \
--table_conf sink.parallelism=4
Example 2: synchronize the specified tables
<FLINK_HOME>/bin/flink run \
--fromSavepoint savepointPath \
/path/to/paimon-flink-action-0.7.0-incubating.jar \
mongodb_sync_database \
--warehouse hdfs:///path/to/warehouse \
--database test_db \
--mongodb_conf hosts=127.0.0.1:27017 \
--mongodb_conf username=root \
--mongodb_conf password=123456 \
--mongodb_conf database=source_db \
--catalog_conf metastore=hive \
--catalog_conf uri=thrift://hive-metastore:9083 \
--table_conf bucket=4 \
--including_tables 'product|user|address|order|custom'