If Cassandra is down, does saveToCassandra throw an exception? If it does, you can catch that exception and write your own logic to retry and/or skip updating the offsets. Once the foreachRDD function completes, that batch will be internally marked as completed.
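Something along these lines might work (an untested sketch reusing the helpers from your code below; the retry count and sleep interval are arbitrary placeholders):

    import scala.util.control.NonFatal

    val maxRetries = 3  // arbitrary; tune for your environment

    data_rdd.foreachRDD { rdd =>
      val stream = rdd.map(x => JsonUtility.deserialize(x))

      var attempt = 0
      var saved = false
      while (!saved && attempt < maxRetries) {
        try {
          stream.saveToCassandra(CASSANDRA_KEY_SPACE, SIGNATURE_TABLE,
            StreamModel.getColumns)
          saved = true
        } catch {
          case NonFatal(e) =>
            attempt += 1
            Thread.sleep(5000)  // give Cassandra time to come back
        }
      }

      // Commit offsets only if the write succeeded, so an unprocessed
      // batch is re-read from Kafka on restart instead of being skipped.
      if (saved) {
        ZookeeperManager.updateOffsetsinZk(zkProperties, rdd)
      }
    }

If the write never succeeds within the retries, you may also want to stop the context or raise an alert rather than silently dropping the batch.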
TD

On Mon, Oct 19, 2015 at 5:48 AM, varun sharma <varunsharman...@gmail.com> wrote:
> Hi,
>
> I am facing this issue consistently in a Spark-Cassandra-Kafka streaming job.
> Spark 1.4.0
> Cassandra connector 1.4.0-M3
>
> Issue is:
> I am reading data from Kafka using DirectStream, writing to Cassandra after
> parsing the JSON, and subsequently updating the offsets in ZooKeeper.
> If the Cassandra cluster is down, it throws an exception, but the batch which
> arrives in that time window is never processed, even though the offsets are
> updated in ZooKeeper. This results in data loss.
> Once the Cassandra cluster is up, the job processes data normally.
> PFA the screenshots of the hung batches and code.
>
> Code:
>
> data_rdd.foreachRDD(rdd => {
>   val stream = rdd
>     .map(x => JsonUtility.deserialize(x))
>   stream.saveToCassandra(CASSANDRA_KEY_SPACE, SIGNATURE_TABLE,
>     StreamModel.getColumns)
>
>   // commit the offsets once everything is done
>   ZookeeperManager.updateOffsetsinZk(zkProperties, rdd)
> })
>
> I have even tried this variant:
>
> data_rdd.foreachRDD(rdd => {
>   val stream = rdd
>     .map(x => JsonUtility.deserialize(x))
>   stream.saveToCassandra(CASSANDRA_KEY_SPACE, SIGNATURE_TABLE,
>     StreamModel.getColumns)
> })
>
> data_rdd.foreachRDD(rdd => {
>   // commit the offsets once everything is done
>   ZookeeperManager.updateOffsetsinZk(zkProperties, rdd)
> })
>
> Exception when the Cassandra cluster is down:
> [2015-10-19 12:49:20] [JobScheduler] [ERROR]
> [org.apache.spark.streaming.scheduler.JobScheduler] - Error running job
> streaming job 1445239140000 ms.3
> java.io.IOException: Failed to open native connection to Cassandra at
> {......}
>
> --
> VARUN SHARMA
> Flipkart
> Bangalore