I am using Cassandra 3.0.3 and Spark 1.6.0, and I am trying to run the demo by combining the old documentation at http://www.datastax.com/dev/blog/accessing-cassandra-from-spark-in-java with the new documentation at https://github.com/datastax/spark-cassandra-connector/blob/master/doc/7_java_api.md.
Here is my pom.xml file:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>muhrafifm</groupId>
<artifactId>spark-cass-twitterdw</artifactId>
<version>1.0</version>
<packaging>jar</packaging>
<build>
<plugins>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.0</version>
<configuration>
<source>1.7</source>
<target>1.7</target>
</configuration>
</plugin>
</plugins>
</build>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.7</maven.compiler.source>
<maven.compiler.target>1.7</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>com.datastax.cassandra</groupId>
<artifactId>cassandra-driver-core</artifactId>
<version>3.0.0</version>
</dependency>
<dependency>
<groupId>com.googlecode.json-simple</groupId>
<artifactId>json-simple</artifactId>
<version>1.1.1</version>
<type>jar</type>
</dependency>
<dependency>
<groupId>com.datastax.spark</groupId>
<artifactId>spark-cassandra-connector_2.10</artifactId>
<version>1.6.0-M1</version>
<type>jar</type>
</dependency>
<dependency>
<groupId>com.datastax.spark</groupId>
<artifactId>spark-cassandra-connector-java_2.10</artifactId>
<version>1.6.0-M1</version>
<type>jar</type>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.10</artifactId>
<version>1.6.0</version>
<type>jar</type>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.10</artifactId>
<version>1.6.0</version>
<type>jar</type>
</dependency>
<dependency>
<groupId>org.apache.thrift</groupId>
<artifactId>libthrift</artifactId>
<version>0.9.1</version>
</dependency>
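</dependencies>
</project>
The main thing I changed is the javaFunctions method. Below is one of my methods after updating the javaFunctions calls according to the new documentation. I also added import static com.datastax.spark.connector.japi.CassandraJavaUtil.*;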
private void generateData(JavaSparkContext sc) {
CassandraConnector connector = CassandraConnector.apply(sc.getConf());
// Prepare the schema
try (Session session = connector.openSession()) {
session.execute("DROP KEYSPACE IF EXISTS java_api");
session.execute("CREATE KEYSPACE java_api WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1}");
session.execute("CREATE TABLE java_api.products (id INT PRIMARY KEY, name TEXT, parents LIST<INT>)");
session.execute("CREATE TABLE java_api.sales (id UUID PRIMARY KEY, product INT, price DECIMAL)");
session.execute("CREATE TABLE java_api.summaries (product INT PRIMARY KEY, summary DECIMAL)");
}
// Prepare the products hierarchy
List<Product> products = Arrays.asList(
new Product(0, "All products", Collections.<Integer>emptyList()),
new Product(1, "Product A", Arrays.asList(0)),
new Product(4, "Product A1", Arrays.asList(0, 1)),
new Product(5, "Product A2", Arrays.asList(0, 1)),
new Product(2, "Product B", Arrays.asList(0)),
new Product(6, "Product B1", Arrays.asList(0, 2)),
new Product(7, "Product B2", Arrays.asList(0, 2)),
new Product(3, "Product C", Arrays.asList(0)),
new Product(8, "Product C1", Arrays.asList(0, 3)),
new Product(9, "Product C2", Arrays.asList(0, 3))
);
JavaRDD<Product> productsRDD = sc.parallelize(products);
javaFunctions(productsRDD).writerBuilder("java_api", "products", mapToRow(Product.class)).saveToCassandra();
JavaRDD<Sale> salesRDD = productsRDD.filter(new Function<Product, Boolean>() {
@Override
public Boolean call(Product product) throws Exception {
return product.getParents().size() == 2;
}
}).flatMap(new FlatMapFunction<Product, Sale>() {
@Override
public Iterable<Sale> call(Product product) throws Exception {
Random random = new Random();
List<Sale> sales = new ArrayList<>(1000);
for (int i = 0; i < 1000; i++) {
sales.add(new Sale(UUID.randomUUID(), product.getId(), BigDecimal.valueOf(random.nextDouble())));
}
return sales;
}
});
javaFunctions(salesRDD).writerBuilder("java_api", "sales", mapToRow(Sale.class)).saveToCassandra();
}
This is the error I get:
16/03/04 13:29:06 INFO Cluster: New Cassandra host /127.0.0.1:9042 added
16/03/04 13:29:06 INFO CassandraConnector: Connected to Cassandra cluster: Test Cluster
Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/spark/sql/catalyst/package$ScalaReflectionLock$
at org.apache.spark.sql.catalyst.ReflectionLock$.<init>(ReflectionLock.scala:5)
at org.apache.spark.sql.catalyst.ReflectionLock$.<clinit>(ReflectionLock.scala)
at com.datastax.spark.connector.mapper.ReflectionColumnMapper.<init>(ReflectionColumnMapper.scala:38)
at com.datastax.spark.connector.mapper.JavaBeanColumnMapper.<init>(JavaBeanColumnMapper.scala:10)
at com.datastax.spark.connector.util.JavaApiHelper$.javaBeanColumnMapper(JavaApiHelper.scala:93)
at com.datastax.spark.connector.util.JavaApiHelper.javaBeanColumnMapper(JavaApiHelper.scala)
at com.datastax.spark.connector.japi.CassandraJavaUtil.mapToRow(CassandraJavaUtil.java:1204)
at com.datastax.spark.connector.japi.CassandraJavaUtil.mapToRow(CassandraJavaUtil.java:1222)
at muhrafifm.spark.cass.twitterdw.Demo.generateData(Demo.java:69)
at muhrafifm.spark.cass.twitterdw.Demo.run(Demo.java:35)
at muhrafifm.spark.cass.twitterdw.Demo.main(Demo.java:181)
Caused by: java.lang.ClassNotFoundException: org.apache.spark.sql.catalyst.package$ScalaReflectionLock$
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
... 11 more
16/03/04 13:29:40 INFO CassandraConnector: Disconnected from Cassandra cluster: Test Cluster
16/03/04 13:29:41 INFO SparkContext: Invoking stop() from shutdown hook
16/03/04 13:29:41 INFO SparkUI: Stopped Spark web UI at http://10.144.233.28:4040
16/03/04 13:29:41 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
16/03/04 13:29:42 INFO MemoryStore: MemoryStore cleared
16/03/04 13:29:42 INFO BlockManager: BlockManager stopped
16/03/04 13:29:42 INFO BlockManagerMaster: BlockManagerMaster stopped
16/03/04 13:29:42 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
16/03/04 13:29:42 INFO SparkContext: Successfully stopped SparkContext
16/03/04 13:29:42 INFO ShutdownHookManager: Shutdown hook called
16/03/04 13:29:42 INFO ShutdownHookManager: Deleting directory /tmp/spark-16fd2ae2-b61b-4411-a776-1e578caabba6
------------------------------------------------------------------------
BUILD FAILURE
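Am I doing something wrong? It seems to require a package that I am not even using. Is there a way to fix this, or should I use an earlier version of the spark-cassandra-connector?
Thanks in advance for any replies.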
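Solution:
The code is looking for
org/apache/spark/sql/catalyst/package$ScalaReflectionLock$
As the stack trace shows, the connector's ReflectionColumnMapper references this class, and it is not on the classpath because the POM only declares spark-core and spark-streaming. You should therefore include the spark-sql library, which brings in the correct dependencies.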
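As a minimal sketch of that fix, a dependency along the following lines could be added inside the <dependencies> section of the pom.xml shown in the question. The 1.6.0 version and the _2.10 Scala suffix are assumptions chosen to match the spark-core_2.10 entry already in that POM; the answer itself does not state them.

```xml
<!-- Assumed coordinates: same Spark version and Scala suffix as the existing spark-core_2.10 entry -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.10</artifactId>
<version>1.6.0</version>
</dependency>
```

spark-sql should pull in the catalyst module transitively, which is where the org.apache.spark.sql.catalyst package comes from, so no separate catalyst entry is expected to be needed.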