How to change timestamp of records?(如何更改记录的时间戳?)
问题描述
我正在使用FluentD(v.12最后一个稳定版本)向Kafka发送消息。但FluentD使用的是旧的KafkaProducer,因此记录时间戳始终设置为-1。 因此,我必须使用WallclockTimestampExtractor将记录的时间戳设置为消息到达Kafka时的时间点。
是否有特定于Kafka Streams的解决方案?
我真正感兴趣的时间戳是由fluentd在消息中发送的:
";timestamp";:";1507885936";,&Quot;主机:&Quot;V.X.Y.Z.
以卡夫卡表示的记录:
偏移量=0,时间戳=-1,键=空,值={";timestamp";:";1507885936";,;主机;:V.X.Y.Z.&Quot;}
我希望有这样一张卡夫卡唱片:
OFFSET=0,TIMESTAMP=1507885936,KEY=NULL,VALUE={";timestamp";:";1507885936";,;HOST&QOT;:&QOT;V.X.Y.Z.&QOT;}
我的解决方法如下所示:
编写消费者提取时间戳(https://kafka.apache.org/0110/javadoc/org/apache/kafka/streams/processor/TimestampExtractor.html)
编写一个生产者,生成一个时间戳设置为(ProducerRecord(字符串主题,整数分区,长时间戳,K键,V值)的新记录)
我更喜欢KafkaStreams解决方案(如果有)。
推荐答案
您可以编写非常简单的Kafka Streams应用程序,如下所示:
KStreamBuilder builder = new KStreamBuilder();
builder.stream("input-topic").to("output-topic");
并使用从记录中提取时间戳并返回时间戳的自定义TimestampExtractor
配置应用程序。
Kafka Streams在将记录写回Kafka时将使用返回的时间戳。
注意:如果您有乱序数据--即时间戳没有严格排序--结果也将包含乱序时间戳。Kafka Streams使用返回的时间戳回写Kafka(即,无论提取程序返回什么,都用作记录元数据时间戳)。请注意,在写入时,当前处理的输入记录中的时间戳用于所有生成的输出记录--这适用于版本1.0,但在将来的版本中可能会更改。)。
更新:
一般来说,您可以通过处理器API修改时间戳。调用context.forward()
可以通过To.all().withTimestamp(...)
将输出记录时间戳设置为forward()
的参数。
这篇关于如何更改记录的时间戳?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持编程学习网!
本文标题为:如何更改记录的时间戳?
基础教程推荐
- 首次使用 Hadoop,MapReduce Job 不运行 Reduce Phase 2022-01-01
- 如何强制对超级方法进行多态调用? 2022-01-01
- Spring Boot Freemarker从2.2.0升级失败 2022-01-01
- 如何对 HashSet 进行排序? 2022-01-01
- 由于对所需库 rt.jar 的限制,对类的访问限制? 2022-01-01
- 如何使用 Stream 在集合中拆分奇数和偶数以及两者的总和 2022-01-01
- 在螺旋中写一个字符串 2022-01-01
- Java 中保存最后 N 个元素的大小受限队列 2022-01-01
- 如何使用 Eclipse 检查调试符号状态? 2022-01-01
- 如何在不安装整个 WTP 包的情况下将 Tomcat 8 添加到 Eclipse Kepler 2022-01-01