spring Kafka listening to regex(春天卡夫卡听着regex)
本文介绍了春天·卡夫卡听着regex的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!
问题描述
我正在尝试使用以下代码收听新创建的主题,但不起作用。您能告诉我下面的代码是否正确吗?
public class KafkaMessageListener {
private static final Logger LOGGER = LoggerFactory.getLogger(KafkaMessageListener.class);
private final ProcessEventModel eventModel;
@KafkaListener(topicPattern = "betsyncDataTopic*")
public void receive(ConsumerRecord<String, String> consumerRecord) {
LOGGER.info("received payload at '{}'", consumerRecord.timestamp());
eventModel.process(consumerRecord.value());
}
推荐答案
您的正则表达式无效;它应该是betsyncDataTopic.*
。
@KafkaListener(id = "xxx", topicPattern = "kbgh.*")
public void listen(String in) {
System.out.println(in);
}
...
partitions assigned: [kbgh290-0]
编辑
如果稍后添加与模式匹配的新主题,则在重新平衡之前会有延迟。根据KafkaConsumer
javadoc...
* Subscribe to all topics matching specified pattern to get dynamically assigned partitions.
* The pattern matching will be done periodically against topic existing at the time of check.
* <p>
* As part of group management, the consumer will keep track of the list of consumers that
* belong to a particular group and will trigger a rebalance operation if one of the
* following events trigger -
* <ul>
* <li>Number of partitions change for any of the subscribed list of topics
* <li>Topic is created or deleted
* <li>An existing member of the consumer group dies
* <li>A new member is added to an existing consumer group via the join API
* </ul>
我刚刚运行了一个测试;在12:13:32
处添加了一个新的匹配主题;结果:
2018-02-12 12:17:30.394 INFO 88028 --- [ xxx-0-C-1] o.s.k.l.KafkaMessageListenerContainer
: partitions revoked: [kbgh290-0]
2018-02-12 12:17:30.450 INFO 88028 --- [ xxx-0-C-1] o.s.k.l.KafkaMessageListenerContainer
: partitions assigned: [kbgh290-0, kbghNew-0]
所以需要几分钟。
这篇关于春天·卡夫卡听着regex的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持编程学习网!
沃梦达教程
本文标题为:春天·卡夫卡听着regex
基础教程推荐
猜你喜欢
- 如何使用 Stream 在集合中拆分奇数和偶数以及两者的总和 2022-01-01
- 如何在不安装整个 WTP 包的情况下将 Tomcat 8 添加到 Eclipse Kepler 2022-01-01
- 如何强制对超级方法进行多态调用? 2022-01-01
- 由于对所需库 rt.jar 的限制,对类的访问限制? 2022-01-01
- 在螺旋中写一个字符串 2022-01-01
- 首次使用 Hadoop,MapReduce Job 不运行 Reduce Phase 2022-01-01
- 如何使用 Eclipse 检查调试符号状态? 2022-01-01
- 如何对 HashSet 进行排序? 2022-01-01
- Java 中保存最后 N 个元素的大小受限队列 2022-01-01
- Spring Boot Freemarker从2.2.0升级失败 2022-01-01