Why does Spark Streaming stop working when I send two input streams?(为什么当我发送两个输入流时,Spark Streaming停止工作?)
问题描述
我正在开发一个Spark流应用程序,其中我需要使用来自两个服务器的输入流,每个服务器每秒向Spark上下文发送一条JSON消息。
我的问题是,如果我只在一个流上执行操作,一切都运行得很好。但如果我有来自不同服务器的两个流,那么Spark在可以打印任何东西之前冻结,并且只有在两个服务器都发送了它们必须发送的所有JSON消息时(当它检测到socketTextStream
没有接收数据时)才开始重新工作。
以下是我的代码:
JavaReceiverInputDStream<String> streamData1 = ssc.socketTextStream("localhost",996,
StorageLevels.MEMORY_AND_DISK_SER);
JavaReceiverInputDStream<String> streamData2 = ssc.socketTextStream("localhost", 9995,StorageLevels.MEMORY_AND_DISK_SER);
JavaPairDStream<Integer, String> dataStream1= streamData1.mapToPair(new PairFunction<String, Integer, String>() {
public Tuple2<Integer, String> call(String stream) throws Exception {
Tuple2<Integer,String> streamPair= new Tuple2<Integer, String>(1, stream);
return streamPair;
}
});
JavaPairDStream<Integer, String> dataStream2= streamData2.mapToPair(new PairFunction<String, Integer, String>() {
public Tuple2<Integer, String> call(String stream) throws Exception {
Tuple2<Integer,String> streamPair= new Tuple2<Integer, String>(2, stream);
return streamPair;
}
});
dataStream2.print(); //for example
请注意,没有错误消息,Spark Simple在启动上下文后冻结,虽然我从端口收到JSON消息,但它没有显示任何内容。
非常感谢。
推荐答案
查看Spark Streaming documentation中的这些警告,并查看它们是否适用:
要记住的要点
- 在本地运行Spark Streaming程序时,请勿使用local或local1为主URL。这两种情况都意味着只有一个线程将用于本地运行任务。如果您使用的是基于接收器的输入DStream(例如Sockets、Kafka、Flume等),则将使用单个线程来运行接收器,而不会留下处理接收到的数据的线程。因此,在本地运行时,请始终使用"local[n]"作为主URL,其中n>要运行的接收器的数量(有关如何设置主URL的信息,请参阅Spark属性)。
- 将逻辑扩展到在集群上运行,分配给Spark Streaming应用程序的核心数必须多于接收器数。否则,系统将接收数据,但无法处理数据。
这篇关于为什么当我发送两个输入流时,Spark Streaming停止工作?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持编程学习网!
本文标题为:为什么当我发送两个输入流时,Spark Streaming停止工作?
基础教程推荐
- 如何使用 Eclipse 检查调试符号状态? 2022-01-01
- 由于对所需库 rt.jar 的限制,对类的访问限制? 2022-01-01
- Java 中保存最后 N 个元素的大小受限队列 2022-01-01
- 如何对 HashSet 进行排序? 2022-01-01
- 如何使用 Stream 在集合中拆分奇数和偶数以及两者的总和 2022-01-01
- Spring Boot Freemarker从2.2.0升级失败 2022-01-01
- 如何强制对超级方法进行多态调用? 2022-01-01
- 如何在不安装整个 WTP 包的情况下将 Tomcat 8 添加到 Eclipse Kepler 2022-01-01
- 首次使用 Hadoop,MapReduce Job 不运行 Reduce Phase 2022-01-01
- 在螺旋中写一个字符串 2022-01-01