Why Spark writes Null in DeltaLake Table(电光为什么在三角洲湖表中写空)
问题描述
我的Java应用程序使用连接到套接字服务器的Spark Structured Streaming
不断获取封装在RDMessage对象中的传感器测量记录(IoT),该对象记录协议中用于控制的消息类型。
当消息到达时,将使用Encoder<RDMeasurement> measurementEncoder = Encoders.bean(RDMeasurement.class)
检查它们并将其转换为数据集。
虽然流被正确读取并且RDMeasurement
对象被正确创建,但是输出流被设置为无或零,具体取决于数据类型。当我更改格式(.format("console")
)时,我在DeltaFrame表或控制台中看到这一点。
我这里错过了什么?出什么问题了?
请参阅下面最重要的Java代码段
public final class SocketRDMeasurement {
public static void main(String[] args) throws Exception {
SparkSession spark = SparkSession
.builder()
.appName("SSSocketRDMeasurement")
.master("local[*]")
.getOrCreate();
Encoder<StringArray> stringArrayEncoder = Encoders.bean(StringArray.class);
Encoder<RDMessage> messageEncoder = Encoders.bean(RDMessage.class);
Encoder<RDMeasurement> measurementEncoder = Encoders.bean(RDMeasurement.class);
Dataset<Row> records = spark
.readStream()
.format("socket")
.option("host", host)
.option("port", port)
.load();
Dataset<String> inputReceived = records.as(Encoders.STRING());
Dataset<StringArray> input = inputReceived.as(Encoders.STRING())
.map((MapFunction<String, StringArray>) x ->
new StringArray(x),
stringArrayEncoder);
Dataset<RDMessage> messages = input.map(
(MapFunction<StringArray, RDMessage>)
r -> new RDMessage(r), messageEncoder);
Dataset<RDMeasurement> measurements = messages
.map((MapFunction<RDMessage, RDMeasurement>) r ->
new RDMeasurement(), measurementEncoder);
// The code executes without warning or error but despite the
// objects being created correctly the output of dataset is
// is saved with nulls/nan
StreamingQuery query = measurements.writeStream()
.outputMode("append")
.format("delta")
.option("checkpointLocation",
"/opt/data/delta/_checkpoints/ss-socket-rd-measurement")
.start("/opt/data/delta/ss-socket-rd-measurement");
query.awaitTermination();
}
}
public class StringArray implements Serializable {
private String[] tokens;
public StringArray(String tokens) {
this.tokens = tokens.split(",");
}
// getters, setters and toString goes here
}
public class RDMeasurement implements Serializable {
private String dataSourceName = null;
private double dt = 0.0f;
private double t0 = 0f;
private double endTimestamp = 0L;
private double[] valuesArray;
public RDMeasurement() { }
public RDMeasurement(String dataSourceName, double t0,
double dt, double endTimestamp, double[] valuesArray) {
this.dataSourceName = dataSourceName;
this.t0 = t0;
this.dt = dt;
this.endTimestamp = endTimestamp;
this.valuesArray = valuesArray;
}
// getters, setters and toString goes here
}
public class RDMessage implements Serializable {
String type;
RDMeasurement rdMeasurement;
public RDMessage(String type, RDMeasurement rdMeasurement) {
this.type = type;
this.rdMeasurement = rdMeasurement;
}
public RDMessage(StringArray stringArray) {
this(stringArray.getTokens()[0] ,
new RDMeasurement(stringArray.getTokens()[1],
Double.parseDouble(stringArray.getTokens()[2]),
Double.parseDouble(stringArray.getTokens()[3]),
Double.parseDouble(stringArray.getTokens()[4]),
toDoubleArray(5, stringArray))
);
}
private static double[] toDoubleArray(int skip, StringArray stringArray) {
double[] ret = new double[stringArray.getTokens().length - 5];
for (int i = 0; i < stringArray.getTokens().length - 5; i++) {
ret[i] = Double.parseDouble(stringArray.getTokens()[i+skip]);
}
return ret;
}
// getters, setters and toString goes here
}
每行输入遵循以下格式:
V1_start_rd_0,ds_1,1642442598.266,1.0,1642442618.266,1.00,2.00,3.00,4.00,5.00,6.00,7.00,8.00,9.00,10.00,11.00,12.00,13.00,14.00,15.00,16.00,17.00,18.00,19.00,20.00
V1_rd_1,ds_2,1642442619.266,1.0,1642442639.266,1.00,2.00,3.00,4.00,5.00,6.00,7.00,8.00,9.00,10.00,11.00,12.00,13.00,14.00,15.00,16.00,17.00,18.00,19.00,20.00
V1_rd_2,ds_3,1642442640.266,1.0,1642442660.266,1.00,2.00,3.00,4.00,5.00,6.00,7.00,8.00,9.00,10.00,11.00,12.00,13.00,14.00,15.00,16.00,17.00,18.00,19.00,20.00
推荐答案
重构Java代码并添加调试代码段后,我能够识别错误。
参见重构:
StreamingQuery query = dataStreamReader.load()
.as(Encoders.STRING())
.map((MapFunction<String, StringArray>) x -> new StringArray(x),
stringArrayEncoder)
.map((MapFunction<StringArray, RDMessage>)
r -> new RDMessage(r), messageEncoder)
.map((MapFunction<RDMessage, RDMeasurement>) e ->
e.getRdMeasurement(), measurementEncoder)
/*
.map((MapFunction<RDMeasurement, String>) e -> {
if (e.getDataSourceName() != null) {
System.out.println("•••> " + e);
}
return e.toString();
}, Encoders.STRING())
.map((MapFunction<String, RDMeasurement>) s -> new RDMeasurement(s),
measurementEncoder)
*/
.writeStream()
.outputMode("append")
.format("console")
.start();
query.awaitTermination();
上面注释的代码使我能够识别问题。
这篇关于电光为什么在三角洲湖表中写空的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持编程学习网!
本文标题为:电光为什么在三角洲湖表中写空


基础教程推荐
- 如何对 HashSet 进行排序? 2022-01-01
- 如何使用 Stream 在集合中拆分奇数和偶数以及两者的总和 2022-01-01
- 首次使用 Hadoop,MapReduce Job 不运行 Reduce Phase 2022-01-01
- 如何强制对超级方法进行多态调用? 2022-01-01
- 如何在不安装整个 WTP 包的情况下将 Tomcat 8 添加到 Eclipse Kepler 2022-01-01
- 由于对所需库 rt.jar 的限制,对类的访问限制? 2022-01-01
- Java 中保存最后 N 个元素的大小受限队列 2022-01-01
- Spring Boot Freemarker从2.2.0升级失败 2022-01-01
- 如何使用 Eclipse 检查调试符号状态? 2022-01-01
- 在螺旋中写一个字符串 2022-01-01