Using Apache Paimon: MongoDB CDC Explained

MongoDB CDC

a. Preparing dependencies

flink-sql-connector-mongodb-cdc-*.jar

Only Flink CDC 2.4+ is supported.
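
The connector jar only needs to be visible on the Flink classpath. A common way to do that (assuming a standard Flink layout; the version number below is just an example) is:

cp flink-sql-connector-mongodb-cdc-2.4.2.jar <FLINK_HOME>/lib/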

b. Synchronizing a table

A collection in MongoDB can be synchronized into one Paimon table by using MongoDBSyncTableAction in a Flink DataStream job, or directly through flink run.

<FLINK_HOME>/bin/flink run \
/path/to/paimon-flink-action-0.7.0-incubating.jar \
mongodb_sync_table \
--warehouse <warehouse-path> \
--database <database-name> \
--table <table-name> \
[--partition_keys <partition_keys>] \
[--computed_column <'column-name=expr-name(args[, ...])'> [--computed_column ...]] \
[--mongodb_conf <mongodb-cdc-source-conf> [--mongodb_conf <mongodb-cdc-source-conf> ...]] \
[--catalog_conf <paimon-catalog-conf> [--catalog_conf <paimon-catalog-conf> ...]] \
[--table_conf <paimon-table-sink-conf> [--table_conf <paimon-table-sink-conf> ...]]
Configuration | Description
--warehouse | The path to Paimon warehouse.
--database | The database name in Paimon catalog.
--table | The Paimon table name.
--partition_keys | The partition keys for the Paimon table. If there are multiple partition keys, connect them with a comma, for example "dt,hh,mm".
--computed_column | The definitions of computed columns. The argument field is from the MongoDB collection field name. See here for a complete list of configurations.
--mongodb_conf | The configuration for Flink CDC MongoDB sources. Each configuration should be specified in the format "key=value". hosts, username, password, database and collection are required configurations, others are optional. See its document for a complete list of configurations.
--catalog_conf | The configuration for Paimon catalog. Each configuration should be specified in the format "key=value". See here for a complete list of catalog configurations.
--table_conf | The configuration for Paimon table sink. Each configuration should be specified in the format "key=value". See here for a complete list of table configurations.

Notes

  1. mongodb_conf introduces the schema.start.mode parameter into the MongoDB CDC source, offering two modes: dynamic (the default) and specified.

    In dynamic mode, MongoDB schema information is parsed dynamically, but only top-level fields are parsed.

    In specified mode, synchronization is driven by explicitly configured fields: field.name specifies the fields to synchronize, and parser.path specifies the JSON parsing path for each of those fields.

    The difference between the two: specified mode requires you to identify exactly which fields to use and builds the mapping table from them, while dynamic mode keeps the top-level fields of Paimon and MongoDB consistent automatically, with no need to single out particular fields. When nested fields are involved, however, the resulting table still needs further processing. (See the flag sketch after these notes.)

  2. mongodb_conf introduces the default.id.generation parameter as an enhancement to the MongoDB CDC source configuration.

    default.id.generation offers two behaviors, depending on whether it is set to true or false.

    When default.id.generation is set to true, the MongoDB CDC source follows the default _id generation strategy, which strips the outer $oid nesting to provide a more direct identifier. This mode simplifies the _id representation, making it more straightforward and user-friendly.

    When default.id.generation is set to false, the MongoDB CDC source preserves the original _id structure without any additional processing. This mode gives users the flexibility to work with the raw _id format that MongoDB provides, retaining any nested elements such as $oid.

    The choice between the two comes down to preference: the former yields a cleaner, simplified _id, the latter a direct representation of MongoDB's _id structure. (Both shapes are sketched after these notes.)
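
Two quick sketches of the options above; the ObjectId value is made up. First, the specified-mode flags from note 1, as they would appear inside a full flink run command (Example 2 below shows them in context):

--mongodb_conf schema.start.mode=specified \
--mongodb_conf field.name=_id,name,description \
--mongodb_conf parser.path=$._id,$.name,$.description \

Second, the two _id shapes from note 2. With default.id.generation=false the raw extended-JSON form is preserved:

{"_id": {"$oid": "661c7f2e8b3e4a0001a1b2c3"}}

With default.id.generation=true the outer $oid nesting is stripped:

{"_id": "661c7f2e8b3e4a0001a1b2c3"}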

Operator | Description
$ | The root element to query. This starts all path expressions.
@ | The current node being processed by a filter predicate.
* | Wildcard. Available anywhere a name or numeric value is required.
.. | Deep scan. Available anywhere a name is required.
.<name> | Dot-notated child.
['<name>' (, '<name>')] | Bracket-notated child or children.
[<number> (, <number>)] | Array index or indexes.
[start:end] | Array slice operator.
[?(<expression>)] | Filter expression. The expression must evaluate to a boolean value.

Functions can be invoked at the tail end of a path. The input to a function is the output of the path expression, and the function output is dictated by the function itself.

Function | Description | Output type
min() | Provides the min value of an array of numbers. | Double
max() | Provides the max value of an array of numbers. | Double
avg() | Provides the average value of an array of numbers. | Double
stddev() | Provides the standard deviation value of an array of numbers. | Double
length() | Provides the length of an array. | Integer
sum() | Provides the sum value of an array of numbers. | Double
keys() | Provides the property keys (an alternative for the terminal tilde ~). | Set
concat(X) | Provides a concatenated version of the path output with a new item. | Like input
append(X) | Adds an item to the JSON path output array. | Like input
first() | Provides the first item of an array. | Depends on the array
last() | Provides the last item of an array. | Depends on the array
index(X) | Provides the item of an array at index X; if X is negative, it counts from the end. | Depends on the array

Path examples

{"store": {"book": [{"category": "reference","author": "Nigel Rees","title": "Sayings of the Century","price": 8.95},{"category": "fiction","author": "Evelyn Waugh","title": "Sword of Honour","price": 12.99},{"category": "fiction","author": "Herman Melville","title": "Moby Dick","isbn": "0-553-21311-3","price": 8.99},{"category": "fiction","author": "J. R. R. Tolkien","title": "The Lord of the Rings","isbn": "0-395-19395-8","price": 22.99}],"bicycle": {"color": "red","price": 19.95}},"expensive": 10
}
JsonPath | Result
$.store.book[*].author | The authors of all books.
$..author | All authors.
$.store.* | All things, both books and bicycles.
$.store..price | The price of everything.
$..book[2] | The third book.
$..book[-2] | The second to last book.
$..book[0,1] | The first two books.
$..book[:2] | All books from index 0 (inclusive) until index 2 (exclusive).
$..book[1:2] | All books from index 1 (inclusive) until index 2 (exclusive).
$..book[-2:] | The last two books.
$..book[2:] | All books from index 2 (inclusive) to the last.
$..book[?(@.isbn)] | All books with an ISBN number.
$.store.book[?(@.price < 10)] | All books in the store cheaper than 10.
$..book[?(@.price <= $['expensive'])] | All books in the store that are not "expensive".
$..book[?(@.author =~ /.*REES/i)] | All books matching the regex (ignore case).
$..* | Everything in the document.
$..book.length() | The number of books.
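
As a quick sanity check of the tail-end functions against the sample document above (the four book prices are 8.95, 12.99, 8.99 and 22.99; exact function support may vary by JsonPath implementation):

$.store.book[*].price.min()  -> 8.95
$.store.book[*].price.max()  -> 22.99
$.store.book[*].price.avg()  -> 13.48
$.store.book.length()        -> 4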

The synchronized table must have _id set as its primary key. Because MongoDB's change events are recorded before updates in messages, they can only be converted into Flink's UPSERT changelog stream. An upsert stream demands a unique key, which is why _id must be declared as the primary key. Declaring other columns as primary keys is not feasible, because delete operations include only the _id and the shard key, not the other fields and values.
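
For intuition, a change-stream delete event looks roughly like the following (all values are made up). Only the documentKey survives the delete, which is why _id is the only viable primary key:

{
  "operationType": "delete",
  "ns": {"db": "source_db", "coll": "source_table1"},
  "documentKey": {"_id": {"$oid": "661c7f2e8b3e4a0001a1b2c3"}}
}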

MongoDB Change Streams are designed to return simple JSON documents without any data type definitions. MongoDB is a document-oriented database, and one of its core features is the dynamic schema: documents may contain different fields, and a field's data type can vary. The absence of data type definitions in Change Streams preserves this flexibility and extensibility. For this reason, when synchronizing MongoDB to Paimon, every field's data type is set to string, which sidesteps the problem of data types being unobtainable.

If the specified Paimon table does not exist, this action automatically creates it, deriving its schema from the MongoDB collection.
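
A hypothetical illustration of that derivation: given a source document such as

{"_id": {"$oid": "661c7f2e8b3e4a0001a1b2c3"}, "name": "scooter", "weight": 3.14}

the auto-created Paimon table would take the following shape, with every field typed as STRING per the rule above:

_id     STRING NOT NULL   -- primary key
name    STRING
weight  STRING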

Example 1: synchronize one collection into one Paimon table

<FLINK_HOME>/bin/flink run \
/path/to/paimon-flink-action-0.7.0-incubating.jar \
mongodb_sync_table \
--warehouse hdfs:///path/to/warehouse \
--database test_db \
--table test_table \
--partition_keys pt \
--computed_column '_year=year(age)' \
--mongodb_conf hosts=127.0.0.1:27017 \
--mongodb_conf username=root \
--mongodb_conf password=123456 \
--mongodb_conf database=source_db \
--mongodb_conf collection=source_table1 \
--catalog_conf metastore=hive \
--catalog_conf uri=thrift://hive-metastore:9083 \
--table_conf bucket=4 \
--table_conf changelog-producer=input \
--table_conf sink.parallelism=4

Example 2: synchronize a collection into a Paimon table according to a specified field mapping.

<FLINK_HOME>/bin/flink run \
/path/to/paimon-flink-action-0.7.0-incubating.jar \
mongodb_sync_table \
--warehouse hdfs:///path/to/warehouse \
--database test_db \
--table test_table \
--partition_keys pt \
--mongodb_conf hosts=127.0.0.1:27017 \
--mongodb_conf username=root \
--mongodb_conf password=123456 \
--mongodb_conf database=source_db \
--mongodb_conf collection=source_table1 \
--mongodb_conf schema.start.mode=specified \
--mongodb_conf field.name=_id,name,description \
--mongodb_conf parser.path=$._id,$.name,$.description \
--catalog_conf metastore=hive \
--catalog_conf uri=thrift://hive-metastore:9083 \
--table_conf bucket=4 \
--table_conf changelog-producer=input \
--table_conf sink.parallelism=4

c. Synchronizing a database

An entire MongoDB database can be synchronized into one Paimon database by using MongoDBSyncDatabaseAction in a Flink DataStream job, or directly through flink run.

<FLINK_HOME>/bin/flink run \
/path/to/paimon-flink-action-0.7.0-incubating.jar \
mongodb_sync_database \
--warehouse <warehouse-path> \
--database <database-name> \
[--table_prefix <paimon-table-prefix>] \
[--table_suffix <paimon-table-suffix>] \
[--including_tables <mongodb-table-name|name-regular-expr>] \
[--excluding_tables <mongodb-table-name|name-regular-expr>] \
[--mongodb_conf <mongodb-cdc-source-conf> [--mongodb_conf <mongodb-cdc-source-conf> ...]] \
[--catalog_conf <paimon-catalog-conf> [--catalog_conf <paimon-catalog-conf> ...]] \
[--table_conf <paimon-table-sink-conf> [--table_conf <paimon-table-sink-conf> ...]]
Configuration | Description
--warehouse | The path to Paimon warehouse.
--database | The database name in Paimon catalog.
--table_prefix | The prefix of all Paimon tables to be synchronized. For example, if you want all synchronized tables to have "ods_" as prefix, you can specify "--table_prefix ods_".
--table_suffix | The suffix of all Paimon tables to be synchronized. The usage is the same as "--table_prefix".
--including_tables | It is used to specify which source tables are to be synchronized. You must use '|' to separate multiple tables. Because '|' is a special character, the pattern must be quoted, for example: 'a|b|c'. Regular expressions are supported; for example, specifying "--including_tables test|paimon.*" means to synchronize the table 'test' and all tables starting with 'paimon'.
--excluding_tables | It is used to specify which source tables are not to be synchronized. The usage is the same as "--including_tables". "--excluding_tables" has higher priority than "--including_tables" if both are specified.
--mongodb_conf | The configuration for Flink CDC MongoDB sources. Each configuration should be specified in the format "key=value". hosts, username, password and database are required configurations, others are optional. See its document for a complete list of configurations.
--catalog_conf | The configuration for Paimon catalog. Each configuration should be specified in the format "key=value". See here for a complete list of catalog configurations.
--table_conf | The configuration for Paimon table sink. Each configuration should be specified in the format "key=value". See here for a complete list of table configurations.

All collections to be synchronized must have _id set as the primary key.

For each MongoDB collection to be synchronized, if the corresponding Paimon table does not exist, it is created automatically, and its schema is derived from all of the specified MongoDB collections.

If a Paimon table already exists, its schema is compared against the schemas of all specified MongoDB collections. Any MongoDB collection created after the job starts is included automatically.

Example 1: synchronize the entire database

<FLINK_HOME>/bin/flink run \
/path/to/paimon-flink-action-0.7.0-incubating.jar \
mongodb_sync_database \
--warehouse hdfs:///path/to/warehouse \
--database test_db \
--mongodb_conf hosts=127.0.0.1:27017 \
--mongodb_conf username=root \
--mongodb_conf password=123456 \
--mongodb_conf database=source_db \
--catalog_conf metastore=hive \
--catalog_conf uri=thrift://hive-metastore:9083 \
--table_conf bucket=4 \
--table_conf changelog-producer=input \
--table_conf sink.parallelism=4

Example 2: synchronize the specified tables

<FLINK_HOME>/bin/flink run \
--fromSavepoint savepointPath \
/path/to/paimon-flink-action-0.7.0-incubating.jar \
mongodb_sync_database \
--warehouse hdfs:///path/to/warehouse \
--database test_db \
--mongodb_conf hosts=127.0.0.1:27017 \
--mongodb_conf username=root \
--mongodb_conf password=123456 \
--mongodb_conf database=source_db \
--catalog_conf metastore=hive \
--catalog_conf uri=thrift://hive-metastore:9083 \
--table_conf bucket=4 \
--including_tables 'product|user|address|order|custom'
