That is actually a bug in the UI that got fixed in 1.5.1. The batch is completing with an exception; the UI just does not update correctly.
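Something along these lines should work (an untested sketch, reusing the names from your snippet - data_rdd, JsonUtility, ZookeeperManager, zkProperties, CASSANDRA_KEY_SPACE, SIGNATURE_TABLE, StreamModel; the retry/alerting part is only a placeholder). The point is to commit offsets to ZooKeeper only after saveToCassandra has succeeded, so a failed batch is re-read from Kafka instead of being silently skipped:

    import scala.util.{Failure, Success, Try}

    data_rdd.foreachRDD(rdd => {
      val stream = rdd.map(x => JsonUtility.deserialize(x))

      Try(stream.saveToCassandra(CASSANDRA_KEY_SPACE, SIGNATURE_TABLE,
          StreamModel.getColumns)) match {
        case Success(_) =>
          // Commit the offsets only after the write has actually succeeded,
          // so the data for a failed batch is re-read from Kafka next time.
          ZookeeperManager.updateOffsetsinZk(zkProperties, rdd)
        case Failure(e) =>
          // Write failed (e.g. Cassandra is down): do NOT commit the offsets.
          // Log and either retry here or rethrow so the batch is marked failed.
          // (Retry/alerting logic is left as a placeholder.)
          throw e
      }
    })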
On Tue, Oct 20, 2015 at 8:38 AM, varun sharma <varunsharman...@gmail.com> wrote:
> Also, as you can see from the timestamps in the attached image, batches coming
> after the Cassandra server comes up (21:04) are processed, while batches that
> are in the hung state (21:03) are never processed.
> So, how do I fail those batches so that they can be processed again?
>
> On Tue, Oct 20, 2015 at 9:02 PM, varun sharma <varunsharman...@gmail.com> wrote:
>
>> Hi TD,
>> Yes, saveToCassandra throws an exception. How do I fail that task explicitly
>> if I catch any exceptions?
>> Right now that batch doesn't fail and remains in a hung state. Is there any
>> way I can fail that batch so that it can be tried again?
>>
>> Thanks
>> Varun
>>
>> On Tue, Oct 20, 2015 at 2:50 AM, Tathagata Das <t...@databricks.com> wrote:
>>
>>> If Cassandra is down, does saveToCassandra throw an exception? If it
>>> does, you can catch that exception and write your own logic to retry and/or
>>> not update the offsets. Once the foreachRDD function completes, that batch
>>> will be internally marked as completed.
>>>
>>> TD
>>>
>>> On Mon, Oct 19, 2015 at 5:48 AM, varun sharma <varunsharman...@gmail.com> wrote:
>>>
>>>> Hi,
>>>> I am facing this issue consistently in a spark-cassandra-kafka *streaming job.*
>>>> *Spark 1.4.0*
>>>> *cassandra connector 1.4.0-M3*
>>>>
>>>> *Issue is:*
>>>> I am reading data from *Kafka* using DirectStream, writing to *Cassandra*
>>>> after parsing the JSON, and subsequently updating the offsets in *zookeeper*.
>>>> If the Cassandra cluster is down, it throws an exception, but the batch that
>>>> arrives in that time window is never processed even though its offsets are
>>>> updated in ZooKeeper.
>>>> This results in data loss.
>>>> Once the Cassandra cluster is up, the job processes data normally.
>>>> PFA the screenshots of the hung batches and the code.
>>>>
>>>> *Code:*
>>>>
>>>> data_rdd.foreachRDD(rdd => {
>>>>   val stream = rdd
>>>>     .map(x => JsonUtility.deserialize(x))
>>>>   stream.saveToCassandra(CASSANDRA_KEY_SPACE, SIGNATURE_TABLE,
>>>>     StreamModel.getColumns)
>>>>
>>>>   // commit the offsets once everything is done
>>>>   ZookeeperManager.updateOffsetsinZk(zkProperties, rdd)
>>>> })
>>>>
>>>> *I have even tried this variant:*
>>>>
>>>> data_rdd.foreachRDD(rdd => {
>>>>   val stream = rdd
>>>>     .map(x => JsonUtility.deserialize(x))
>>>>   stream.saveToCassandra(CASSANDRA_KEY_SPACE, SIGNATURE_TABLE,
>>>>     StreamModel.getColumns)
>>>> })
>>>>
>>>> data_rdd.foreachRDD(rdd => {
>>>>   // commit the offsets once everything is done
>>>>   ZookeeperManager.updateOffsetsinZk(zkProperties, rdd)
>>>> })
>>>>
>>>> Exception when the Cassandra cluster is down:
>>>> [2015-10-19 12:49:20] [JobScheduler] [ERROR]
>>>> [org.apache.spark.streaming.scheduler.JobScheduler] - Error running job
>>>> streaming job 1445239140000 ms.3
>>>> java.io.IOException: Failed to open native connection to Cassandra at
>>>> {......}
>>>>
>>>> --
>>>> *VARUN SHARMA*
>>>> *Flipkart*
>>>> *Bangalore*
>>>>
>>>> ---------------------------------------------------------------------
>>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>
>>
>> --
>> *VARUN SHARMA*
>> *Flipkart*
>> *Bangalore*
>
> --
> *VARUN SHARMA*
> *Flipkart*
> *Bangalore*