That is actually a bug in the UI that got fixed in 1.5.1. The batch is
actually completing with exception, the UI does not update correctly.

On Tue, Oct 20, 2015 at 8:38 AM, varun sharma <varunsharman...@gmail.com>
wrote:

> Also, As you can see the timestamps in attached image. batches coming
> after the Cassandra server comes up(21:04) are processed and batches which
> are in hung state(21:03) never get processed.
> So, How do I fail those batches so that those can be processed again.
>
> On Tue, Oct 20, 2015 at 9:02 PM, varun sharma <varunsharman...@gmail.com>
> wrote:
>
>> Hi TD,
>> Yes saveToCassandra throws exception. How do I fail that task explicitly
>> if i catch any exceptions?.
>> Right now that batch doesn't fail and remain in hung state. Is there any
>> way I fail that batch so that it can be tried again.
>>
>> Thanks
>> Varun
>>
>> On Tue, Oct 20, 2015 at 2:50 AM, Tathagata Das <t...@databricks.com>
>> wrote:
>>
>>> If cassandra is down, does saveToCassandra throw an exception? If it
>>> does, you can catch that exception and write your own logic to retry and/or
>>> no update. Once the foreachRDD function completes, that batch will be
>>> internally marked as completed.
>>>
>>> TD
>>>
>>> On Mon, Oct 19, 2015 at 5:48 AM, varun sharma <varunsharman...@gmail.com
>>> > wrote:
>>>
>>>> Hi,
>>>> I am facing this issue consistently in spark-cassandra-kafka *streaming
>>>> job.*
>>>> *Spark 1.4.0*
>>>> *cassandra connector 1.4.0-M3*
>>>> *Issue is:*
>>>>
>>>> I am reading data from *Kafka* using DirectStream, writing to
>>>> *Cassandra* after parsing the json and the subsequently updating the
>>>> offsets in *zookeeper*.
>>>> If Cassandra cluster is down, it throws exception but the batch which
>>>> arrives in that time window is not processed ever though the offsets are
>>>> updated in zookeeper.
>>>> It is resulting data loss.
>>>> Once the Cassandra cluster is up, this job process the data normally.
>>>> PFA the screenshots of hung batches and code.
>>>>
>>>> *Code:*
>>>>
>>>> data_rdd.foreachRDD(rdd=> {
>>>>   val stream = rdd
>>>>     .map(x =>JsonUtility.deserialize(x))
>>>>   stream.saveToCassandra(CASSANDRA_KEY_SPACE, SIGNATURE_TABLE, 
>>>> StreamModel.getColumns)
>>>>
>>>>
>>>>   //commit the offsets once everything is done
>>>>   ZookeeperManager.updateOffsetsinZk(zkProperties, rdd)
>>>> })
>>>>
>>>> *I have even tried this variant:*
>>>>
>>>> data_rdd.foreachRDD(rdd=> {
>>>>   val stream = rdd
>>>>     .map(x =>JsonUtility.deserialize(x))
>>>>   stream.saveToCassandra(CASSANDRA_KEY_SPACE, SIGNATURE_TABLE, 
>>>> StreamModel.getColumns)
>>>> })
>>>>
>>>> data_rdd.foreachRDD(rdd=> {
>>>>
>>>>   //commit the offsets once everything is done
>>>>
>>>>   ZookeeperManager.updateOffsetsinZk(zkProperties, rdd)
>>>>
>>>> }
>>>>
>>>> Exception when cassandra cluster is down:
>>>> [2015-10-19 12:49:20] [JobScheduler] [ERROR]
>>>> [org.apache.spark.streaming.scheduler.JobScheduler] - Error running job
>>>> streaming job 1445239140000 ms.3
>>>> java.io.IOException: Failed to open native connection to Cassandra at
>>>> {......}
>>>>
>>>> --
>>>> *VARUN SHARMA*
>>>> *Flipkart*
>>>> *Bangalore*
>>>>
>>>>
>>>> ---------------------------------------------------------------------
>>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>>
>>>
>>>
>>
>>
>> --
>> *VARUN SHARMA*
>> *Flipkart*
>> *Bangalore*
>>
>
>
>
> --
> *VARUN SHARMA*
> *Flipkart*
> *Bangalore*
>

Reply via email to