Configure RocksDB in flink 1.13(在Flink 1.13中配置RocksDB)
问题描述
我读过有关Flink 1.13版本中EmbeddedRocksDBStateBackend
的内容,但有大小限制,因此我希望保留以前Flink版本1.11的当前配置,但重点是这种配置RocksDB的方式已被弃用(new RocksDBStateBackend("path", true);
)。
我已使用EmbeddedRocksDBStateBackend (new EmbeddedRocksDBStateBackend(true))
尝试使用新配置,但出现以下错误:
java.util.concurrent.ExecutionException: java.io.IOException: Size of the state is larger than the maximum permitted memory-backed state. Size=9126648 , maxSize=5242880 . Consider using a different state backend, like the File System State backend.
从Java以编程方式为Flink 1.13配置RocksDB状态后端的最佳方式是什么?
推荐答案
在Flink1.13中,我们重新组织了状态后端,因为旧的方法导致了对事物如何工作的许多误解。因此,这两个问题是分离的:
- 您的工作状态存储在哪里(状态后端)。(对于RocksDB,应将其配置为使用最快的可用本地磁盘。)
- 存储检查点的位置(检查点存储)。在大多数情况下,这应该是分布式文件系统。
RocksDBStateBackend
构造函数的方式,掩盖了在RocksDB的情况下涉及两个不同文件系统的事实。因此该配置位已移至其他位置(见下文)。
此表显示旧状态后端与新状态后端(与检查点存储结合使用)之间的关系:
传统状态后端 | 新状态后端+检查点存储 |
---|---|
MemoryStateBackend |
HashMapStateBackend + JobManagerCheckpointStorage |
FsStateBackend |
HashMapStateBackend + FileSystemCheckpointStorage |
RocksDBStateBackend |
EmbeddedRocksDBStateBackend + FileSystemCheckpointStorage |
EmbeddedRocksDBStateBackend
与FileSystemCheckpointStorage
一起使用。您当前遇到的问题是,您正在对RocksDB使用内存中检查点存储(JobManagerCheckpointStorage
),这会严重限制可以设置检查点的状态数量。
您可以通过在flink-conf.yaml
state.backend: rocksdb
state.checkpoints.dir: file:///checkpoint-dir/
# Optional, Flink will automatically default to FileSystemCheckpointStorage
# when a checkpoint directory is specified.
state.checkpoint-storage: filesystem
或在您的代码中
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new EmbeddedRocksDBStateBackend());
env.getCheckpointConfig().setCheckpointStorage("file:///checkpoint-dir");
// If you manually passed FsStateBackend into the RocksDBStateBackend constructor
// to specify advanced checkpointing configurations such as write buffer size,
// you can achieve the same results by using manually instantiating a FileSystemCheckpointStorage object.
env.getCheckpointConfig().setCheckpointStorage(new FileSystemCheckpointStorage("file:///checkpoint-dir"));
有关完整详细信息,请参阅Migrating from Legacy Backends上的文档。
这篇关于在Flink 1.13中配置RocksDB的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持编程学习网!
本文标题为:在Flink 1.13中配置RocksDB
基础教程推荐
- 如何使用 Eclipse 检查调试符号状态? 2022-01-01
- 在螺旋中写一个字符串 2022-01-01
- 如何在不安装整个 WTP 包的情况下将 Tomcat 8 添加到 Eclipse Kepler 2022-01-01
- 如何强制对超级方法进行多态调用? 2022-01-01
- 如何使用 Stream 在集合中拆分奇数和偶数以及两者的总和 2022-01-01
- 首次使用 Hadoop,MapReduce Job 不运行 Reduce Phase 2022-01-01
- 由于对所需库 rt.jar 的限制,对类的访问限制? 2022-01-01
- Java 中保存最后 N 个元素的大小受限队列 2022-01-01
- 如何对 HashSet 进行排序? 2022-01-01
- Spring Boot Freemarker从2.2.0升级失败 2022-01-01