How to configure Spring Boot Kafka client so it does not try to connect(如何配置Spring Boot Kafka客户端以使其不尝试连接)
问题描述
这与Is there a "Circuit Breaker" for Spring Boot Kafka client?有关,但我仍然认为这是一个不同的问题:)
我们需要配置Spring Boot Kafka客户端,以便它根本不会尝试连接。
用例是,在测试环境中,我们没有运行Kafka,但我们仍然需要构建完整的Spring Boot上下文,因此使该Bean以配置文件为条件是不起作用的。我们不在乎BEY是否未连接,但我们需要它存在。
问题是不成功的连接尝试需要大约30-40秒,我们的测试会显著减慢。
哪种configuration parameters或哪种组合完全禁止连接尝试,或至少强制客户端仅尝试一次?
多次重试连接的代码为:
@Bean
public KafkaAdmin.NewTopics topics() {
return new KafkaAdmin.NewTopics(
TopicBuilder.name("MyTopic").build()
);
}
它反复生成此警告:
WARN ... org.apache.kafka.clients.NetworkClient : [AdminClient clientId=adminclient-1] Connection to node -1 (localhost/127.0.0.1:29092) could not be established. Broker may not be available.
以下代码仅尝试连接一次:
@Bean
public ReactiveKafkaConsumerTemplate<String, MyEvent> myConsumer(KafkaProperties properties) {
return createConsumer(properties, "MyTopic", "MyConsumerGroup");
}
public <E> ReactiveKafkaConsumerTemplate<String, E> createConsumer(KafkaProperties properties, String topic, String consumerGroup) {
final Map<String, Object> map = configureKafkaProperties(properties, consumerGroup);
return new ReactiveKafkaConsumerTemplate<>(
ReceiverOptions.<String, E>create(map)
.subscription(List.of(topic)));
}
制作
WARN 7268 ... org.apache.kafka.clients.NetworkClient : Connection to node -1 (localhost/127.0.0.1:29092) could not be established. Broker may not be available.
我也尝试过设置属性
spring.kafka.admin.fail-fast=true
但这似乎完全没有效果。
推荐答案
Spring Boot自动配置默认情况下将连接到代理以创建任何NewTopic
Bean。您可以将其autoCreate
属性设置为False。
/**
* Set to false to suppress auto creation of topics during context initialization.
* @param autoCreate boolean flag to indicate creating topics or not during context initialization
* @see #initialize()
*/
public void setAutoCreate(boolean autoCreate) {
编辑
若要获取对KafkaAdmin
的引用,只需将其作为参数添加到任何Bean定义。
例如
@Bean
public KafkaAdmin.NewTopics topics(KafkaAdmin admin) {
admin.setAutoCreate(false);
return new KafkaAdmin.NewTopics(
TopicBuilder.name("MyTopic").build()
);
}
另请参阅KafkaAdmin.initialize()
。
/**
* Call this method to check/add topics; this might be needed if the broker was not
* available when the application context was initialized, and
* {@link #setFatalIfBrokerNotAvailable(boolean) fatalIfBrokerNotAvailable} is false,
* or {@link #setAutoCreate(boolean) autoCreate} was set to false.
* @return true if successful.
* @see #setFatalIfBrokerNotAvailable(boolean)
* @see #setAutoCreate(boolean)
*/
public final boolean initialize() {
使用@KafkaListener
设置autoStartup = "false"
以防止使用者在上下文初始化时启动。
使用Reactive,只是不要订阅receive*()
方法返回的Flux
(这是触发消费者创建的内容)。
这篇关于如何配置Spring Boot Kafka客户端以使其不尝试连接的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持编程学习网!
本文标题为:如何配置Spring Boot Kafka客户端以使其不尝试连接
基础教程推荐
- Spring Boot Freemarker从2.2.0升级失败 2022-01-01
- 如何使用 Stream 在集合中拆分奇数和偶数以及两者的总和 2022-01-01
- 首次使用 Hadoop,MapReduce Job 不运行 Reduce Phase 2022-01-01
- 如何使用 Eclipse 检查调试符号状态? 2022-01-01
- 如何对 HashSet 进行排序? 2022-01-01
- 在螺旋中写一个字符串 2022-01-01
- Java 中保存最后 N 个元素的大小受限队列 2022-01-01
- 由于对所需库 rt.jar 的限制,对类的访问限制? 2022-01-01
- 如何在不安装整个 WTP 包的情况下将 Tomcat 8 添加到 Eclipse Kepler 2022-01-01
- 如何强制对超级方法进行多态调用? 2022-01-01