Kafka Streams: How to get the first and the last record of a SessionWindow?(Kafka Streams:如何获得SessionWindow的第一个和最后一个记录?)
问题描述
默认情况下,.windowedBy(SessionWindows.with(Duration.ofSeconds(60))
为每个传入记录返回一条记录。
结合使用.count()
和.filter()
可以轻松检索第一条记录。
使用
.suppress(Suppressed.untilWindowCloses(unbounded()))
还可以轻松检索最后一条记录。
所以…我做了两次处理,您可以看到修改后的字数统计示例:
final KStream<String, String> streamsBranches = builder.<String,String>stream("streams-plaintext-input");
streamsBranches
.flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\W+")))
.groupBy((key, value) -> ""+value)
.windowedBy(SessionWindows.with(Duration.ofSeconds(60)).grace(Duration.ofSeconds(2)))
.count(Materialized.with(Serdes.String(), Serdes.Long()))
.toStream()
.map((wk, v) -> new KeyValue<>(wk.key(), v == null ? -1l : v))
.filter((wk, v) -> v == 1)
.to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long()));
streamsBranches
.flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\W+")))
.groupBy((key, value) -> ""+value)
.windowedBy(SessionWindows.with(Duration.ofSeconds(60)).grace(Duration.ofSeconds(2)))
.count(Materialized.with(Serdes.String(), Serdes.Long()))
.suppress(Suppressed.untilWindowCloses(unbounded()))
.toStream()
.map((wk, v) -> new KeyValue<>(wk.key(), v))
.filter((wk, v) -> v != null)
.to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long()));
但我想知道是否有更简单、更漂亮的方法来做同样的事情。
推荐答案
我认为您应该使用SessionWindowedKStream::aggregate(...)
,并根据您的逻辑将结果累加到聚合器(第一个和最后一个值)
示例代码可能如下所示:
streamsBranches.groupByKey()
.windowedBy(SessionWindows.with(Duration.ofSeconds(60)).grace(Duration.ofSeconds(2)))
.aggregate(
AggClass::new,
(key, value, oldAgg) -> oldAgg.update(value),
(key, agg1, agg2) -> agg1.merge(agg2),
Materialized.with(Serdes.String(), new AggClassSerdes())
).suppress(Suppressed.untilWindowCloses(unbounded()))
.toStream().map((wk, v) -> new KeyValue<>(wk.key(), v))
.to("streams-wordcount-output", Produced.with(Serdes.String(), new AggClassSerdes()));
其中AggClass
是累加器,AggClassSerdes
是累加器Serdes
public class AggClass {
private String first;
private String last;
public AggClass() {}
public AggClass(String first, String last) {
this.first = first;
this.last = last;
}
public AggClass update(String value) {
if (first == null)
first = value;
last = value;
return this;
}
public AggClass merge(AggClass other) {
if (this.first == null)
return other;
else return new AggClass(this.first, other.last);
}
}
这篇关于Kafka Streams:如何获得SessionWindow的第一个和最后一个记录?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持编程学习网!
本文标题为:Kafka Streams:如何获得SessionWindow的第一个和最后一个记录?
基础教程推荐
- 如何使用 Stream 在集合中拆分奇数和偶数以及两者的总和 2022-01-01
- Java 中保存最后 N 个元素的大小受限队列 2022-01-01
- 由于对所需库 rt.jar 的限制,对类的访问限制? 2022-01-01
- 在螺旋中写一个字符串 2022-01-01
- 首次使用 Hadoop,MapReduce Job 不运行 Reduce Phase 2022-01-01
- 如何在不安装整个 WTP 包的情况下将 Tomcat 8 添加到 Eclipse Kepler 2022-01-01
- Spring Boot Freemarker从2.2.0升级失败 2022-01-01
- 如何强制对超级方法进行多态调用? 2022-01-01
- 如何对 HashSet 进行排序? 2022-01-01
- 如何使用 Eclipse 检查调试符号状态? 2022-01-01