Hi TD,
Yes, saveToCassandra throws an exception. How do I fail that task explicitly
if I catch any exceptions?
Right now that batch doesn't fail and remains in a hung state. Is there any
way I can fail that batch so that it can be tried again?

Thanks
Varun
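
A minimal sketch of the "catch, retry, and do not update" idea from TD's reply quoted below, in plain Scala so it can be tried without a cluster. The names `WriteThenCommit`, `retry`, `writeThenCommit`, `maxAttempts` and `delayMs` are hypothetical and not part of Spark or the Cassandra connector. The assumption is that the write is retried a few times inside foreachRDD, the offsets are updated only after a successful write, and the last exception is rethrown otherwise so that it propagates out of foreachRDD instead of being swallowed. Whether Spark then replays that batch is not something this sketch guarantees.

```scala
import scala.util.{Failure, Success, Try}

object WriteThenCommit {
  /** Runs `op` up to `maxAttempts` times, sleeping `delayMs` between attempts.
    * Returns the first Success, or the last Failure if every attempt fails. */
  def retry[T](maxAttempts: Int, delayMs: Long)(op: => T): Try[T] = {
    require(maxAttempts >= 1, "maxAttempts must be at least 1")
    var result: Try[T] = Try(op)
    var attempt = 1
    while (result.isFailure && attempt < maxAttempts) {
      Thread.sleep(delayMs)
      result = Try(op)
      attempt += 1
    }
    result
  }

  /** Runs `write` with retries; runs `commit` only if the write succeeded.
    * If all attempts fail, the last exception is rethrown and `commit` is skipped. */
  def writeThenCommit(maxAttempts: Int, delayMs: Long)(write: => Unit)(commit: => Unit): Unit =
    retry(maxAttempts, delayMs)(write) match {
      case Success(_) => commit
      case Failure(e) => throw e
    }
}

// Hypothetical use with the job from this thread (needs Spark and the connector):
// data_rdd.foreachRDD { rdd =>
//   WriteThenCommit.writeThenCommit(3, 1000L) {
//     rdd.map(x => JsonUtility.deserialize(x))
//       .saveToCassandra(CASSANDRA_KEY_SPACE, SIGNATURE_TABLE, StreamModel.getColumns)
//   } {
//     ZookeeperManager.updateOffsetsinZk(zkProperties, rdd)
//   }
// }
```

With this shape the offset update can never run for a batch whose write failed, which is the data-loss case described in the original report.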

On Tue, Oct 20, 2015 at 2:50 AM, Tathagata Das <t...@databricks.com> wrote:

> If cassandra is down, does saveToCassandra throw an exception? If it does,
> you can catch that exception and write your own logic to retry and/or not
> update the offsets. Once the foreachRDD function completes, that batch will be
> internally marked as completed.
>
> TD
>
> On Mon, Oct 19, 2015 at 5:48 AM, varun sharma <varunsharman...@gmail.com>
> wrote:
>
>> Hi,
>> I am facing this issue consistently in spark-cassandra-kafka *streaming
>> job.*
>> *Spark 1.4.0*
>> *cassandra connector 1.4.0-M3*
>> *Issue is:*
>>
>> I am reading data from *Kafka* using DirectStream, writing to *Cassandra*
>> after parsing the JSON, and then updating the offsets in *zookeeper*.
>> If the Cassandra cluster is down, it throws an exception, but the batch that
>> arrives in that time window is never processed even though the offsets are
>> updated in zookeeper.
>> This results in data loss.
>> Once the Cassandra cluster is up, the job processes the data normally.
>> PFA the screenshots of hung batches and code.
>>
>> *Code:*
>>
>> data_rdd.foreachRDD(rdd => {
>>   val stream = rdd.map(x => JsonUtility.deserialize(x))
>>   stream.saveToCassandra(CASSANDRA_KEY_SPACE, SIGNATURE_TABLE, StreamModel.getColumns)
>>
>>   // commit the offsets once everything is done
>>   ZookeeperManager.updateOffsetsinZk(zkProperties, rdd)
>> })
>>
>> *I have even tried this variant:*
>>
>> data_rdd.foreachRDD(rdd => {
>>   val stream = rdd.map(x => JsonUtility.deserialize(x))
>>   stream.saveToCassandra(CASSANDRA_KEY_SPACE, SIGNATURE_TABLE, StreamModel.getColumns)
>> })
>>
>> data_rdd.foreachRDD(rdd => {
>>   // commit the offsets once everything is done
>>   ZookeeperManager.updateOffsetsinZk(zkProperties, rdd)
>> })
>>
>> Exception when cassandra cluster is down:
>> [2015-10-19 12:49:20] [JobScheduler] [ERROR]
>> [org.apache.spark.streaming.scheduler.JobScheduler] - Error running job
>> streaming job 1445239140000 ms.3
>> java.io.IOException: Failed to open native connection to Cassandra at
>> {......}
>>
>> --
>> *VARUN SHARMA*
>> *Flipkart*
>> *Bangalore*
>>
>>
>
>


-- 
*VARUN SHARMA*
*Flipkart*
*Bangalore*
