Apache Paimon in Practice: Mongo CDC Explained

2024/4/27 16:46:00
Mongo CDC



Only Flink CDC 2.4+ is supported.


By using MongoDBSyncTableAction in a Flink DataStream job, or directly through flink run, you can synchronize one collection from MongoDB into one Paimon table.

<FLINK_HOME>/bin/flink run \
/path/to/paimon-flink-action-0.7.0-incubating.jar \
mongodb_sync_table \
--warehouse <warehouse-path> \
--database <database-name> \
--table <table-name> \
[--partition_keys <partition_keys>] \
[--computed_column <'column-name=expr-name(args[, ...])'> [--computed_column ...]] \
[--mongodb_conf <mongodb-cdc-source-conf> [--mongodb_conf <mongodb-cdc-source-conf> ...]] \
[--catalog_conf <paimon-catalog-conf> [--catalog_conf <paimon-catalog-conf> ...]] \
[--table_conf <paimon-table-sink-conf> [--table_conf <paimon-table-sink-conf> ...]]
Configuration      Description
--warehouse        The path to the Paimon warehouse.
--database         The database name in the Paimon catalog.
--table            The Paimon table name.
--partition_keys   The partition keys for the Paimon table. Separate multiple partition keys with commas, for example "dt,hh,mm".
--computed_column  The definitions of computed columns. The argument field refers to a MongoDB collection field name. See the Paimon computed-column documentation for the complete list of supported expressions.
--mongodb_conf     The configuration for the Flink CDC MongoDB source. Each configuration should be specified in the format "key=value". hosts, username, password, database and collection are required; the others are optional. See the Flink CDC MongoDB connector documentation for the complete list of configurations.
--catalog_conf     The configuration for the Paimon catalog. Each configuration should be specified in the format "key=value". See the Paimon catalog documentation for the complete list of catalog configurations.
--table_conf       The configuration for the Paimon table sink. Each configuration should be specified in the format "key=value". See the Paimon table configuration documentation for the complete list of table configurations.
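Because every repeated flag takes one "key=value" pair, it can help to see how a set of configurations expands into the argument list. The sketch below is a hypothetical Python helper (build_sync_table_args is not part of Paimon); it only assembles the arguments documented above and checks the mongodb_conf keys the option list marks as required. The host value is a made-up placeholder, and it assumes the flink binary is on PATH.

```python
import shlex
from typing import Dict, List, Optional

# Keys the option list above marks as required for --mongodb_conf.
REQUIRED_MONGODB_KEYS = ("hosts", "username", "password", "database", "collection")


def build_sync_table_args(
    jar: str,
    warehouse: str,
    database: str,
    table: str,
    partition_keys: Optional[List[str]] = None,
    mongodb_conf: Optional[Dict[str, str]] = None,
    catalog_conf: Optional[Dict[str, str]] = None,
    table_conf: Optional[Dict[str, str]] = None,
) -> List[str]:
    """Hypothetical helper: build the arguments that follow `flink run`."""
    mongodb_conf = mongodb_conf or {}
    missing = [k for k in REQUIRED_MONGODB_KEYS if k not in mongodb_conf]
    if missing:
        raise ValueError(f"missing required mongodb_conf keys: {missing}")

    args = [jar, "mongodb_sync_table",
            "--warehouse", warehouse,
            "--database", database,
            "--table", table]
    if partition_keys:
        # Multiple partition keys are joined with commas, e.g. "dt,hh,mm".
        args += ["--partition_keys", ",".join(partition_keys)]
    # Each configuration entry becomes its own repeated flag in "key=value" form.
    for flag, conf in (("--mongodb_conf", mongodb_conf),
                       ("--catalog_conf", catalog_conf or {}),
                       ("--table_conf", table_conf or {})):
        for key, value in conf.items():
            args += [flag, f"{key}={value}"]
    return args


if __name__ == "__main__":
    argv = build_sync_table_args(
        "/path/to/paimon-flink-action-0.7.0-incubating.jar",
        "hdfs:///path/to/warehouse", "test_db", "test_table",
        partition_keys=["pt"],
        mongodb_conf={"hosts": "mongo-host:27017",  # hypothetical host
                      "username": "root", "password": "123456",
                      "database": "source_db", "collection": "source_table1"},
        table_conf={"bucket": "4"},
    )
    # Print a shell-safe command line, assuming `flink` is on PATH.
    print(shlex.join(["flink", "run", *argv]))
```

Keeping the configurations in dictionaries makes it hard to forget the "key=value" format or to drop one of the repeated flags.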


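  1. mongodb_conf introduces the schema.start.mode parameter for the MongoDB CDC source, which offers two modes: dynamic (the default) and specified. In dynamic mode, the top-level fields of the MongoDB documents are parsed and kept consistent with the Paimon table, which forms the basis for schema evolution. In specified mode, synchronization follows user-defined rules: field.name lists the fields to synchronize and parser.path gives the JSONPath used to extract each of them, which also allows values to be taken from nested fields.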




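  2. mongodb_conf also introduces the default.id.generation parameter as an enhancement to the MongoDB CDC source configuration.


    When default.id.generation is set to true, the MongoDB CDC source follows the default _id generation strategy, which strips the outer $oid nesting to provide a more direct identifier. This mode simplifies the _id representation, making it more direct and user-friendly.

    When default.id.generation is set to false, the MongoDB CDC source keeps the original _id structure without any additional processing. This mode gives users the flexibility to work with the raw _id format provided by MongoDB, preserving any nested elements such as $oid.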
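The difference between the two settings is easiest to see on a concrete _id. The sketch below assumes the change-stream document carries an ObjectId in MongoDB Extended JSON form, {"$oid": "<24 hex chars>"}; the function render_id and the hex value are made up for illustration and are not Paimon's implementation.

```python
import json
from typing import Any, Dict


def render_id(doc: Dict[str, Any], default_id_generation: bool = True) -> str:
    """Illustrative sketch of the two default.id.generation behaviours for _id."""
    raw_id = doc["_id"]
    is_oid_wrapper = isinstance(raw_id, dict) and set(raw_id) == {"$oid"}
    if default_id_generation and is_oid_wrapper:
        # true: strip the outer {"$oid": ...} nesting, keep the bare hex string.
        return raw_id["$oid"]
    if isinstance(raw_id, str):
        return raw_id
    # false (or a non-ObjectId _id): keep the original structure as JSON text.
    return json.dumps(raw_id)


if __name__ == "__main__":
    doc = {"_id": {"$oid": "6630b1f2c3a4e5d6f7a8b9c0"}, "name": "scooter"}
    print(render_id(doc, default_id_generation=True))   # 6630b1f2c3a4e5d6f7a8b9c0
    print(render_id(doc, default_id_generation=False))  # {"$oid": "6630b1f2c3a4e5d6f7a8b9c0"}
```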


JSONPath syntax supported by parser.path:
Operator                  Description
$                         The root element to query. This starts all path expressions.
@                         The current node being processed by a filter predicate.
*                         Wildcard. Available anywhere a name or numeric are required.
..                        Deep scan. Available anywhere a name is required.
.{name}                   Dot-notated child.
['{name}' (, '{name}')]   Bracket-notated child or children.
[{number} (, {number})]   Array index or indexes.
[start:end]               Array slice operator.
[?({expression})]         Filter expression. Expression must evaluate to a boolean value.


Function    Description                                                                          Output type
min()       Provides the min value of an array of numbers.                                      Double
max()       Provides the max value of an array of numbers.                                      Double
avg()       Provides the average value of an array of numbers.                                  Double
stddev()    Provides the standard deviation value of an array of numbers.                       Double
length()    Provides the length of an array.                                                    Integer
sum()       Provides the sum value of an array of numbers.                                      Double
keys()      Provides the property keys (an alternative for terminal tilde ~).                   Set
concat(X)   Provides a concatenated version of the path output with a new item.                 like input
append(X)   Adds an item to the JSON path output array.                                         like input
first()     Provides the first item of an array.                                                Depends on the array
last()      Provides the last item of an array.                                                 Depends on the array
index(X)    Provides the item of an array at index X; if X is negative, it counts backwards.   Depends on the array
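To make the function semantics concrete, here is a minimal Python mirror of a few entries in the table: numeric aggregations return floats (Double), length returns an int, and index(X) counts backwards for negative X. These are stand-ins written for illustration, not the JSONPath library's code; stddev, keys, concat and append are deliberately left out.

```python
from typing import Any, Callable, Dict, Sequence

# Python stand-ins for some JSONPath functions in the table above.
# They mirror the documented behaviour only; they are not the library code.
FUNCTIONS: Dict[str, Callable[..., Any]] = {
    "min": lambda xs: float(min(xs)),            # Double
    "max": lambda xs: float(max(xs)),            # Double
    "avg": lambda xs: float(sum(xs)) / len(xs),  # Double
    "sum": lambda xs: float(sum(xs)),            # Double
    "length": lambda xs: len(xs),                # Integer
    "first": lambda xs: xs[0],                   # depends on the array
    "last": lambda xs: xs[-1],                   # depends on the array
    # index(X): a negative X counts backwards from the end of the array.
    "index": lambda xs, x: xs[x],
}


def call(name: str, values: Sequence[Any], *args: Any) -> Any:
    """Apply the named function to an array that a path expression produced."""
    return FUNCTIONS[name](values, *args)


if __name__ == "__main__":
    # Sample book prices, taken from the store example document.
    prices = [8.95, 12.99, 8.99, 22.99]
    print(call("length", prices))     # 4
    print(call("index", prices, -1))  # 22.99
    print(call("max", prices))        # 22.99
```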


Path examples, based on the following document:
{
    "store": {
        "book": [
            {"category": "reference", "author": "Nigel Rees", "title": "Sayings of the Century", "price": 8.95},
            {"category": "fiction", "author": "Evelyn Waugh", "title": "Sword of Honour", "price": 12.99},
            {"category": "fiction", "author": "Herman Melville", "title": "Moby Dick", "isbn": "0-553-21311-3", "price": 8.99},
            {"category": "fiction", "author": "J. R. R. Tolkien", "title": "The Lord of the Rings", "isbn": "0-395-19395-8", "price": 22.99}
        ],
        "bicycle": {"color": "red", "price": 19.95}
    },
    "expensive": 10
}
JsonPath                                   Result
$.store.book[*].author                     The authors of all books.
$..author                                  All authors.
$.store.*                                  All things, both books and bicycles.
$.store..price                             The price of everything.
$..book[2]                                 The third book.
$..book[-2]                                The second to last book.
$..book[0,1]                               The first two books.
$..book[:2]                                All books from index 0 (inclusive) until index 2 (exclusive).
$..book[1:2]                               All books from index 1 (inclusive) until index 2 (exclusive).
$..book[-2:]                               The last two books.
$..book[2:]                                All books from index 2 (inclusive) to the last.
$..book[?(@.isbn)]                         All books with an ISBN number.
$.store.book[?(@.price < 10)]              All books in the store cheaper than 10.
$..book[?(@.price <= $['expensive'])]      All books in the store that are not "expensive".
$..book[?(@.author =~ /.*REES/i)]          All books matching the regex (case-insensitive).
$..*                                       Everything.
$..book.length()                           The number of books.
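The operator table above appears to follow the Jayway JsonPath syntax. To show how a path walks a document step by step, here is a deliberately small Python evaluator covering only .name, .*, [n] (including negative n), [*] and ['name']; deep scan, slices, filters and functions are not supported, and the function evaluate is a hypothetical illustration rather than the library Paimon uses. STORE is a trimmed copy of the example document.

```python
import re
from typing import Any, List

# Supported steps: .name  .*  [n]  [-n]  [*]  ['name']
_STEP = re.compile(r"\.(\*|[A-Za-z_$][\w$]*)|\[(\*|-?\d+)\]|\['([^']*)'\]")


def evaluate(doc: Any, path: str) -> List[Any]:
    """Evaluate a small JSONPath subset and return every match as a list."""
    if not path.startswith("$"):
        raise ValueError("a path must start with $")
    nodes: List[Any] = [doc]
    pos = 1
    while pos < len(path):
        step = _STEP.match(path, pos)
        if step is None:
            raise ValueError(f"unsupported syntax at {path[pos:]!r}")
        name, index, quoted = step.groups()
        matches: List[Any] = []
        for node in nodes:
            if name == "*" or index == "*":
                # Wildcard: every value of an object or every item of an array.
                if isinstance(node, dict):
                    matches.extend(node.values())
                elif isinstance(node, list):
                    matches.extend(node)
            elif index is not None:
                # Array index; negative values count from the end.
                i = int(index)
                if isinstance(node, list) and -len(node) <= i < len(node):
                    matches.append(node[i])
            else:
                # Child by name, from dot or bracket notation.
                key = name if quoted is None else quoted
                if isinstance(node, dict) and key in node:
                    matches.append(node[key])
        nodes = matches
        pos = step.end()
    return nodes


# Trimmed copy of the store example document.
STORE = {
    "store": {
        "book": [
            {"author": "Nigel Rees", "price": 8.95},
            {"author": "Evelyn Waugh", "price": 12.99},
            {"author": "Herman Melville", "price": 8.99},
            {"author": "J. R. R. Tolkien", "price": 22.99},
        ],
        "bicycle": {"color": "red", "price": 19.95},
    },
    "expensive": 10,
}

if __name__ == "__main__":
    print(evaluate(STORE, "$.store.book[*].author"))
    print(evaluate(STORE, "$.store.book[-2].author"))  # ['Herman Melville']
    print(evaluate(STORE, "$['expensive']"))           # [10]
```

Returning a list for every path keeps single matches and wildcard matches uniform, which is also how the "All authors" style examples above read.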


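MongoDB Change Streams are designed to return simple JSON documents without any data type definitions. This is because MongoDB is a document-oriented database, and one of its core features is a dynamic schema: documents can contain different fields, and the data types of fields are flexible. The absence of type definitions in Change Streams preserves this flexibility and extensibility. For this reason, when synchronizing MongoDB to Paimon, the data type of every field is set to string, which works around the fact that the data types cannot be obtained.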
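A minimal sketch of what "every field is a string" means for one document, assuming (for illustration only) that nulls stay null and that non-string values, including nested documents and arrays, are rendered as JSON text; the exact serialization Paimon applies may differ. The functions to_string_row and infer_schema are hypothetical names.

```python
import json
from typing import Any, Dict, Optional


def to_string_row(doc: Dict[str, Any]) -> Dict[str, Optional[str]]:
    """Map one change-stream document to a row whose columns are all strings."""
    row: Dict[str, Optional[str]] = {}
    for field, value in doc.items():
        if value is None or isinstance(value, str):
            row[field] = value              # nulls and strings pass through
        else:
            row[field] = json.dumps(value)  # numbers, booleans, nested docs, arrays
    return row


def infer_schema(doc: Dict[str, Any]) -> Dict[str, str]:
    """Every top-level field gets the STRING type, whatever its runtime value."""
    return {field: "STRING" for field in doc}


if __name__ == "__main__":
    doc = {"_id": "1", "age": 30, "tags": ["a", "b"], "vip": True}
    print(to_string_row(doc))  # {'_id': '1', 'age': '30', 'tags': '["a", "b"]', 'vip': 'true'}
    print(infer_schema(doc))   # {'_id': 'STRING', 'age': 'STRING', 'tags': 'STRING', 'vip': 'STRING'}
```

The trade-off is that type information must be restored downstream (for example with casts or computed columns), in exchange for never failing on a field whose type changes between documents.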



Example 1: synchronize one collection into a Paimon table, adding the computed column _year:
<FLINK_HOME>/bin/flink run \
/path/to/paimon-flink-action-0.7.0-incubating.jar \
mongodb_sync_table \
--warehouse hdfs:///path/to/warehouse \
--database test_db \
--table test_table \
--partition_keys pt \
--computed_column '_year=year(age)' \
--mongodb_conf hosts= \
--mongodb_conf username=root \
--mongodb_conf password=123456 \
--mongodb_conf database=source_db \
--mongodb_conf collection=source_table1 \
--catalog_conf metastore=hive \
--catalog_conf uri=thrift://hive-metastore:9083 \
--table_conf bucket=4 \
--table_conf changelog-producer=input \
--table_conf sink.parallelism=4
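The --computed_column value follows the pattern 'column-name=expr-name(args[, ...])'. The sketch below parses that pattern and applies a year() stand-in to a string field value. Everything here is hypothetical: parse_computed_column, compute and the EVALUATORS table are illustration names, only year() is sketched, the date value is made up, and splitting arguments on commas assumes no nested calls.

```python
import re
from datetime import datetime
from typing import Callable, Dict, List, NamedTuple, Optional


class ComputedColumn(NamedTuple):
    name: str        # target column in the Paimon table
    function: str    # expression name, e.g. "year"
    args: List[str]  # argument fields (MongoDB field names) and literals


# column-name=expr-name(args[, ...])
_SPEC = re.compile(r"^\s*([^=\s]+)\s*=\s*([A-Za-z_]\w*)\((.*)\)\s*$")


def parse_computed_column(spec: str) -> ComputedColumn:
    match = _SPEC.match(spec)
    if match is None:
        raise ValueError(f"not a computed column definition: {spec!r}")
    name, function, raw_args = match.groups()
    args = [a.strip() for a in raw_args.split(",") if a.strip()]
    return ComputedColumn(name, function, args)


# Hypothetical evaluators; only a year() stand-in is sketched here.
EVALUATORS: Dict[str, Callable[..., int]] = {
    "year": lambda value: datetime.fromisoformat(value).year,
}


def compute(column: ComputedColumn, row: Dict[str, Optional[str]]) -> int:
    func = EVALUATORS[column.function]
    # The first argument names the source field; synced values arrive as strings.
    return func(row[column.args[0]])


if __name__ == "__main__":
    col = parse_computed_column("_year=year(age)")
    print(col)                                  # ComputedColumn(name='_year', function='year', args=['age'])
    print(compute(col, {"age": "2024-04-27"}))  # 2024
```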


Example 2: synchronize one collection in specified mode, mapping each column in field.name to the JSONPath at the same position in parser.path:
<FLINK_HOME>/bin/flink run \
/path/to/paimon-flink-action-0.7.0-incubating.jar \
mongodb_sync_table \
--warehouse hdfs:///path/to/warehouse \
--database test_db \
--table test_table \
--partition_keys pt \
--mongodb_conf hosts= \
--mongodb_conf username=root \
--mongodb_conf password=123456 \
--mongodb_conf database=source_db \
--mongodb_conf collection=source_table1 \
--mongodb_conf schema.start.mode=specified \
--mongodb_conf field.name=_id,name,description \
--mongodb_conf parser.path=$._id,$.name,$.description \
--catalog_conf metastore=hive \
--catalog_conf uri=thrift://hive-metastore:9083 \
--table_conf bucket=4 \
--table_conf changelog-producer=input \
--table_conf sink.parallelism=4
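In specified mode, field.name and parser.path are two comma-separated lists that pair up by position. The sketch below shows that pairing and a simplified extraction that only handles plain "$.a.b" paths; the helpers parse_specified_mapping and extract_row are hypothetical, the sample document is invented, and splitting on commas assumes no path itself contains a comma (as [0,1] would).

```python
import json
from typing import Any, Dict, Optional


def parse_specified_mapping(field_names: str, parser_paths: str) -> Dict[str, str]:
    """Pair field.name entries with parser.path entries by position."""
    names = [n.strip() for n in field_names.split(",")]
    paths = [p.strip() for p in parser_paths.split(",")]
    if len(names) != len(paths):
        raise ValueError(f"{len(names)} field names but {len(paths)} parser paths")
    return dict(zip(names, paths))


def extract_row(doc: Dict[str, Any], mapping: Dict[str, str]) -> Dict[str, Optional[str]]:
    """Resolve simple "$.a.b" paths only; values are rendered as strings."""
    row: Dict[str, Optional[str]] = {}
    for column, path in mapping.items():
        if not path.startswith("$."):
            raise ValueError(f"only dotted paths are handled here: {path!r}")
        node: Any = doc
        for part in path[2:].split("."):
            node = node.get(part) if isinstance(node, dict) else None
        if node is None or isinstance(node, str):
            row[column] = node
        else:
            row[column] = json.dumps(node)
    return row


if __name__ == "__main__":
    mapping = parse_specified_mapping("_id,name,description", "$._id,$.name,$.description")
    doc = {"_id": "p1", "name": "scooter", "description": "Small 2-wheel scooter", "weight": 3.14}
    # Fields outside field.name (here "weight") are not synchronized.
    print(extract_row(doc, mapping))
```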


By using MongoDBSyncDatabaseAction in a Flink DataStream job, or directly through flink run, you can synchronize an entire MongoDB database into one Paimon database.

<FLINK_HOME>/bin/flink run \
/path/to/paimon-flink-action-0.7.0-incubating.jar \
mongodb_sync_database \
--warehouse <warehouse-path> \
--database <database-name> \
[--table_prefix <paimon-table-prefix>] \
[--table_suffix <paimon-table-suffix>] \
[--including_tables <mongodb-table-name|name-regular-expr>] \
[--excluding_tables <mongodb-table-name|name-regular-expr>] \
[--mongodb_conf <mongodb-cdc-source-conf> [--mongodb_conf <mongodb-cdc-source-conf> ...]] \
[--catalog_conf <paimon-catalog-conf> [--catalog_conf <paimon-catalog-conf> ...]] \
[--table_conf <paimon-table-sink-conf> [--table_conf <paimon-table-sink-conf> ...]]
Configuration       Description
--warehouse         The path to the Paimon warehouse.
--database          The database name in the Paimon catalog.
--table_prefix      The prefix for all synchronized Paimon tables. For example, to give every synchronized table the prefix "ods_", specify --table_prefix ods_.
--table_suffix      The suffix for all synchronized Paimon tables. Usage is the same as --table_prefix.
--including_tables  Specifies which source tables to synchronize. Separate multiple tables with '|'. Because '|' is a special shell character, the value must be quoted, for example: 'a|b|c'. Regular expressions are supported; for example, --including_tables 'test|paimon.*' synchronizes table test and all tables whose names start with paimon.
--excluding_tables  Specifies which source tables are not synchronized. Usage is the same as --including_tables. If both are specified, --excluding_tables takes precedence over --including_tables.
--mongodb_conf      The configuration for the Flink CDC MongoDB source. Each configuration should be specified in the format "key=value". hosts, username, password and database are required; the others are optional. See the Flink CDC MongoDB connector documentation for the complete list of configurations.
--catalog_conf      The configuration for the Paimon catalog. Each configuration should be specified in the format "key=value". See the Paimon catalog documentation for the complete list of catalog configurations.
--table_conf        The configuration for the Paimon table sink. Each configuration should be specified in the format "key=value". See the Paimon table configuration documentation for the complete list of table configurations.
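The table-selection rules above (include pattern, exclude pattern taking precedence, then prefix and suffix) can be sketched as a small filter. This is a hypothetical Python illustration, not Paimon's code; in particular it assumes a pattern must match the whole collection name (re.fullmatch), which is an assumption to check against the real implementation.

```python
import re
from typing import Iterable, List, Optional, Tuple


def select_tables(
    collections: Iterable[str],
    including: str = ".*",
    excluding: Optional[str] = None,
    prefix: str = "",
    suffix: str = "",
) -> List[Tuple[str, str]]:
    """Return (source collection, target Paimon table) pairs to synchronize."""
    include = re.compile(including)
    exclude = re.compile(excluding) if excluding else None
    selected = []
    for name in collections:
        # Assumption: a pattern must match the whole collection name.
        if not include.fullmatch(name):
            continue
        # --excluding_tables wins when a name matches both patterns.
        if exclude is not None and exclude.fullmatch(name):
            continue
        selected.append((name, f"{prefix}{name}{suffix}"))
    return selected


if __name__ == "__main__":
    names = ["test", "paimon_a", "paimon", "testing", "other"]
    print(select_tables(names, including="test|paimon.*", excluding="paimon_a", prefix="ods_"))
    # [('test', 'ods_test'), ('paimon', 'ods_paimon')]
```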





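Example 1: synchronize all collections of source_db into the Paimon database test_db:
<FLINK_HOME>/bin/flink run \
/path/to/paimon-flink-action-0.7.0-incubating.jar \
mongodb_sync_database \
--warehouse hdfs:///path/to/warehouse \
--database test_db \
--mongodb_conf hosts= \
--mongodb_conf username=root \
--mongodb_conf password=123456 \
--mongodb_conf database=source_db \
--catalog_conf metastore=hive \
--catalog_conf uri=thrift://hive-metastore:9083 \
--table_conf bucket=4 \
--table_conf changelog-producer=input \
--table_conf sink.parallelism=4

Example 2: to pick up newly added collections, stop the running job with a savepoint, then restart it from that savepoint with --fromSavepoint and list the collections to synchronize in --including_tables: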
<FLINK_HOME>/bin/flink run \
--fromSavepoint savepointPath \
/path/to/paimon-flink-action-0.7.0-incubating.jar \
mongodb_sync_database \
--warehouse hdfs:///path/to/warehouse \
--database test_db \
--mongodb_conf hosts= \
--mongodb_conf username=root \
--mongodb_conf password=123456 \
--mongodb_conf database=source_db \
--catalog_conf metastore=hive \
--catalog_conf uri=thrift://hive-metastore:9083 \
--table_conf bucket=4 \
--including_tables 'product|user|address|order|custom'






