为什么当我发送两个输入流时,Spark Streaming停止工作?

Why does Spark Streaming stop working when I send two input streams?(为什么当我发送两个输入流时,Spark Streaming停止工作?)

本文介绍了为什么当我发送两个输入流时,Spark Streaming停止工作?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在开发一个Spark流应用程序,其中我需要使用来自两个服务器的输入流,每个服务器每秒向Spark上下文发送一条JSON消息。

我的问题是,如果我只在一个流上执行操作,一切都运行得很好。但如果我有来自不同服务器的两个流,那么Spark在可以打印任何东西之前冻结,并且只有在两个服务器都发送了它们必须发送的所有JSON消息时(当它检测到socketTextStream没有接收数据时)才开始重新工作。

以下是我的代码:

    JavaReceiverInputDStream<String> streamData1 = ssc.socketTextStream("localhost",996,
            StorageLevels.MEMORY_AND_DISK_SER);

    JavaReceiverInputDStream<String> streamData2 = ssc.socketTextStream("localhost", 9995,StorageLevels.MEMORY_AND_DISK_SER);

    JavaPairDStream<Integer, String> dataStream1= streamData1.mapToPair(new PairFunction<String, Integer, String>() {
        public Tuple2<Integer, String> call(String stream) throws Exception {


            Tuple2<Integer,String> streamPair= new Tuple2<Integer, String>(1, stream);

            return streamPair;
        }
    });

    JavaPairDStream<Integer, String> dataStream2= streamData2.mapToPair(new PairFunction<String, Integer, String>() {
        public Tuple2<Integer, String> call(String stream) throws Exception {


            Tuple2<Integer,String> streamPair= new Tuple2<Integer, String>(2, stream);

            return streamPair;
        }
    });

dataStream2.print(); //for example

请注意,没有错误消息,Spark Simple在启动上下文后冻结,虽然我从端口收到JSON消息,但它没有显示任何内容。

非常感谢。

推荐答案

查看Spark Streaming documentation中的这些警告,并查看它们是否适用:

要记住的要点

  • 在本地运行Spark Streaming程序时,请勿使用local或local1为主URL。这两种情况都意味着只有一个线程将用于本地运行任务。如果您使用的是基于接收器的输入DStream(例如Sockets、Kafka、Flume等),则将使用单个线程来运行接收器,而不会留下处理接收到的数据的线程。因此,在本地运行时,请始终使用"local[n]"作为主URL,其中n>要运行的接收器的数量(有关如何设置主URL的信息,请参阅Spark属性)。
  • 将逻辑扩展到在集群上运行,分配给Spark Streaming应用程序的核心数必须多于接收器数。否则,系统将接收数据,但无法处理数据。

这篇关于为什么当我发送两个输入流时,Spark Streaming停止工作?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持编程学习网!

本文标题为:为什么当我发送两个输入流时,Spark Streaming停止工作?

基础教程推荐