How to avoid losing messages with Kafka streams(如何避免使用Kafka流丢失消息)
问题描述
我们有一个Streams应用程序,它使用源主题中的消息,执行一些处理并将结果转发到目标主题。
消息的结构由某些Avro架构控制。
当开始使用消息时,如果架构尚未缓存,应用程序将尝试从架构注册表中检索它。如果由于任何原因架构注册表不可用(例如网络故障),则当前正在处理的消息将丢失,因为默认处理程序是名为LogAndContinueExceptionHandler
的处理程序。
o.a.k.s.e.LogAndContinueExceptionHandler : Exception caught during Deserialization, taskId: 1_5, topic: my.topic.v1, partition: 5, offset: 142768
org.apache.kafka.common.errors.SerializationException: Error retrieving Avro schema for id 62
Caused by: java.net.SocketTimeoutException: connect timed out
at java.base/java.net.PlainSocketImpl.socketConnect(Native Method) ~[na:na]
...
o.a.k.s.p.internals.RecordDeserializer : stream-thread [my-app-StreamThread-3] task [1_5] Skipping record due to deserialization error. topic=[my.topic.v1] partition=[5] offset=[142768]
...
因此,我的问题是,处理上述情况的正确方法是什么,并确保无论如何都不会丢失消息。是否有现成的LogAndRollbackExceptionHandler
错误处理程序或实现您自己的方法?
提前感谢您的投入。
推荐答案
我在Kafka上的工作不是很多,但当我工作时,我记得遇到了您在我们系统中描述的问题。
让我告诉你我们是如何处理我们的场景的,也许这也会对你有所帮助:
场景1:如果您的消息在发布方(Publisher--&>Kafka)丢失,您可以根据需要配置Kafka确认设置,如果您使用的是SpringCloud Stream和Kafka,则该属性为spring.cloud.stream.kafka.binder.required-acks
可能的值:
最多一次(Ack=0)
- 出版商不在乎卡夫卡是否承认。
- 发送并忘记
- 可能会丢失数据
至少一次(Ack=1)
如果Kafka不确认,出版商将重新发送消息。
可能存在重复。
在将邮件复制到副本之前发送确认。
正好一次(Ack=All)
如果Kafka不确认,出版商将重新发送消息。
但是,如果一条消息多次发送到Kafka,则不会有重复。
内部序列号,用于判断消息是否已经写入主题。
需要设置Min.insync.Replicas属性,以确保在Kafka向生产者确认之前,需要同步的最小复制数是多少。
场景2:如果您的数据在消费者端丢失(Kafka--&>Consumer),您可以根据您的使用情况更改Kafka的自动提交功能。如果您使用的是Spring Cloud Streamspring.cloud.stream.kafka.bindings.input.consumer.AutoCommitOffset.
,则为该属性默认情况下,在Kafka中AutoCommittee Offset为True,并且发送给消费者的每一条消息在Kafka的末尾都是&Quot;Submitted&Quot;,这意味着它不会被再次发送。但是,如果您将AutoCommittee Offset更改为False,您将有权在代码中轮询来自Kafka的消息,并且一旦您完成工作,将Commit显式设置为True,以让Kafka知道您现在已经完成了消息。
如果消息未提交,Kafka将继续重新发送,直到消息提交为止。
希望这对您有所帮助,或至少为您指明正确的方向。
这篇关于如何避免使用Kafka流丢失消息的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持编程学习网!
本文标题为:如何避免使用Kafka流丢失消息
基础教程推荐
- 如何对 HashSet 进行排序? 2022-01-01
- 由于对所需库 rt.jar 的限制,对类的访问限制? 2022-01-01
- 如何强制对超级方法进行多态调用? 2022-01-01
- 首次使用 Hadoop,MapReduce Job 不运行 Reduce Phase 2022-01-01
- Java 中保存最后 N 个元素的大小受限队列 2022-01-01
- 在螺旋中写一个字符串 2022-01-01
- 如何使用 Eclipse 检查调试符号状态? 2022-01-01
- 如何使用 Stream 在集合中拆分奇数和偶数以及两者的总和 2022-01-01
- Spring Boot Freemarker从2.2.0升级失败 2022-01-01
- 如何在不安装整个 WTP 包的情况下将 Tomcat 8 添加到 Eclipse Kepler 2022-01-01