Parallel pipeline inside one Dataflow Job(一个数据流作业内的并行管道)
本文介绍了一个数据流作业内的并行管道的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!
问题描述
我要在GCP上的一个数据流作业内运行两个并行管道。我已经创建了一个管道,并且工作正常,但我希望在不创建另一个作业的情况下创建另一个管道。我搜索了这么多答案,但没有找到任何代码示例:(
如果我这样运行它,它不工作:
pipe1.run();
pipe2.run();
显示"已有活动作业名称...如果要提交第二个作业,请尝试使用--jobName
重新设置其他名称"
推荐答案
您可以将其他输入应用于管道,这将导致一个作业中的单独管道。例如:
public class ExamplePipeline {
public static void main(String[] args) {
PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().create();
options.setRunner(DirectRunner.class);
Pipeline pipeline = Pipeline.create(options);
PCollection<String> linesForPipelineOne = pipeline.apply(Create.of("A1", "B1"));
PCollection<String> linesToWriteFromPipelineOne = linesForPipelineOne.apply("Pipeline 1 transform",
ParDo.of(new DoFn<String, String>() {
@ProcessElement
public void processElement(ProcessContext c) {
System.out.println("Pipeline one:" + c.element());
c.output(c.element() + " extra message.");
}
}));
linesToWriteFromPipelineOne.apply((TextIO.write().to("file.txt")));
PCollection<String> linesForPipelineTwo = pipeline.apply(Create.of("A2", "B2"));
linesForPipelineTwo.apply("Pipeline 2 transoform",
ParDo.of(new DoFn<String, String>() {
@ProcessElement
public void processElement(ProcessContext c) {
System.out.println("Pipeline two:" + c.element());
}
}));
pipeline.run();
}
如您所见,您还可以将两个(或更多)独立的PBegin应用于具有多个PDone/接收器的管道。在此示例中,"pipeline 1"
将输出转储并写入文件,"pipeline 2"
仅将其转储到屏幕。
DataflowRunner
运行此命令,则图形用户界面将显示2个未连接的"管道"。
这篇关于一个数据流作业内的并行管道的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持编程学习网!
沃梦达教程
本文标题为:一个数据流作业内的并行管道
基础教程推荐
猜你喜欢
- 首次使用 Hadoop,MapReduce Job 不运行 Reduce Phase 2022-01-01
- Spring Boot Freemarker从2.2.0升级失败 2022-01-01
- 如何在不安装整个 WTP 包的情况下将 Tomcat 8 添加到 Eclipse Kepler 2022-01-01
- 如何强制对超级方法进行多态调用? 2022-01-01
- 如何使用 Eclipse 检查调试符号状态? 2022-01-01
- Java 中保存最后 N 个元素的大小受限队列 2022-01-01
- 由于对所需库 rt.jar 的限制,对类的访问限制? 2022-01-01
- 如何使用 Stream 在集合中拆分奇数和偶数以及两者的总和 2022-01-01
- 如何对 HashSet 进行排序? 2022-01-01
- 在螺旋中写一个字符串 2022-01-01