Spark Streaming: How to get the filename of a processed file in Python(Spark Streaming:如何在Python中获取已处理文件的文件名)
问题描述
我是Spark的新手(老实说,我也是Python的新手),如果我错过了一些明显的东西,请原谅我。
我正在使用Spark和Python进行文件流传输。在我所做的第一个示例中,Spark正确地侦听给定的目录并计算文件中出现的单词,因此我知道一切都是在侦听该目录的情况下工作的。 现在,我正在尝试获取为进行审计而处理的文件的名称。我在这里读到 http://mail-archives.us.apache.org/mod_mbox/spark-user/201504.mbox/%3CCANvfmP8OC9jrpVgWsRWfqjMxeYd6sE6EojfdyFy_GaJ3BO43_A@mail.gmail.com%3E 这不是一项微不足道的任务。我这里有一个可能的解决方案 http://mail-archives.us.apache.org/mod_mbox/spark-user/201502.mbox/%3CCAEgyCiZbnrd6Y_aG0cBRCVC1u37X8FERSEcHB=tR3A2VGrGrPQ@mail.gmail.com%3E 我试着按如下方式实现:
from __future__ import print_function
import sys
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
def fileName(data):
string = data.toDebugString
if __name__ == "__main__":
sc = SparkContext(appName="PythonStreamingFileNamePrinter")
ssc = StreamingContext(sc, 1)
lines = ssc.textFileStream("file:///test/input/")
files = lines.foreachRDD(fileName)
print(files)
ssc.start()
ssc.awaitTermination()
不幸的是,现在它不是每秒监听文件夹,而是监听一次,输出‘NONE’,然后等待什么也不做。这与运行正常的代码之间的唯一区别是
files = lines.foreachRDD(fileName)
在我担心获取文件名(明天的问题)之前,有人能理解为什么这只检查目录一次吗?
提前感谢 M
推荐答案
,因此这是一个新手错误。我将我的解决方案张贴出来,供我自己和其他人参考。
正如@user3689574所指出的,我没有在我的函数中返回调试字符串。这充分解释了为什么我得到的是‘无’。
接下来,我在函数外部打印调试,这意味着它从来不是ForeachRDD的一部分。将其移动到函数中,如下所示:
def fileName(data):
debug = data.toDebugString()
print(debug)
这会打印调试信息,并继续监听目录。改变这一点解决了我最初的问题。在获取文件名方面,这变得非常简单。
目录没有变化时的调试字符串如下:
(0) MapPartitionsRDD[1] at textFileStream at NativeMethodAccessorImpl.java:-2 [] | UnionRDD[0] at textFileStream at NativeMethodAccessorImpl.java:-2 []
这清楚地表明没有文件。将文件复制到目录中时,调试输出如下:
(1) MapPartitionsRDD[42] at textFileStream at NativeMethodAccessorImpl.java:-2 [] | UnionRDD[41] at testFileStream at NativeMethodAccessorImpl.java:-2 [] | file:/test/input/test.txt New HadoopRDD[40] at textFileStream at NativeMethodAccessorImpl.java:-2 []
它通过一个快速正则表达式很容易地为您提供了文件名。希望这对其他人有帮助。
这篇关于Spark Streaming:如何在Python中获取已处理文件的文件名的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持编程学习网!
本文标题为:Spark Streaming:如何在Python中获取已处理文件的文件名
基础教程推荐
- 哪些 Python 包提供独立的事件系统? 2022-01-01
- 如何在Python中绘制多元函数? 2022-01-01
- 使 Python 脚本在 Windows 上运行而不指定“.py";延期 2022-01-01
- 使用Python匹配Stata加权xtil命令的确定方法? 2022-01-01
- Python 的 List 是如何实现的? 2022-01-01
- 症状类型错误:无法确定关系的真值 2022-01-01
- 如何在 Python 中检测文件是否为二进制(非文本)文 2022-01-01
- 使用 Google App Engine (Python) 将文件上传到 Google Cloud Storage 2022-01-01
- 将 YAML 文件转换为 python dict 2022-01-01
- 合并具有多索引的两个数据帧 2022-01-01