Unable to connect to Cassandra running in Docker from Spark

I have a Zeppelin notebook running in Docker, with the following code that uses Cassandra:

    import org.apache.spark.sql.cassandra._

    val cqlContext = new CassandraSQLContext(sc)
    cqlContext.sql("select * from demo.table").collect.foreach(println)

However, I get this error:

    import org.apache.spark.sql.cassandra._
    cqlContext: org.apache.spark.sql.cassandra.CassandraSQLContext = org.apache.spark.sql.cassandra.CassandraSQLContext@395e28a8
    com.google.common.util.concurrent.UncheckedExecutionException: java.lang.IllegalArgumentException: Cannot build a cluster without contact points
      at com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2199)
      at com.google.common.cache.LocalCache.get(LocalCache.java:3932)
      at com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:3936)
      at com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4806)
      at org.apache.spark.sql.cassandra.CassandraCatalog.lookupRelation(CassandraCatalog.scala:28)
      at org.apache.spark.sql.cassandra.CassandraSQLContext$$anon$2.org$apache$spark$sql$catalyst$analysis$OverrideCatalog$$super$lookupRelation(CassandraSQLContext.scala:219)
      at org.apache.spark.sql.catalyst.analysis.OverrideCatalog$$anonfun$lookupRelation$3.apply(Catalog.scala:137)
      at org.apache.spark.sql.catalyst.analysis.OverrideCatalog$$anonfun$lookupRelation$3.apply(Catalog.scala:137)
      at scala.Option.getOrElse(Option.scala:120)
      at org.apache.spark.sql.catalyst.analysis.OverrideCatalog$class.lookupRelation(Catalog.scala:137)
      at org.apache.spark.sql.cassandra.CassandraSQLContext$$anon$2.lookupRelation(CassandraSQLContext.scala:219)
      at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply$5.applyOrElse(Analyzer.scala:143)
      at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply$5.applyOrElse(Analyzer.scala:138)
      at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:144)
      at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:162)
      at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
      at scala.collection.Iterator$class.foreach(Iterator.scala:727)
      at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
      at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
      at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
      at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
      at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
      at scala.collection.AbstractIterator.to(Iterator.scala:1157)
      at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
      at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
      at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
      at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
      at org.apache.spark.sql.catalyst.trees.TreeNode.transformChildrenDown(TreeNode.scala:191)
      at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:147)
      at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:135)
      at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.apply(Analyzer.scala:138)
      at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.apply(Analyzer.scala:137)
      at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1$$anonfun$apply$2.apply(RuleExecutor.scala:61)
      at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1$$anonfun$apply$2.apply(RuleExecutor.scala:59)
      at scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:111)
      at scala.collection.immutable.List.foldLeft(List.scala:84)
      at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1.apply(RuleExecutor.scala:59)
      at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1.apply(RuleExecutor.scala:51)
      at scala.collection.immutable.List.foreach(List.scala:318)
      at org.apache.spark.sql.catalyst.rules.RuleExecutor.apply(RuleExecutor.scala:51)
      at org.apache.spark.sql.SQLContext$QueryExecution.analyzed$lzycompute(SQLContext.scala:411)
      at org.apache.spark.sql.SQLContext$QueryExecution.analyzed(SQLContext.scala:411)
      at org.apache.spark.sql.SQLContext$QueryExecution.withCachedData$lzycompute(SQLContext.scala:412)
      at org.apache.spark.sql.SQLContext$QueryExecution.withCachedData(SQLContext.scala:412)
      at org.apache.spark.sql.SQLContext$QueryExecution.optimizedPlan$lzycompute(SQLContext.scala:413)
      at org.apache.spark.sql.SQLContext$QueryExecution.optimizedPlan(SQLContext.scala:413)
      at org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan$lzycompute(SQLContext.scala:418)
      at org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan(SQLContext.scala:416)
      at org.apache.spark.sql.SQLContext$QueryExecution.executedPlan$lzycompute(SQLContext.scala:422)
      at org.apache.spark.sql.SQLContext$QueryExecution.executedPlan(SQLContext.scala:422)
      at org.apache.spark.sql.SchemaRDD.collect(SchemaRDD.scala:444)
      at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:32)
      at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:37)
      at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:39)
      at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:41)
      at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:43)
      at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:45)
      at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:47)
      at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:49)
      at $iwC$$iwC$$iwC$$iwC.<init>(<console>:51)
      at $iwC$$iwC$$iwC.<init>(<console>:53)
      at $iwC$$iwC.<init>(<console>:55)
      at $iwC.<init>(<console>:57)
      at <init>(<console>:59)
      at .<init>(<console>:63)
      at .<clinit>(<console>)
      at .<init>(<console>:7)
      at .<clinit>(<console>)
      at $print(<console>)
      at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
      at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
      at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
      at java.lang.reflect.Method.invoke(Method.java:606)
      at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:852)
      at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1125)
      at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:674)
      at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:705)
      at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:669)
      at com.nflabs.zeppelin.spark.SparkInterpreter.interpretInput(SparkInterpreter.java:541)
      at com.nflabs.zeppelin.spark.SparkInterpreter.interpret(SparkInterpreter.java:517)
      at com.nflabs.zeppelin.spark.SparkInterpreter.interpret(SparkInterpreter.java:510)
      at com.nflabs.zeppelin.interpreter.ClassloaderInterpreter.interpret(ClassloaderInterpreter.java:40)
      at com.nflabs.zeppelin.interpreter.LazyOpenInterpreter.interpret(LazyOpenInterpreter.java:76)
      at com.nflabs.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:246)
      at com.nflabs.zeppelin.scheduler.Job.run(Job.java:152)
      at com.nflabs.zeppelin.scheduler.FIFOScheduler$1.run(FIFOScheduler.java:101)
      at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
      at java.util.concurrent.FutureTask.run(FutureTask.java:262)
      at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:178)
      at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:292)
      at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
      at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
      at java.lang.Thread.run(Thread.java:745)
    Caused by: java.lang.IllegalArgumentException: Cannot build a cluster without contact points
      at com.datastax.driver.core.Cluster.checkNotEmpty(Cluster.java:116)
      at com.datastax.driver.core.Cluster.<init>(Cluster.java:108)
      at com.datastax.driver.core.Cluster.buildFrom(Cluster.java:177)
      at com.datastax.driver.core.Cluster$Builder.build(Cluster.java:1109)
      at com.datastax.spark.connector.cql.DefaultConnectionFactory$.createCluster(CassandraConnectionFactory.scala:78)
      at com.datastax.spark.connector.cql.CassandraConnector$.com$datastax$spark$connector$cql$CassandraConnector$$createSession(CassandraConnector.scala:167)
      at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$2.apply(CassandraConnector.scala:162)
      at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$2.apply(CassandraConnector.scala:162)
      at com.datastax.spark.connector.cql.RefCountedCache.createNewValueAndKeys(RefCountedCache.scala:31)
      at com.datastax.spark.connector.cql.RefCountedCache.acquire(RefCountedCache.scala:56)
      at com.datastax.spark.connector.cql.CassandraConnector.openSession(CassandraConnector.scala:73)
      at com.datastax.spark.connector.cql.CassandraConnector.withSessionDo(CassandraConnector.scala:99)
      at com.datastax.spark.connector.cql.CassandraConnector.withClusterDo(CassandraConnector.scala:110)
      at com.datastax.spark.connector.cql.Schema$.fromCassandra(Schema.scala:173)
      at org.apache.spark.sql.cassandra.CassandraCatalog$$anon$1.load(CassandraCatalog.scala:22)
      at org.apache.spark.sql.cassandra.CassandraCatalog$$anon$1.load(CassandraCatalog.scala:19)
      at com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3522)
      at com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2315)
      at com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2278)
      at com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2193)
      ... 92 more

I ran docker pull cassandra from the Docker command line, but the problem persists.

What do I need to do to be able to use Cassandra?

For Spark to connect to a Cassandra cluster, you have to give it at least one node of the Cassandra cluster as a contact point in the Spark configuration, like this:

    conf.set("spark.cassandra.connection.host", "127.0.0.1")
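
Applied to the Zeppelin/Scala snippet from the question, that could look roughly like the sketch below. This is only a sketch: in Zeppelin the SparkContext usually already exists, so in practice you would set spark.cassandra.connection.host as a property in the Spark interpreter settings rather than build a new context, and 127.0.0.1 is just a placeholder for your Cassandra container's address (from another container this is typically the container's Docker network IP or hostname, not 127.0.0.1).

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.cassandra.CassandraSQLContext

    // "127.0.0.1" is a placeholder; replace it with the address of the
    // Cassandra container as seen from the Spark/Zeppelin container.
    val conf = new SparkConf()
      .setAppName("Cassandra test")
      .set("spark.cassandra.connection.host", "127.0.0.1")

    val sc = new SparkContext(conf)
    val cqlContext = new CassandraSQLContext(sc)

    cqlContext.sql("select * from demo.table").collect.foreach(println)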

I ran into the same Cannot build a cluster without contact points problem and managed to solve it by setting up SparkConf() as follows:

    conf = SparkConf() \
        .setAppName("MyApp") \
        .setMaster("spark://127.0.0.1:7077") \
        .set("spark.cassandra.connection.host", "127.0.0.1")

So a basic Spark < 2.0 program (in Python) that connects to a local Cassandra instance should look like this:

    from pyspark import SparkConf, SparkContext
    from pyspark.sql import SQLContext

    # Master and Cassandra contact point are both set on the conf.
    conf = SparkConf() \
        .setAppName("PySpark Cassandra Test") \
        .setMaster("spark://127.0.0.1:7077") \
        .set("spark.cassandra.connection.host", "127.0.0.1")

    sc = SparkContext(conf=conf)
    sql = SQLContext(sc)

    # Read a Cassandra table through the Spark Cassandra Connector data source.
    test = sql.read.format("org.apache.spark.sql.cassandra") \
        .load(keyspace="mykeyspace", table="mytable")
    test.collect()
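
For reference, the equivalent read in Scala against the table from the question might look like the sketch below. Again, the contact point is a placeholder for the Cassandra container's address; the keyspace demo and table name table are taken from the question's query.

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.SQLContext

    // Contact point is a placeholder for the Cassandra container's address.
    val conf = new SparkConf()
      .setAppName("Spark Cassandra test")
      .set("spark.cassandra.connection.host", "127.0.0.1")

    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)

    // Read the Cassandra table through the connector's DataFrame source.
    val df = sqlContext.read
      .format("org.apache.spark.sql.cassandra")
      .options(Map("keyspace" -> "demo", "table" -> "table"))
      .load()

    df.collect().foreach(println)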