Hi TD,

Yes, saveToCassandra throws an exception. How do I fail that task explicitly if I catch the exception? Right now the batch doesn't fail and remains in a hung state. Is there any way to fail that batch so that it can be retried?
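Is rethrowing from inside foreachRDD the right way to do that? A rough sketch of what I mean, reusing the helpers from my code below (JsonUtility, ZookeeperManager) and catching the IOException from the log at the bottom of this thread:

import java.io.IOException

import com.datastax.spark.connector._ // provides saveToCassandra

data_rdd.foreachRDD(rdd => {
  try {
    val stream = rdd.map(x => JsonUtility.deserialize(x))
    stream.saveToCassandra(CASSANDRA_KEY_SPACE, SIGNATURE_TABLE,
      StreamModel.getColumns)
    // Commit offsets only after the Cassandra write has succeeded.
    ZookeeperManager.updateOffsetsinZk(zkProperties, rdd)
  } catch {
    case e: IOException =>
      // Rethrow so the batch job fails instead of being marked complete.
      // Since the offsets were never committed, the same offset range
      // should be re-read from Kafka after a restart.
      throw e
  }
})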
Thanks
Varun

On Tue, Oct 20, 2015 at 2:50 AM, Tathagata Das <t...@databricks.com> wrote:

> If Cassandra is down, does saveToCassandra throw an exception? If it does,
> you can catch that exception and write your own logic to retry and/or not
> update. Once the foreachRDD function completes, that batch will be
> internally marked as completed.
>
> TD
>
> On Mon, Oct 19, 2015 at 5:48 AM, varun sharma <varunsharman...@gmail.com>
> wrote:
>
>> Hi,
>> I am facing this issue consistently in a spark-cassandra-kafka *streaming
>> job*.
>> *Spark 1.4.0*
>> *cassandra connector 1.4.0-M3*
>>
>> *Issue:*
>> I am reading data from *Kafka* using DirectStream, writing to *Cassandra*
>> after parsing the JSON, and subsequently updating the offsets in
>> *zookeeper*.
>> If the Cassandra cluster is down, it throws an exception, but the batch
>> that arrives in that time window is never processed even though the
>> offsets are updated in zookeeper. This results in data loss.
>> Once the Cassandra cluster is back up, the job processes data normally.
>> PFA the screenshots of hung batches and code.
>>
>> *Code:*
>>
>> data_rdd.foreachRDD(rdd => {
>>   val stream = rdd
>>     .map(x => JsonUtility.deserialize(x))
>>   stream.saveToCassandra(CASSANDRA_KEY_SPACE, SIGNATURE_TABLE,
>>     StreamModel.getColumns)
>>
>>   // commit the offsets once everything is done
>>   ZookeeperManager.updateOffsetsinZk(zkProperties, rdd)
>> })
>>
>> *I have even tried this variant:*
>>
>> data_rdd.foreachRDD(rdd => {
>>   val stream = rdd
>>     .map(x => JsonUtility.deserialize(x))
>>   stream.saveToCassandra(CASSANDRA_KEY_SPACE, SIGNATURE_TABLE,
>>     StreamModel.getColumns)
>> })
>>
>> data_rdd.foreachRDD(rdd => {
>>   // commit the offsets once everything is done
>>   ZookeeperManager.updateOffsetsinZk(zkProperties, rdd)
>> })
>>
>> Exception when the Cassandra cluster is down:
>> [2015-10-19 12:49:20] [JobScheduler] [ERROR]
>> [org.apache.spark.streaming.scheduler.JobScheduler] - Error running job
>> streaming job 1445239140000 ms.3
>> java.io.IOException: Failed to open native connection to Cassandra at
>> {......}
>>
>> --
>> *VARUN SHARMA*
>> *Flipkart*
>> *Bangalore*
>>

--
*VARUN SHARMA*
*Flipkart*
*Bangalore*
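A rough sketch of the "retry and/or not update" approach TD describes in the quoted message above. The retry count and delay are arbitrary illustrative values, the helpers and constants are the ones from the quoted code, and IOException is taken from the log above:

import java.io.IOException

import com.datastax.spark.connector._ // provides saveToCassandra

// Arbitrary values for illustration only.
val maxAttempts = 3
val retryDelayMs = 5000L

data_rdd.foreachRDD(rdd => {
  val stream = rdd.map(x => JsonUtility.deserialize(x))
  var attempt = 1
  var done = false
  while (!done) {
    try {
      stream.saveToCassandra(CASSANDRA_KEY_SPACE, SIGNATURE_TABLE,
        StreamModel.getColumns)
      // Offsets are committed only after a successful write, so a
      // failed batch is re-read from Kafka instead of being lost.
      ZookeeperManager.updateOffsetsinZk(zkProperties, rdd)
      done = true
    } catch {
      case _: IOException if attempt < maxAttempts =>
        attempt += 1
        Thread.sleep(retryDelayMs) // pause before retrying the write
      // On the final attempt the guard fails, so the exception
      // propagates and the batch job fails rather than hanging.
    }
  }
})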