If Cassandra is down, does saveToCassandra throw an exception? If it does,
you can catch that exception and write your own logic to retry the write
and/or skip the offset update. Once the foreachRDD function completes, that
batch will be internally marked as completed.
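
Something like the following might work. It is an untested sketch that
reuses the names from your snippet, and it assumes the connector failure
surfaces as an exception on the driver inside foreachRDD, as your log
suggests:

data_rdd.foreachRDD(rdd => {
  val stream = rdd.map(x => JsonUtility.deserialize(x))
  try {
    stream.saveToCassandra(CASSANDRA_KEY_SPACE, SIGNATURE_TABLE,
      StreamModel.getColumns)
    // offsets are committed only if the write above succeeded
    ZookeeperManager.updateOffsetsinZk(zkProperties, rdd)
  } catch {
    case e: Exception =>
      // e.g. the java.io.IOException from your log; skipping the offset
      // update here means the unwritten Kafka range can be picked up
      // again from zookeeper after a restart
      System.err.println("Cassandra write failed, offsets not committed: " + e)
  }
})

If you instead retry the save in a loop inside the catch block, the batch
blocks rather than being dropped, and the offsets in zookeeper only ever
advance past data that actually reached Cassandra.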

TD

On Mon, Oct 19, 2015 at 5:48 AM, varun sharma <varunsharman...@gmail.com>
wrote:

> Hi,
> I am facing this issue consistently in a Spark-Cassandra-Kafka *streaming
> job*.
> *Spark 1.4.0*
> *Cassandra connector 1.4.0-M3*
> *Issue is:*
>
> I am reading data from *Kafka* using DirectStream, writing to *Cassandra*
> after parsing the JSON, and subsequently updating the offsets in *zookeeper*.
> If the Cassandra cluster is down, it throws an exception, but the batch that
> arrives in that time window is never processed, even though the offsets are
> updated in zookeeper.
> This results in data loss.
> Once the Cassandra cluster is back up, the job processes data normally.
> PFA the screenshots of hung batches and code.
>
> *Code:*
>
> data_rdd.foreachRDD(rdd => {
>   val stream = rdd.map(x => JsonUtility.deserialize(x))
>   stream.saveToCassandra(CASSANDRA_KEY_SPACE, SIGNATURE_TABLE,
>     StreamModel.getColumns)
>
>   // commit the offsets once everything is done
>   ZookeeperManager.updateOffsetsinZk(zkProperties, rdd)
> })
>
> *I have even tried this variant:*
>
> data_rdd.foreachRDD(rdd => {
>   val stream = rdd.map(x => JsonUtility.deserialize(x))
>   stream.saveToCassandra(CASSANDRA_KEY_SPACE, SIGNATURE_TABLE,
>     StreamModel.getColumns)
> })
>
> data_rdd.foreachRDD(rdd => {
>   // commit the offsets once everything is done
>   ZookeeperManager.updateOffsetsinZk(zkProperties, rdd)
> })
>
> *Exception when the Cassandra cluster is down:*
> [2015-10-19 12:49:20] [JobScheduler] [ERROR]
> [org.apache.spark.streaming.scheduler.JobScheduler] - Error running job
> streaming job 1445239140000 ms.3
> java.io.IOException: Failed to open native connection to Cassandra at
> {......}
>
> --
> *VARUN SHARMA*
> *Flipkart*
> *Bangalore*