Unfortunately, you will have to write that code yourself.

TD

On Tue, Oct 20, 2015 at 11:28 PM, varun sharma <varunsharman...@gmail.com>
wrote:

> Hi TD,
> Is there any way in spark  I can fail/retry batch in case of any
> exceptions or do I have to write code to explicitly keep on retrying?
> Also If some batch fail, I want to block further batches to be processed
> as it would create inconsistency in updation of zookeeper offsets and maybe
> kill the job itself after lets say 3 retries.
>
> Any pointers to achieve same are appreciated.
>
> On Wed, Oct 21, 2015 at 1:15 AM, Tathagata Das <t...@databricks.com>
> wrote:
>
>> That is actually a bug in the UI that got fixed in 1.5.1. The batch is
>> actually completing with exception, the UI does not update correctly.
>>
>> On Tue, Oct 20, 2015 at 8:38 AM, varun sharma <varunsharman...@gmail.com>
>> wrote:
>>
>>> Also, As you can see the timestamps in attached image. batches coming
>>> after the Cassandra server comes up(21:04) are processed and batches which
>>> are in hung state(21:03) never get processed.
>>> So, How do I fail those batches so that those can be processed again.
>>>
>>> On Tue, Oct 20, 2015 at 9:02 PM, varun sharma <varunsharman...@gmail.com
>>> > wrote:
>>>
>>>> Hi TD,
>>>> Yes saveToCassandra throws exception. How do I fail that task
>>>> explicitly if i catch any exceptions?.
>>>> Right now that batch doesn't fail and remain in hung state. Is there
>>>> any way I fail that batch so that it can be tried again.
>>>>
>>>> Thanks
>>>> Varun
>>>>
>>>> On Tue, Oct 20, 2015 at 2:50 AM, Tathagata Das <t...@databricks.com>
>>>> wrote:
>>>>
>>>>> If cassandra is down, does saveToCassandra throw an exception? If it
>>>>> does, you can catch that exception and write your own logic to retry 
>>>>> and/or
>>>>> no update. Once the foreachRDD function completes, that batch will be
>>>>> internally marked as completed.
>>>>>
>>>>> TD
>>>>>
>>>>> On Mon, Oct 19, 2015 at 5:48 AM, varun sharma <
>>>>> varunsharman...@gmail.com> wrote:
>>>>>
>>>>>> Hi,
>>>>>> I am facing this issue consistently in spark-cassandra-kafka *streaming
>>>>>> job.*
>>>>>> *Spark 1.4.0*
>>>>>> *cassandra connector 1.4.0-M3*
>>>>>> *Issue is:*
>>>>>>
>>>>>> I am reading data from *Kafka* using DirectStream, writing to
>>>>>> *Cassandra* after parsing the json and the subsequently updating the
>>>>>> offsets in *zookeeper*.
>>>>>> If Cassandra cluster is down, it throws exception but the batch which
>>>>>> arrives in that time window is not processed ever though the offsets are
>>>>>> updated in zookeeper.
>>>>>> It is resulting data loss.
>>>>>> Once the Cassandra cluster is up, this job process the data normally.
>>>>>> PFA the screenshots of hung batches and code.
>>>>>>
>>>>>> *Code:*
>>>>>>
>>>>>> data_rdd.foreachRDD(rdd=> {
>>>>>>   val stream = rdd
>>>>>>     .map(x =>JsonUtility.deserialize(x))
>>>>>>   stream.saveToCassandra(CASSANDRA_KEY_SPACE, SIGNATURE_TABLE, 
>>>>>> StreamModel.getColumns)
>>>>>>
>>>>>>
>>>>>>   //commit the offsets once everything is done
>>>>>>   ZookeeperManager.updateOffsetsinZk(zkProperties, rdd)
>>>>>> })
>>>>>>
>>>>>> *I have even tried this variant:*
>>>>>>
>>>>>> data_rdd.foreachRDD(rdd=> {
>>>>>>   val stream = rdd
>>>>>>     .map(x =>JsonUtility.deserialize(x))
>>>>>>   stream.saveToCassandra(CASSANDRA_KEY_SPACE, SIGNATURE_TABLE, 
>>>>>> StreamModel.getColumns)
>>>>>> })
>>>>>>
>>>>>> data_rdd.foreachRDD(rdd=> {
>>>>>>
>>>>>>   //commit the offsets once everything is done
>>>>>>
>>>>>>   ZookeeperManager.updateOffsetsinZk(zkProperties, rdd)
>>>>>>
>>>>>> }
>>>>>>
>>>>>> Exception when cassandra cluster is down:
>>>>>> [2015-10-19 12:49:20] [JobScheduler] [ERROR]
>>>>>> [org.apache.spark.streaming.scheduler.JobScheduler] - Error running job
>>>>>> streaming job 1445239140000 ms.3
>>>>>> java.io.IOException: Failed to open native connection to Cassandra at
>>>>>> {......}
>>>>>>
>>>>>> --
>>>>>> *VARUN SHARMA*
>>>>>> *Flipkart*
>>>>>> *Bangalore*
>>>>>>
>>>>>>
>>>>>> ---------------------------------------------------------------------
>>>>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>>>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>>>>
>>>>>
>>>>>
>>>>
>>>>
>>>> --
>>>> *VARUN SHARMA*
>>>> *Flipkart*
>>>> *Bangalore*
>>>>
>>>
>>>
>>>
>>> --
>>> *VARUN SHARMA*
>>> *Flipkart*
>>> *Bangalore*
>>>
>>
>>
>
>
> --
> *VARUN SHARMA*
> *Flipkart*
> *Bangalore*
>

Reply via email to