The Kafka direct stream meets those requirements.  You don't need
checkpointing for exactly-once.  Indeed, unless your output operations are
idempotent, you can't get exactly-once if you're relying on checkpointing.
Instead, you need to store the offsets atomically in the same transaction
as your results.
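
In code the pattern looks roughly like this (an untested sketch against
the 0.8 direct stream API; saveWithOffsets is a stand-in for a single
transaction against your own datastore, and the broker, topic and batch
interval are just examples):

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange}

// stand-in: write the results and the offset ranges in one DB transaction,
// so a batch is either fully applied (results plus offsets) or not at all
def saveWithOffsets(results: Array[String], offsets: Array[OffsetRange]): Unit = ()

val conf = new SparkConf().setAppName("exactly-once-sketch")
val ssc = new StreamingContext(conf, Seconds(5))

val kafkaParams = Map("metadata.broker.list" -> "broker1:9092")
val topics = Set("mytopic")

val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, topics)

stream.foreachRDD { rdd =>
  // the direct stream's RDDs know exactly which offset ranges they cover
  val offsets = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  val results = rdd.map { case (_, value) => value }.collect()
  saveWithOffsets(results, offsets)
}

ssc.start()
ssc.awaitTermination()

On restart you'd read the stored offsets back and hand them to the
createDirectStream overload that takes fromOffsets, instead of relying on
a checkpoint.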

See
https://github.com/koeninger/kafka-exactly-once
and the video / blog posts linked from it.

The dibbhatt consumer that Akhil linked is using ZooKeeper to store offsets,
so to the best of my knowledge, it cannot do exactly-once without
idempotent output operations.
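
(By idempotent output I mean a write you can replay without changing the
end state, e.g. an upsert keyed by something deterministic rather than an
append.  A toy illustration, not code from that consumer:)

import scala.collection.concurrent.TrieMap

// stand-in for a keyed datastore; a real job would upsert into SQL/HBase/etc.
val store = TrieMap.empty[String, String]

// upsert by key: replaying the same batch leaves the store unchanged
def writeBatch(records: Seq[(String, String)]): Unit =
  records.foreach { case (key, value) => store.put(key, value) }

writeBatch(Seq("id-1" -> "a", "id-2" -> "b"))
writeBatch(Seq("id-1" -> "a", "id-2" -> "b"))  // no duplicates on replay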

Regarding the issues around code changes and checkpointing, the most
straightforward way to deal with them is to just start a new version of
your job before stopping the old one.  If you care about delivery semantics
and are using checkpointing, your output operation must be idempotent
anyway, so having two versions of the code running at the same time for a
brief period should not be a problem.



On Thu, Sep 10, 2015 at 8:02 AM, Dmitry Goldenberg <dgoldenberg...@gmail.com> wrote:

> >> checkpoints can't be used between controlled restarts
>
> Is that true? If so, why? From my testing, checkpoints appear to be
> working fine; we get the data we've missed between the time the consumer
> went down and the time we brought it back up.
>
> >> If I cannot make checkpoints between code upgrades, does it mean that
> Spark does not help me at all with keeping my Kafka offsets? Does it mean
> that I have to implement my own storing to/initialization of offsets from
> Zookeeper?
>
> By code upgrades, do you mean code changes to the consumer program?
>
> If that is the case, one idea we've been entertaining is that, if the
> consumer changes, especially if its configuration parameters change, some
> older configuration may still be stuck in the checkpoint data.  What we'd
> do in this case is, prior to starting the consumer, blow away the
> checkpoint directory and re-consume from Kafka from the smallest offsets.
> In our case it's OK to re-process; I realize that in many cases that may
> not be an option.  If that's the case, then it would seem to follow that
> you have to manage offsets in Zk...
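>
> Roughly, as a sketch (the checkpoint path and broker are just examples,
> and this is untested):
>
> import org.apache.hadoop.fs.{FileSystem, Path}
> import org.apache.spark.SparkContext
>
> // before starting the upgraded consumer, blow away the old checkpoint dir...
> def wipeCheckpoints(sc: SparkContext, dir: String): Unit = {
>   val fs = FileSystem.get(sc.hadoopConfiguration)
>   fs.delete(new Path(dir), true)
> }
>
> // ...and tell the direct stream to start from the smallest available offsets
> val kafkaParams = Map(
>   "metadata.broker.list" -> "broker1:9092",
>   "auto.offset.reset" -> "smallest")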
>
> Another thing to consider would be to treat upgrades operationally. That
> is, if an upgrade is to happen, consume the data up to a certain point,
> then bring the system down for the upgrade. Remove the checkpoint data.
> Restart everything; the system would then rebuild the checkpoint data using
> your upgraded consumers.  (Again, this may not be possible in some systems
> where the data influx is constant and/or the data is mission critical)...
>
> Perhaps this discussion implies that there may be a new feature in Spark
> where it intelligently drops the checkpoint data or allows you to
> selectively pluck out and drop some items prior to restarting...
>
>
>
>
> On Thu, Sep 10, 2015 at 6:22 AM, Akhil Das <ak...@sigmoidanalytics.com>
> wrote:
>
>> This consumer pretty much covers all those scenarios you listed:
>> github.com/dibbhatt/kafka-spark-consumer
>> Give it a try.
>>
>> Thanks
>> Best Regards
>>
>> On Thu, Sep 10, 2015 at 3:32 PM, Krzysztof Zarzycki <k.zarzy...@gmail.com> wrote:
>>
>>> Hi there,
>>> I have a problem with fulfilling all my needs when using Spark Streaming
>>> on Kafka. Let me enumerate my requirements:
>>> 1. I want to have at-least-once/exactly-once processing.
>>> 2. I want my application to be tolerant of failures and simple stops. The
>>> Kafka offsets need to be tracked between restarts.
>>> 3. I want to be able to upgrade the code of my application without losing
>>> Kafka offsets.
>>>
>>> Now what my requirements imply according to my knowledge:
>>> 1. implies using the new Kafka DirectStream.
>>> 2. implies using checkpointing. The Kafka DirectStream will write offsets
>>> to the checkpoint as well.
>>> 3. implies that checkpoints can't be used between controlled restarts.
>>> So I need to install a shutdown hook that calls
>>> ssc.stop(stopGracefully = true), roughly as in the sketch after this list
>>> (here is a description of how:
>>> https://metabroadcast.com/blog/stop-your-spark-streaming-application-gracefully
>>> )
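>>>
>>> Roughly what I mean for 2. and 3. (an untested sketch; createContext and
>>> the checkpoint path are just examples):
>>>
>>> import org.apache.spark.SparkConf
>>> import org.apache.spark.streaming.{Seconds, StreamingContext}
>>>
>>> def createContext(checkpointDir: String): StreamingContext = {
>>>   val conf = new SparkConf().setAppName("my-streaming-app")
>>>   val ssc = new StreamingContext(conf, Seconds(5))
>>>   ssc.checkpoint(checkpointDir)
>>>   // set up the Kafka direct stream and the rest of the job here
>>>   ssc
>>> }
>>>
>>> val checkpointDir = "/checkpoints/my-streaming-app"
>>> val ssc = StreamingContext.getOrCreate(checkpointDir,
>>>   () => createContext(checkpointDir))
>>>
>>> // stop gracefully on shutdown so in-flight batches can finish first
>>> sys.addShutdownHook {
>>>   ssc.stop(stopSparkContext = true, stopGracefully = true)
>>> }
>>>
>>> ssc.start()
>>> ssc.awaitTermination()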
>>>
>>> Now my problems are:
>>> 1. If I cannot make checkpoints between code upgrades, does it mean that
>>> Spark does not help me at all with keeping my Kafka offsets? Does it mean
>>> that I have to implement my own storing to/initialization of offsets from
>>> Zookeeper?
>>> 2. When I set up the shutdown hook and any of my executors throws an
>>> exception, it seems that the application does not fail but gets stuck in a
>>> running state. Is that because stopGracefully deadlocks on exceptions? How
>>> can I overcome this problem? Maybe I can avoid setting the shutdown hook
>>> and there is another way to stop my app gracefully?
>>>
>>> 3. If I somehow overcome 2., is it enough to just stop my app gracefully
>>> to be able to upgrade the code and not lose Kafka offsets?
>>>
>>>
>>> Thank you a lot for your answers,
>>> Krzysztof Zarzycki
>>>
>>>
>>>
>>>
>>
>
