Hi TD,

Is there any way in Spark to fail/retry a batch in case of an exception, or do I
have to write code that explicitly keeps retrying? Also, if a batch fails, I want
to block further batches from being processed, since processing them would make
the ZooKeeper offset updates inconsistent, and maybe kill the job itself after,
let's say, 3 retries.
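The best I have come up with so far is wrapping saveToCassandra in a bounded
retry loop inside foreachRDD and committing the ZooKeeper offsets only after the
write succeeds, so that once the retries are exhausted the exception propagates,
the batch fails, and the offsets are never advanced. A rough sketch of what I
mean is below; MAX_RETRIES and RETRY_BACKOFF_MS are placeholders, and I am
assuming the Cassandra connection failure always surfaces as the
java.io.IOException shown in the log further down (the other identifiers are the
same ones from my code quoted below):

// Sketch only: data_rdd, JsonUtility, ZookeeperManager, zkProperties,
// CASSANDRA_KEY_SPACE, SIGNATURE_TABLE and StreamModel are from my existing job,
// which already imports com.datastax.spark.connector._ for saveToCassandra.
val MAX_RETRIES = 3          // placeholder: allow 3 retries before letting the batch fail
val RETRY_BACKOFF_MS = 5000L // placeholder: base wait between attempts

data_rdd.foreachRDD { rdd =>
  val parsed = rdd.map(x => JsonUtility.deserialize(x))

  var attempt = 0
  var saved = false
  while (!saved) {
    try {
      parsed.saveToCassandra(CASSANDRA_KEY_SPACE, SIGNATURE_TABLE, StreamModel.getColumns)
      saved = true
    } catch {
      // assuming connection failures surface as java.io.IOException, as in the log below
      case _: java.io.IOException if attempt < MAX_RETRIES =>
        attempt += 1
        Thread.sleep(RETRY_BACKOFF_MS * attempt) // crude linear back-off before retrying
      // once attempt reaches MAX_RETRIES the guard no longer matches, the exception
      // propagates, the batch fails, and the offset update below never runs
    }
  }

  // commit the offsets only after the Cassandra write has definitely succeeded
  ZookeeperManager.updateOffsetsinZk(zkProperties, rdd)
}

This at least keeps the offsets from being committed for a failed batch, but it
does not block the subsequent batches or kill the job once the retries are
exhausted, which is the part I am unsure about.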
Any pointers on how to achieve this are appreciated.

On Wed, Oct 21, 2015 at 1:15 AM, Tathagata Das <t...@databricks.com> wrote:

> That is actually a bug in the UI that got fixed in 1.5.1. The batch is
> actually completing with an exception; the UI just does not update correctly.
>
> On Tue, Oct 20, 2015 at 8:38 AM, varun sharma <varunsharman...@gmail.com>
> wrote:
>
>> Also, as you can see from the timestamps in the attached image, batches
>> coming in after the Cassandra server comes up (21:04) are processed, while
>> the batches that are in the hung state (21:03) never get processed.
>> So, how do I fail those batches so that they can be processed again?
>>
>> On Tue, Oct 20, 2015 at 9:02 PM, varun sharma <varunsharman...@gmail.com>
>> wrote:
>>
>>> Hi TD,
>>> Yes, saveToCassandra throws an exception. How do I fail that task
>>> explicitly if I catch any exceptions?
>>> Right now that batch doesn't fail and remains in a hung state. Is there
>>> any way to fail that batch so that it can be tried again?
>>>
>>> Thanks
>>> Varun
>>>
>>> On Tue, Oct 20, 2015 at 2:50 AM, Tathagata Das <t...@databricks.com>
>>> wrote:
>>>
>>>> If Cassandra is down, does saveToCassandra throw an exception? If it
>>>> does, you can catch that exception and write your own logic to retry
>>>> and/or not update. Once the foreachRDD function completes, that batch
>>>> will be internally marked as completed.
>>>>
>>>> TD
>>>>
>>>> On Mon, Oct 19, 2015 at 5:48 AM, varun sharma <varunsharman...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi,
>>>>> I am facing this issue consistently in a Spark-Cassandra-Kafka
>>>>> *streaming job*.
>>>>> *Spark 1.4.0*
>>>>> *cassandra connector 1.4.0-M3*
>>>>>
>>>>> *Issue is:*
>>>>> I am reading data from *Kafka* using DirectStream, writing to
>>>>> *Cassandra* after parsing the JSON, and subsequently updating the
>>>>> offsets in *ZooKeeper*.
>>>>> If the Cassandra cluster is down, it throws an exception, but the batch
>>>>> that arrives in that time window is never processed, even though the
>>>>> offsets are updated in ZooKeeper. This results in data loss.
>>>>> Once the Cassandra cluster is up, the job processes the data normally.
>>>>> PFA the screenshots of the hung batches and the code.
>>>>>
>>>>> *Code:*
>>>>>
>>>>> data_rdd.foreachRDD(rdd => {
>>>>>   val stream = rdd.map(x => JsonUtility.deserialize(x))
>>>>>   stream.saveToCassandra(CASSANDRA_KEY_SPACE, SIGNATURE_TABLE,
>>>>>     StreamModel.getColumns)
>>>>>
>>>>>   // commit the offsets once everything is done
>>>>>   ZookeeperManager.updateOffsetsinZk(zkProperties, rdd)
>>>>> })
>>>>>
>>>>> *I have even tried this variant:*
>>>>>
>>>>> data_rdd.foreachRDD(rdd => {
>>>>>   val stream = rdd.map(x => JsonUtility.deserialize(x))
>>>>>   stream.saveToCassandra(CASSANDRA_KEY_SPACE, SIGNATURE_TABLE,
>>>>>     StreamModel.getColumns)
>>>>> })
>>>>>
>>>>> data_rdd.foreachRDD(rdd => {
>>>>>   // commit the offsets once everything is done
>>>>>   ZookeeperManager.updateOffsetsinZk(zkProperties, rdd)
>>>>> })
>>>>>
>>>>> Exception when the Cassandra cluster is down:
>>>>> [2015-10-19 12:49:20] [JobScheduler] [ERROR]
>>>>> [org.apache.spark.streaming.scheduler.JobScheduler] - Error running job
>>>>> streaming job 1445239140000 ms.3
>>>>> java.io.IOException: Failed to open native connection to Cassandra at
>>>>> {......}

--
*VARUN SHARMA*
*Flipkart*
*Bangalore*