Python Apache Beam Multiple Outputs amp; Processing(PYTHON阿帕奇光束多路输出和处理)
本文介绍了PYTHON阿帕奇光束多路输出和处理的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!
问题描述
我正在尝试使用以下流程在Google数据流上运行作业:
实质上是获取单个数据源,根据词典中的某些值进行筛选,并为每个筛选条件创建单独的输出。
我编写了以下代码:
# List of values to filter by
x_list = [1, 2, 3]
with beam.Pipeline(options=PipelineOptions().from_dictionary(pipeline_params)) as p:
# Read in newline JSON data - each line is a dictionary
log_data = (
p
| "Create " + input_file >> beam.io.textio.ReadFromText(input_file)
| "Load " + input_file >> beam.FlatMap(lambda x: json.loads(x))
)
# For each value in x_list, filter log_data for dictionaries containing the value & write out to separate file
for i in x_list:
# Return dictionary if given key = value in filter
filtered_log = log_data | "Filter_"+i >> beam.Filter(lambda x: x['key'] == i)
# Do additional processing
processed_log = process_pcoll(filtered_log, event)
# Write final file
output = (
processed_log
| 'Dump_json_'+filename >> beam.Map(json.dumps)
| "Save_"+filename >> beam.io.WriteToText(output_fp+filename,num_shards=0,shard_name_template="")
)
目前它只处理列表中的第一个值。我知道我可能必须使用Pardo,但我不太确定如何在我的流程中考虑这一点。
推荐答案
您可以在Beam中使用TaggedOutput。编写一个BEAM函数,它将标记PCollection中的每个元素。
import uuid
import apache_beam as beam
import dateutil.parser
from apache_beam.pvalue import TaggedOutput
class TagData(beam.DoFn):
def process(self, element):
key = element.get('key')
yield TaggedOutput(key, element)
processed_tagged_log = processed_log | "tagged-data-by-key " >> beam.ParDo(TagData()).with_outputs(*x_list)
现在您可以将此输出写入单独的文件/表
# Write files to separate tables/files
for key in x_list:
processed_tagged_log[key] | "save file %s" % uuid.uuid4()>> beam.io.WriteToText(output_fp+key+filename,num_shards=0,shard_name_template="")
来源: https://beam.apache.org/documentation/sdks/pydoc/2.0.0/_modules/apache_beam/pvalue.html
这篇关于PYTHON阿帕奇光束多路输出和处理的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持编程学习网!
沃梦达教程
本文标题为:PYTHON阿帕奇光束多路输出和处理
基础教程推荐
猜你喜欢
- 如何在 Python 中检测文件是否为二进制(非文本)文 2022-01-01
- 使用 Google App Engine (Python) 将文件上传到 Google Cloud Storage 2022-01-01
- 使用Python匹配Stata加权xtil命令的确定方法? 2022-01-01
- 如何在Python中绘制多元函数? 2022-01-01
- 哪些 Python 包提供独立的事件系统? 2022-01-01
- 使 Python 脚本在 Windows 上运行而不指定“.py";延期 2022-01-01
- 将 YAML 文件转换为 python dict 2022-01-01
- 合并具有多索引的两个数据帧 2022-01-01
- 症状类型错误:无法确定关系的真值 2022-01-01
- Python 的 List 是如何实现的? 2022-01-01