Unfortunately, you will have to write that code yourself.

TD
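[Editor's note: a minimal sketch of what that retry logic might look like, reusing the names from the code quoted further down (data_rdd, JsonUtility, ZookeeperManager, zkProperties, CASSANDRA_KEY_SPACE, SIGNATURE_TABLE, StreamModel are the poster's own helpers; maxRetries, backoffMs and lastError are illustrative, not anything Spark provides):

    import scala.util.control.NonFatal

    data_rdd.foreachRDD(rdd => {
      val stream = rdd.map(x => JsonUtility.deserialize(x))

      val maxRetries = 3        // illustrative: give up after 3 attempts
      val backoffMs = 5000L     // illustrative: wait 5s between attempts
      var attempt = 0
      var saved = false
      var lastError: Throwable = null

      // Retry the Cassandra write a bounded number of times.
      while (!saved && attempt < maxRetries) {
        try {
          stream.saveToCassandra(CASSANDRA_KEY_SPACE, SIGNATURE_TABLE,
            StreamModel.getColumns)
          saved = true
        } catch {
          case NonFatal(e) =>
            lastError = e
            attempt += 1
            Thread.sleep(backoffMs)
        }
      }

      if (saved) {
        // Commit offsets only after the write has actually succeeded, so a
        // failed batch is re-read from Kafka instead of being silently lost.
        ZookeeperManager.updateOffsetsinZk(zkProperties, rdd)
      } else {
        // Surface the failure to the driver; the application can then decide
        // to stop the StreamingContext so later batches don't run ahead of
        // the uncommitted offsets.
        throw new RuntimeException(s"Cassandra write failed after $maxRetries attempts", lastError)
      }
    })
]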
On Tue, Oct 20, 2015 at 11:28 PM, varun sharma <varunsharman...@gmail.com> wrote:

> Hi TD,
> Is there any way in Spark I can fail/retry a batch in case of any
> exceptions, or do I have to write code to explicitly keep retrying?
> Also, if some batch fails, I want to block further batches from being
> processed, as it would create inconsistency in the updating of zookeeper
> offsets, and maybe kill the job itself after, let's say, 3 retries.
>
> Any pointers to achieve the same are appreciated.
>
> On Wed, Oct 21, 2015 at 1:15 AM, Tathagata Das <t...@databricks.com> wrote:
>
>> That is actually a bug in the UI that got fixed in 1.5.1. The batch is
>> actually completing with an exception; the UI just does not update correctly.
>>
>> On Tue, Oct 20, 2015 at 8:38 AM, varun sharma <varunsharman...@gmail.com> wrote:
>>
>>> Also, as you can see from the timestamps in the attached image, batches
>>> coming after the Cassandra server comes up (21:04) are processed, while
>>> batches in the hung state (21:03) never get processed.
>>> So, how do I fail those batches so that they can be processed again?
>>>
>>> On Tue, Oct 20, 2015 at 9:02 PM, varun sharma <varunsharman...@gmail.com> wrote:
>>>
>>>> Hi TD,
>>>> Yes, saveToCassandra throws an exception. How do I fail that task
>>>> explicitly if I catch any exceptions?
>>>> Right now that batch doesn't fail and remains in a hung state. Is there
>>>> any way I can fail that batch so that it can be tried again?
>>>>
>>>> Thanks
>>>> Varun
>>>>
>>>> On Tue, Oct 20, 2015 at 2:50 AM, Tathagata Das <t...@databricks.com> wrote:
>>>>
>>>>> If Cassandra is down, does saveToCassandra throw an exception? If it
>>>>> does, you can catch that exception and write your own logic to retry
>>>>> and/or skip the offset update. Once the foreachRDD function completes,
>>>>> that batch will be internally marked as completed.
>>>>>
>>>>> TD
>>>>>
>>>>> On Mon, Oct 19, 2015 at 5:48 AM, varun sharma <varunsharman...@gmail.com> wrote:
>>>>>
>>>>>> Hi,
>>>>>> I am facing this issue consistently in a spark-cassandra-kafka *streaming job*.
>>>>>> *Spark 1.4.0*
>>>>>> *cassandra connector 1.4.0-M3*
>>>>>> *Issue is:*
>>>>>>
>>>>>> I am reading data from *Kafka* using DirectStream, writing to
>>>>>> *Cassandra* after parsing the JSON, and subsequently updating the
>>>>>> offsets in *zookeeper*.
>>>>>> If the Cassandra cluster is down, it throws an exception, but the batch
>>>>>> which arrives in that time window is never processed even though the
>>>>>> offsets are updated in zookeeper.
>>>>>> This results in data loss.
>>>>>> Once the Cassandra cluster is up, the job processes data normally.
>>>>>> PFA the screenshots of hung batches and code.
>>>>>>
>>>>>> *Code:*
>>>>>>
>>>>>> data_rdd.foreachRDD(rdd => {
>>>>>>   val stream = rdd
>>>>>>     .map(x => JsonUtility.deserialize(x))
>>>>>>   stream.saveToCassandra(CASSANDRA_KEY_SPACE, SIGNATURE_TABLE,
>>>>>>     StreamModel.getColumns)
>>>>>>
>>>>>>   // commit the offsets once everything is done
>>>>>>   ZookeeperManager.updateOffsetsinZk(zkProperties, rdd)
>>>>>> })
>>>>>>
>>>>>> *I have even tried this variant:*
>>>>>>
>>>>>> data_rdd.foreachRDD(rdd => {
>>>>>>   val stream = rdd
>>>>>>     .map(x => JsonUtility.deserialize(x))
>>>>>>   stream.saveToCassandra(CASSANDRA_KEY_SPACE, SIGNATURE_TABLE,
>>>>>>     StreamModel.getColumns)
>>>>>> })
>>>>>>
>>>>>> data_rdd.foreachRDD(rdd => {
>>>>>>   // commit the offsets once everything is done
>>>>>>   ZookeeperManager.updateOffsetsinZk(zkProperties, rdd)
>>>>>> })
>>>>>>
>>>>>> Exception when the Cassandra cluster is down:
>>>>>> [2015-10-19 12:49:20] [JobScheduler] [ERROR]
>>>>>> [org.apache.spark.streaming.scheduler.JobScheduler] - Error running job
>>>>>> streaming job 1445239140000 ms.3
>>>>>> java.io.IOException: Failed to open native connection to Cassandra at
>>>>>> {......}
>>>>>>
>>>>>> --
>>>>>> *VARUN SHARMA*
>>>>>> *Flipkart*
>>>>>> *Bangalore*
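[Editor's note: since this is a Kafka direct stream, nothing is lost as long as the offsets in ZooKeeper are advanced only after a successful write: on restart the stream can be recreated from the last committed offsets, so any batch whose offsets were never committed is simply read from Kafka again. A rough sketch against the Spark 1.4 Kafka API follows; ZookeeperManager.readOffsetsFromZk is a hypothetical counterpart to the poster's updateOffsetsinZk, and ssc, kafkaParams and zkProperties are assumed to be in scope:

    import kafka.common.TopicAndPartition
    import kafka.message.MessageAndMetadata
    import kafka.serializer.StringDecoder
    import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils}

    // Inside foreachRDD, the offset ranges covered by the current batch are
    // available on the RDD; this is the information a helper like
    // updateOffsetsinZk would persist to ZooKeeper:
    //   val ranges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

    // On (re)start, read the last committed offsets back and resume from
    // them, so any batch whose offsets were never committed is replayed.
    val fromOffsets: Map[TopicAndPartition, Long] =
      ZookeeperManager.readOffsetsFromZk(zkProperties) // hypothetical helper

    val messageHandler = (mmd: MessageAndMetadata[String, String]) => mmd.message()

    val data_rdd = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, String](
      ssc, kafkaParams, fromOffsets, messageHandler)
]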