Is there any way we can pause kafka stream for certain period and resume later?(我们有没有办法暂停卡夫卡流一段时间,然后再恢复?)
问题描述
我们有一个要求,我们使用Kafka Streams读取Kafka主题,然后通过一个会话池通过网络发送数据。然而,有时网络调用有点慢,我们需要频繁地暂停流,以确保我们没有使网络超载。目前,我们将数据捕获到流中,并将其加载到Executor服务,然后通过会话池通过网络发送。
如果Executor服务中的数据太高,我们需要暂停流一段时间,然后在Executor服务上的积压清理完毕后恢复它。为了实现此暂停机制,我们当前正在关闭流,并在清除积压后重新启动。
有什么方法可以暂停Kafka流吗?
推荐答案
如果我理解正确的话,您没有什么特别需要做的。你说的是"背压",而Kafka Streams可以开箱即用。
可以做的是将该数据放入某个最大大小的队列中,并使用该队列加载Executor服务。当队列达到某个阈值时,有两种方法:- 如果您将数据放入队列的调用在没有超时的情况下被阻塞,则无需再做任何操作。只要等系统恢复在线,你的电话 返回,处理将继续。
- 如果将数据放入队列的调用因超时而阻塞,只需执行查找以检查队列的大小。重复此操作,直到系统重新联机,您的呼叫成功。
另一种方法是使用Kafka-Streams中提供的处理器API,但这通常不是推荐的模式。
如果有帮助,请让我知道!!
这篇关于我们有没有办法暂停卡夫卡流一段时间,然后再恢复?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持编程学习网!
本文标题为:我们有没有办法暂停卡夫卡流一段时间,然后再恢复?
基础教程推荐
- 如何在不安装整个 WTP 包的情况下将 Tomcat 8 添加到 Eclipse Kepler 2022-01-01
- 如何使用 Stream 在集合中拆分奇数和偶数以及两者的总和 2022-01-01
- 如何对 HashSet 进行排序? 2022-01-01
- 如何强制对超级方法进行多态调用? 2022-01-01
- 如何使用 Eclipse 检查调试符号状态? 2022-01-01
- 由于对所需库 rt.jar 的限制,对类的访问限制? 2022-01-01
- Java 中保存最后 N 个元素的大小受限队列 2022-01-01
- 首次使用 Hadoop,MapReduce Job 不运行 Reduce Phase 2022-01-01
- Spring Boot Freemarker从2.2.0升级失败 2022-01-01
- 在螺旋中写一个字符串 2022-01-01