Is there a quot;Circuit Breakerquot; for Spring Boot Kafka client?(是否有适用于Spring Boot Kafka客户端的断路器?)
问题描述
如果Kafka服务器(暂时)关闭,我的Spring Boot应用程序ReactiveKafkaConsumerTemplate
一直尝试连接不成功,从而造成不必要的流量和日志文件混乱:
2021-11-10 14:45:30.265 WARN 24984 --- [onsumer-group-1] org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-group-1, groupId=consumer-group] Connection to node -1 (localhost/127.0.0.1:29092) could not be established. Broker may not be available.
2021-11-10 14:45:32.792 WARN 24984 --- [onsumer-group-1] org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-group-1, groupId=consumer-group] Bootstrap broker localhost:29092 (id: -1 rack: null) disconnected
2021-11-10 14:45:34.845 WARN 24984 --- [onsumer-group-1] org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-group-1, groupId=consumer-group] Connection to node -1 (localhost/127.0.0.1:29092) could not be established. Broker may not be available.
2021-11-10 14:45:34.845 WARN 24984 --- [onsumer-group-1] org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-group-1, groupId=consumer-group] Bootstrap broker localhost:29092 (id: -1 rack: null) disconnected
是否可以使用断路器(灵感here或here)之类的东西,以便Spring BootKafka客户端在出现故障时(或者更好的是连续几次故障)放慢尝试连接的速度,只有在服务器重新启动后才会恢复到正常速度?
是否已有现成的配置参数或任何其他解决方案?
我知道parameterreconnect.backoff.ms
,这是我创建ReactiveKafkaConsumerTemplate
Bean的方式:
@Bean
public ReactiveKafkaConsumerTemplate<String, MyEvent> kafkaConsumer(KafkaProperties properties) {
final Map<String, Object> map = new HashMap<>(properties.buildConsumerProperties());
map.put(ConsumerConfig.GROUP_ID_CONFIG, "MyGroup");
map.put(ConsumerConfig.RECONNECT_BACKOFF_MS_CONFIG, 10_000L);
final JsonDeserializer<DisplayCurrencyEvent> jsonDeserializer = new JsonDeserializer<>();
jsonDeserializer.addTrustedPackages("com.example.myapplication");
return new ReactiveKafkaConsumerTemplate<>(
ReceiverOptions
.<String, MyEvent>create(map)
.withKeyDeserializer(new ErrorHandlingDeserializer<>(new StringDeserializer()))
.withValueDeserializer(new ErrorHandlingDeserializer<>(jsonDeserializer))
.subscription(List.of("MyTopic")));
}
消费者仍尝试每3秒连接一次。
推荐答案
参见https://kafka.apache.org/documentation/#consumerconfigs_retry.backoff.ms
尝试重新连接到给定主机之前等待的基本时间。这避免了在紧密环路中重复连接到主机。此退避适用于客户端到代理的所有连接尝试。
和https://kafka.apache.org/documentation/#consumerconfigs_reconnect.backoff.max.ms
重新连接到多次连接失败的代理时等待的最长时间(以毫秒为单位)。如果提供,则每次连续连接失败时,每台主机的退避将呈指数级增加,最高可达此最大值。计算退避增加后,增加20%的随机抖动以避免连接风暴。
和
这篇关于是否有适用于Spring Boot Kafka客户端的断路器?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持编程学习网!
本文标题为:是否有适用于Spring Boot Kafka客户端的断路器?
基础教程推荐
- 在螺旋中写一个字符串 2022-01-01
- 如何使用 Eclipse 检查调试符号状态? 2022-01-01
- 由于对所需库 rt.jar 的限制,对类的访问限制? 2022-01-01
- Spring Boot Freemarker从2.2.0升级失败 2022-01-01
- 如何在不安装整个 WTP 包的情况下将 Tomcat 8 添加到 Eclipse Kepler 2022-01-01
- 首次使用 Hadoop,MapReduce Job 不运行 Reduce Phase 2022-01-01
- 如何对 HashSet 进行排序? 2022-01-01
- 如何强制对超级方法进行多态调用? 2022-01-01
- 如何使用 Stream 在集合中拆分奇数和偶数以及两者的总和 2022-01-01
- Java 中保存最后 N 个元素的大小受限队列 2022-01-01