Hazelcast jet Change data capture(Hazelcast喷气变化数据捕获)
问题描述
我正在我的应用程序中使用Hazelcast变更数据捕获(CDC)。
(我之所以使用CDC,是因为如果使用JDBC或其他替代功能将数据加载到缓存中,则需要花费大量时间)
因此CDC将在数据库和Hazelcast Jet之间进行数据同步。
StreamSource<ChangeRecord> source = PostgresCdcSources.postgres("source")
.setCustomProperty("plugin.name", "pgoutput").setDatabaseAddress("127.0.0.1").setDatabasePort(5432)
.setDatabaseUser("postgres").setDatabasePassword("root").setDatabaseName("postgres")
.setTableWhitelist("tblName").build();
这里我有以下步骤:-
Pipeline pipeline = Pipeline.create();
// filter records based on deleted false
StreamStage<ChangeRecord> deletedFlagRecords = pipeline.readFrom(source).withoutTimestamps()
.filter(deletedFalse);
deletedFlagRecords.filter(idBasedFetch).writeTo(Sinks.logger());
在这里,我使用StreamSource<ChangeRecord> source
对象作为pipeLine
的输入。
如您所知,source
对象是流类型。
但在我的例子中,管道数据处理取决于用户输入数据(一些元数据)。
如果我在数据库中进行任何更新或删除。Jet将更新所有流实例。
因为我的数据处理依赖于用户数据,所以我不想在第一步之后使用流类型。
只需要流形式的第一个StreamSource<ChangeRecord> source;
。
在下一步中,我只想对批处理流程执行此操作;
那么如何在批处理中使用source
。
pipeLine.readFrom(source)
//始终返回Stream类型。那么如何将其转换为批处理类型。
我又尝试了一种方法,如:-
从source
读取并将所有内容沉入地图。
pipeLine.readFrom(source).writeTo(Sinks.map("dbStreamedData", e -> e.key(), e -> e.value()));
再次构造管道ReadFrom from map。
pipeline.readFrom(Sources.map("dbStreamedData")).writeTo(Sinks.logger());
这只是返回空数据。
所以任何建议都会很有帮助..
推荐答案
只有当您需要持续更新数据时,使用CDC源才有意义。例如,对数据库中的每一次更新做出反应,或可能将数据加载到映射中,然后在内存中的快照上以某个时间间隔重复运行批处理作业。
在这种情况下,您可能只希望第一次更新发生在CDC源是最新的之后--在它从数据库读取所有当前状态并且只接收对数据库进行的更新之后。遗憾的是,目前(Hazelcast 5.0)无法使用Jet API判断何时发生这种情况。
您可以使用一些特定于域的信息-具有您查询的时间戳字段、映射中存在上次插入的记录或类似信息。
如果要对数据库表中的数据运行单个批处理作业,则应使用JDBC源。
(我之所以使用CDC,是因为如果使用JDBC或其他替代功能将数据加载到缓存中,则需要花费大量时间)
使用CDC有它的开销,这是我们通常不会看到的。在JDBC源代码中使用像SELECT * FROM table
这样的普通SQL查询比使用CDC源代码更快。也许你没有衡量处理整个当前状态所需的时间?如果使用JDBC加载数据比使用CDC加载数据确实需要更多时间,请向复制者提交问题。
这篇关于Hazelcast喷气变化数据捕获的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持编程学习网!
本文标题为:Hazelcast喷气变化数据捕获
基础教程推荐
- 如何使用 Eclipse 检查调试符号状态? 2022-01-01
- Java 中保存最后 N 个元素的大小受限队列 2022-01-01
- 如何在不安装整个 WTP 包的情况下将 Tomcat 8 添加到 Eclipse Kepler 2022-01-01
- 如何强制对超级方法进行多态调用? 2022-01-01
- 在螺旋中写一个字符串 2022-01-01
- Spring Boot Freemarker从2.2.0升级失败 2022-01-01
- 首次使用 Hadoop,MapReduce Job 不运行 Reduce Phase 2022-01-01
- 如何对 HashSet 进行排序? 2022-01-01
- 由于对所需库 rt.jar 的限制,对类的访问限制? 2022-01-01
- 如何使用 Stream 在集合中拆分奇数和偶数以及两者的总和 2022-01-01