Hi all,
I have in a single server installed spark 1.3.1 and cassandra 2.0.14
I'm coding a simple java class for Spark Streaming as follow:

   - reading header events from flume sink
   - based on header I write the event body on navigation or transaction
   table (cassandra)

unfortunatly I get NoHostAvailableException, if I comment the code for
saving one of the two tables everything works


*here the code*

     public static void main(String[] args) {

        // Create a local StreamingContext with two working thread and
batch interval of 1 second
         SparkConf conf = new
SparkConf().setMaster("local[2]").setAppName("DWXNavigationApp");

         conf.set("spark.cassandra.connection.host", "127.0.0.1");
         conf.set("spark.cassandra.connection.native.port","9042");
         conf.set("spark.cassandra.output.batch.size.rows", "1");
         conf.set("spark.cassandra.output.concurrent.writes", "1");


         final JavaStreamingContext jssc = new JavaStreamingContext(conf,
Durations.seconds(1));

         JavaReceiverInputDStream<SparkFlumeEvent> flumeStreamNavig =
FlumeUtils.createPollingStream(jssc, "127.0.0.1", 8888);


         JavaDStream<String> logRowsNavig = flumeStreamNavig.map(
                 new Function<SparkFlumeEvent,String>(){

                    @Override
                    public String call(SparkFlumeEvent arg0) throws
Exception {
                        // TODO Auto-generated method stub0.

                        Map<CharSequence,CharSequence> headers =
arg0.event().getHeaders();

                        ByteBuffer bytePayload = arg0.event().getBody();
                        String s = headers.get("source_log").toString() +
"#" + new String(bytePayload.array());
                        System.out.println("RIGA: " + s);
                        return s;
                    }
                 });


         logRowsNavig.foreachRDD(
                 new Function<JavaRDD<String>,Void>(){
                    @Override
                    public Void call(JavaRDD<String> rows) throws Exception
{

                        if(!rows.isEmpty()){

                             //String header =
getHeaderFronRow(rows.collect());

                             List<Navigation> listNavigation = new
ArrayList<Navigation>();
                             List<Transaction> listTransaction = new
ArrayList<Transaction>();

                             for(String row : rows.collect()){

                                 String header = row.substring(0,
row.indexOf("#"));

                                 if(header.contains("controller_log")){

 listNavigation.add(createNavigation(row));
                                     System.out.println("Added Element in
Navigation List");

                                 }else if(header.contains("business_log")){

 listTransaction.add(createTransaction(row));
                                     System.out.println("Added Element in
Transaction List");
                                 }

                             }


                             if(!listNavigation.isEmpty()){
                                 JavaRDD<Navigation> navigationRows=
jssc.sparkContext().parallelize(listNavigation);


 javaFunctions(navigationRows).writerBuilder("cassandrasink", "navigation",
mapToRow(Navigation.class)).saveToCassandra();
                             }


                             if(!listTransaction.isEmpty()){
                                 JavaRDD<Transaction> transactionRows=
jssc.sparkContext().parallelize(listTransaction);


 javaFunctions(transactionRows).writerBuilder("cassandrasink",
"transaction", mapToRow(Transaction.class)).saveToCassandra();

                             }

                        }
                        return null;

                    }
             });

        jssc.start();              // Start the computation
        jssc.awaitTermination();   // Wait for the computation to terminate
     }


*here the exception*


15/05/29 11:19:29 ERROR QueryExecutor: Failed to execute:

com.datastax.spark.connector.writer.RichBatchStatement@ab76b83

com.datastax.driver.core.exceptions.NoHostAvailableException: All

host(s) tried for query failed (no host was tried)

         at

com.datastax.driver.core.RequestHandler.sendRequest(RequestHandler.java:107)

         at

com.datastax.driver.core.SessionManager.execute(SessionManager.java:538)

         at

com.datastax.driver.core.SessionManager.executeQuery(SessionManager.java:577)

         at

com.datastax.driver.core.SessionManager.executeAsync(SessionManager.java:119)

         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

         at

sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)

         at

sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

         at java.lang.reflect.Method.invoke(Method.java:601)

         at

com.datastax.spark.connector.cql.SessionProxy.invoke(SessionProxy.scala:33)

         at $Proxy17.executeAsync(Unknown Source)

         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

         at

sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)

         at

sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

         at java.lang.reflect.Method.invoke(Method.java:601)

         at

com.datastax.spark.connector.cql.SessionProxy.invoke(SessionProxy.scala:33)

         at $Proxy17.executeAsync(Unknown Source)

         at

com.datastax.spark.connector.writer.QueryExecutor$$anonfun$$init$$1.apply(QueryExecutor.scala:11)

         at

com.datastax.spark.connector.writer.QueryExecutor$$anonfun$$init$$1.apply(QueryExecutor.scala:11)

         at

com.datastax.spark.connector.writer.AsyncExecutor.executeAsync(AsyncExecutor.scala:31)

         at

com.datastax.spark.connector.writer.TableWriter$$anonfun$write$1$$anonfun$apply$2.apply(TableWriter.scala:137)

         at

com.datastax.spark.connector.writer.TableWriter$$anonfun$write$1$$anonfun$apply$2.apply(TableWriter.scala:136)

         at scala.collection.Iterator$class.foreach(Iterator.scala:727)

         at

com.datastax.spark.connector.writer.GroupingBatchBuilder.foreach(GroupingBatchBuilder.scala:31)

         at

com.datastax.spark.connector.writer.TableWriter$$anonfun$write$1.apply(TableWriter.scala:136)

         at

com.datastax.spark.connector.writer.TableWriter$$anonfun$write$1.apply(TableWriter.scala:120)

         at

com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:100)

         at

com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:99)

         at

com.datastax.spark.connector.cql.CassandraConnector.closeResourceAfterUse(CassandraConnector.scala:151)

         at

com.datastax.spark.connector.cql.CassandraConnector.withSessionDo(CassandraConnector.scala:99)

         at

com.datastax.spark.connector.writer.TableWriter.write(TableWriter.scala:120)

         at

com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:36)

         at

com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:36)

         at

org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)

         at org.apache.spark.scheduler.Task.run(Task.scala:64)

         at

org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)

         at

java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)

         at

java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)

         at java.lang.Thread.run(Thread.java:722)

15/05/29 11:19:29 ERROR Executor: Exception in task 1.0 in stage 15.0 (TID
20)

java.io.IOException: Failed to write statements to cassandrasink.navigation.

         at

com.datastax.spark.connector.writer.TableWriter$$anonfun$write$1.apply(TableWriter.scala:145)

         at

com.datastax.spark.connector.writer.TableWriter$$anonfun$write$1.apply(TableWriter.scala:120)

         at

com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:100)

         at

com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:99)

         at

com.datastax.spark.connector.cql.CassandraConnector.closeResourceAfterUse(CassandraConnector.scala:151)

         at

com.datastax.spark.connector.cql.CassandraConnector.withSessionDo(CassandraConnector.scala:99)

         at

com.datastax.spark.connector.writer.TableWriter.write(TableWriter.scala:120)

         at

com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:36)

         at

com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:36)

         at

org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)

         at org.apache.spark.scheduler.Task.run(Task.scala:64)

         at

org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)

         at

java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)

         at

java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)

         at java.lang.Thread.run(Thread.java:722)

15/05/29 11:19:29 WARN TaskSetManager: Lost task 1.0 in stage 15.0 (TID 20,
localhost): java.io.IOException: Failed to write statements to
cassandrasink.navigation.

         at

com.datastax.spark.connector.writer.TableWriter$$anonfun$write$1.apply(TableWriter.scala:145)

         at

com.datastax.spark.connector.writer.TableWriter$$anonfun$write$1.apply(TableWriter.scala:120)

         at

com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:100)

         at

com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:99)

         at

com.datastax.spark.connector.cql.CassandraConnector.closeResourceAfterUse(CassandraConnector.scala:151)

         at

com.datastax.spark.connector.cql.CassandraConnector.withSessionDo(CassandraConnector.scala:99)

         at

com.datastax.spark.connector.writer.TableWriter.write(TableWriter.scala:120)

         at

com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:36)

         at

com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:36)

         at

org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)

         at org.apache.spark.scheduler.Task.run(Task.scala:64)

         at

org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)

         at

java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)

         at

java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)

         at java.lang.Thread.run(Thread.java:722)



A G

Reply via email to