Hi all,

I have Spark 1.3.1 and Cassandra 2.0.14 installed on a single server. I'm writing a simple Java class for Spark Streaming that does the following:
- reads header events from a Flume sink
- based on the header, writes the event body to either the navigation or the transaction table (Cassandra)

Unfortunately I get a NoHostAvailableException. If I comment out the code that saves to one of the two tables, everything works.

*here the code*

public static void main(String[] args) {

    // Create a local StreamingContext with two working threads and a batch interval of 1 second
    SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("DWXNavigationApp");
    conf.set("spark.cassandra.connection.host", "127.0.0.1");
    conf.set("spark.cassandra.connection.native.port", "9042");
    conf.set("spark.cassandra.output.batch.size.rows", "1");
    conf.set("spark.cassandra.output.concurrent.writes", "1");

    final JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));

    JavaReceiverInputDStream<SparkFlumeEvent> flumeStreamNavig =
            FlumeUtils.createPollingStream(jssc, "127.0.0.1", 8888);

    // Turn each Flume event into a "header#body" string
    JavaDStream<String> logRowsNavig = flumeStreamNavig.map(
        new Function<SparkFlumeEvent, String>() {
            @Override
            public String call(SparkFlumeEvent arg0) throws Exception {
                Map<CharSequence, CharSequence> headers = arg0.event().getHeaders();
                ByteBuffer bytePayload = arg0.event().getBody();
                String s = headers.get("source_log").toString() + "#" + new String(bytePayload.array());
                System.out.println("RIGA: " + s);
                return s;
            }
        });

    // Route each row to the navigation or transaction table based on its header
    logRowsNavig.foreachRDD(
        new Function<JavaRDD<String>, Void>() {
            @Override
            public Void call(JavaRDD<String> rows) throws Exception {
                if (!rows.isEmpty()) {
                    //String header = getHeaderFronRow(rows.collect());
                    List<Navigation> listNavigation = new ArrayList<Navigation>();
                    List<Transaction> listTransaction = new ArrayList<Transaction>();

                    for (String row : rows.collect()) {
                        String header = row.substring(0, row.indexOf("#"));
                        if (header.contains("controller_log")) {
                            listNavigation.add(createNavigation(row));
                            System.out.println("Added Element in Navigation List");
                        } else if (header.contains("business_log")) {
                            listTransaction.add(createTransaction(row));
                            System.out.println("Added Element in Transaction List");
                        }
                    }

                    if (!listNavigation.isEmpty()) {
                        JavaRDD<Navigation> navigationRows = jssc.sparkContext().parallelize(listNavigation);
                        javaFunctions(navigationRows)
                            .writerBuilder("cassandrasink", "navigation", mapToRow(Navigation.class))
                            .saveToCassandra();
                    }

                    if (!listTransaction.isEmpty()) {
                        JavaRDD<Transaction> transactionRows = jssc.sparkContext().parallelize(listTransaction);
                        javaFunctions(transactionRows)
                            .writerBuilder("cassandrasink", "transaction", mapToRow(Transaction.class))
                            .saveToCassandra();
                    }
                }
                return null;
            }
        });

    jssc.start();            // Start the computation
    jssc.awaitTermination(); // Wait for the computation to terminate
}

*here the exception*

15/05/29 11:19:29 ERROR QueryExecutor: Failed to execute: com.datastax.spark.connector.writer.RichBatchStatement@ab76b83
com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (no host was tried)
        at com.datastax.driver.core.RequestHandler.sendRequest(RequestHandler.java:107)
        at com.datastax.driver.core.SessionManager.execute(SessionManager.java:538)
        at com.datastax.driver.core.SessionManager.executeQuery(SessionManager.java:577)
        at com.datastax.driver.core.SessionManager.executeAsync(SessionManager.java:119)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:601)
        at com.datastax.spark.connector.cql.SessionProxy.invoke(SessionProxy.scala:33)
        at $Proxy17.executeAsync(Unknown Source)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:601)
        at com.datastax.spark.connector.cql.SessionProxy.invoke(SessionProxy.scala:33)
        at $Proxy17.executeAsync(Unknown Source)
        at com.datastax.spark.connector.writer.QueryExecutor$$anonfun$$init$$1.apply(QueryExecutor.scala:11)
        at com.datastax.spark.connector.writer.QueryExecutor$$anonfun$$init$$1.apply(QueryExecutor.scala:11)
        at com.datastax.spark.connector.writer.AsyncExecutor.executeAsync(AsyncExecutor.scala:31)
        at com.datastax.spark.connector.writer.TableWriter$$anonfun$write$1$$anonfun$apply$2.apply(TableWriter.scala:137)
        at com.datastax.spark.connector.writer.TableWriter$$anonfun$write$1$$anonfun$apply$2.apply(TableWriter.scala:136)
        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
        at com.datastax.spark.connector.writer.GroupingBatchBuilder.foreach(GroupingBatchBuilder.scala:31)
        at com.datastax.spark.connector.writer.TableWriter$$anonfun$write$1.apply(TableWriter.scala:136)
        at com.datastax.spark.connector.writer.TableWriter$$anonfun$write$1.apply(TableWriter.scala:120)
        at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:100)
        at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:99)
        at com.datastax.spark.connector.cql.CassandraConnector.closeResourceAfterUse(CassandraConnector.scala:151)
        at com.datastax.spark.connector.cql.CassandraConnector.withSessionDo(CassandraConnector.scala:99)
        at com.datastax.spark.connector.writer.TableWriter.write(TableWriter.scala:120)
        at com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:36)
        at com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:36)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
        at org.apache.spark.scheduler.Task.run(Task.scala:64)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
        at java.lang.Thread.run(Thread.java:722)

15/05/29 11:19:29 ERROR Executor: Exception in task 1.0 in stage 15.0 (TID 20)
java.io.IOException: Failed to write statements to cassandrasink.navigation.
        at com.datastax.spark.connector.writer.TableWriter$$anonfun$write$1.apply(TableWriter.scala:145)
        at com.datastax.spark.connector.writer.TableWriter$$anonfun$write$1.apply(TableWriter.scala:120)
        at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:100)
        at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:99)
        at com.datastax.spark.connector.cql.CassandraConnector.closeResourceAfterUse(CassandraConnector.scala:151)
        at com.datastax.spark.connector.cql.CassandraConnector.withSessionDo(CassandraConnector.scala:99)
        at com.datastax.spark.connector.writer.TableWriter.write(TableWriter.scala:120)
        at com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:36)
        at com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:36)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
        at org.apache.spark.scheduler.Task.run(Task.scala:64)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
        at java.lang.Thread.run(Thread.java:722)

15/05/29 11:19:29 WARN TaskSetManager: Lost task 1.0 in stage 15.0 (TID 20, localhost): java.io.IOException: Failed to write statements to cassandrasink.navigation.
        [same stack trace as the ERROR above]

A G
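
P.S. In case it's relevant: Navigation and Transaction are plain JavaBeans that I map with mapToRow(). A simplified sketch is below (the field names here are illustrative, my real classes have more columns); as far as I understand, the connector's mapper needs a no-arg constructor plus getters/setters corresponding to the table's columns:

import java.io.Serializable;

public class Navigation implements Serializable {
    // Illustrative fields; the real class mirrors the columns of cassandrasink.navigation
    private String sessionId;
    private String page;

    public Navigation() { }  // no-arg constructor required by the row mapper

    public String getSessionId() { return sessionId; }
    public void setSessionId(String sessionId) { this.sessionId = sessionId; }

    public String getPage() { return page; }
    public void setPage(String page) { this.page = page; }
}

Transaction follows the same pattern against cassandrasink.transaction.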