Hi Alex,

I guess wiping out the state directory is simpler to implement and faster, and/or the developers did not design for remote state stores at the time. But I don't actually know the exact reason.

Off the top of my head, I do not know how to solve this for remote state stores. The uncaught exception handler is not a reliable hook, because a computing node can fail without giving the JVM the opportunity to throw an exception.

In your tests, try increasing the commit interval to a high value and see whether you get inconsistencies. You should see one in the following case: the state store maintains counts per key, and after the last commit before the failure, the Streams app puts an event with a new key K and value 1 into the state store. After failover, Streams processes the same event with key K again. If the state store deleted all of its data, Streams puts value 1 again, which is correct. If the state store kept its data, Streams puts value 2, which is wrong because the same event is counted twice.
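A minimal sketch of that scenario in plain Java, without any Kafka dependency. The class name, the two maps standing in for the store and the changelog, and the single-event input are all assumptions made for illustration; this is not how Streams is implemented internally, only a model of the replay after failover.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ReplayDemo {

    /**
     * Simulates a counting store across one crash and failover.
     * Returns the count stored for key "K" after the uncommitted event is replayed.
     */
    static int countAfterFailover(boolean wipeStoreOnRestart) {
        // Stand-in for the state store. For a remote store, this data survives the crash.
        Map<String, Integer> store = new HashMap<>();
        // Stand-in for the changelog topic: only committed updates would end up here.
        Map<String, Integer> changelog = new HashMap<>();

        List<String> input = List.of("K"); // one event with a new key K
        int committedOffset = 0;           // the last commit happened before this event

        // First attempt: the event is processed and the store is updated ...
        for (int offset = committedOffset; offset < input.size(); offset++) {
            store.merge(input.get(offset), 1, Integer::sum);
        }
        // ... then the instance crashes before the next commit, so neither
        // committedOffset nor changelog is advanced.

        // Failover: optionally wipe the store and rebuild it from committed data only.
        if (wipeStoreOnRestart) {
            store.clear();
            store.putAll(changelog);
        }

        // The same event is processed again, starting from the last committed offset.
        for (int offset = committedOffset; offset < input.size(); offset++) {
            store.merge(input.get(offset), 1, Integer::sum);
        }
        return store.get("K");
    }

    public static void main(String[] args) {
        System.out.println("store wiped on restart: count(K) = " + countAfterFailover(true));
        System.out.println("store kept on restart:  count(K) = " + countAfterFailover(false));
    }
}
```

With the store wiped, the replayed event yields a count of 1. With the store kept, the count becomes 2, which is the double counting described above.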


On 15.03.21 15:20, Alex Craig wrote:
Thanks for the info! That makes sense. Of course, now I have more
questions. :) Do you know why this is being done outside of the state
store API? I assume there are reasons why a "deleteAll()" type of function
wouldn't work, thereby allowing a state store to purge itself? And maybe
more importantly, is there a way to achieve similar behavior with a 3rd
party store? Would hooking into the uncaught exception handler
be a good way to purge/drop a state store in the event of a fatal
error? I did set up a MongoDB state store recently as part of a POC and was
testing it with EOS enabled (forcing crashes to occur and checking that
the result of my aggregation was still accurate). I was unable to cause
inconsistent data in the Mongo store (which is good!), though of course I
may just have been getting lucky. Thanks again for your help,


On Mon, Mar 15, 2021 at 8:59 AM Pushkar Deole <pdeole2...@gmail.com> wrote:


I tried to explain this in Kafka user's language through the scenario
mentioned above. I hope I put it properly :-) and correct me if I am wrong.

On Mon, Mar 15, 2021 at 7:23 PM Pushkar Deole <pdeole2...@gmail.com>

This is what I understand could be the issue with an external state store:

A Kafka Streams application consumes the source topic, does its processing,
stores state in the Kafka state store (which is backed by a topic), and then
fails with some issue before producing the event on the destination topic.
In this case the transaction has failed, and Kafka guarantees all or nothing:
the offset committed for the source topic, the state written to the state
store topic, and the output produced on the destination topic either all
happen or none of them happen. In this failure scenario, none of them happen.

Assume you have a Redis state store, you updated the state in Redis, and
then the Streams application failed. Now the source topic and destination
topic are consistent, i.e. the offset is not committed to the source topic
and the output is not produced on the destination topic, but your Redis
state store is inconsistent with them, since it is an external state store
and Kafka can't guarantee rollback of the state written there.

On Mon, Mar 15, 2021 at 6:30 PM Alex Craig <alexcrai...@gmail.com>

" Another issue with 3rd party state stores could be violation of
exactly-once guarantee provided by kafka streams in the event of a failure
of streams application instance"

I've heard this before but would love to know more about how a custom
store would be at any greater risk than RocksDB as far as exactly-once
guarantees are concerned.  They all implement the same interface, so as
long as you're correctly implementing get(), put(), delete(), flush(),
you should be fine right?  In other words, I don't think there is any
special "exactly once magic" that is baked into the RocksDB store code.  I
could be wrong though so I'd love to hear people's thoughts, thanks,

Alex C

On Sun, Mar 14, 2021 at 4:58 PM Parthasarathy, Mohan <mpart...@hpe.com>

Thanks for the responses. In the worst case, I might have to keep both
rocksdb for local store and keep an external store like Redis.


On 3/13/21, 8:53 PM, "Pushkar Deole" <pdeole2...@gmail.com> wrote:

     Another issue with 3rd party state stores could be violation of
     exactly-once guarantee provided by kafka streams in the event of a failure
     of streams application instance.
     Kafka provides exactly once guarantee for consumer -> process ->
     through transactions and with the use of state store like redis,
     guarantee is weaker

     On Sat, Mar 13, 2021 at 5:28 AM Guozhang Wang <wangg...@gmail.com


     > Hello Mohan,
     > I think what you had in mind works with Redis, since it is a
     > store engine, it does not have the co-partitioning requirements
     > state stores.
     > One thing you'd need to tune KS though is that with remote
     > processing latency may be larger, and since Kafka Streams
     > records of a single partition in order, synchronously, you may need to tune
     > the poll interval configs etc to make sure KS would stay in the
     > group and not trigger unnecessary rebalances.
     > Guozhang
     > On Thu, Mar 11, 2021 at 6:41 PM Parthasarathy, Mohan <
     > wrote:
     > > Hi,
     > >
     > > I have a use case where messages come in with some key gets assigned some
     > > partition and the state gets created. Later, key changes (but
     > > contains the old key in the message) and gets sent to a
     > > partition. I want to be able to grab the old state using the
     > > before
     > > creating the new state on this instance. Redis as a  state makes it
     > > easy to implement this where I can simply do a lookup before creating the
     > > state. I see an implementation here :
     > >

     > >
     > > Has anyone tried this ? Any caveats.
     > >
     > > Thanks
     > > Mohan
     > >
     > >
     > --
     > -- Guozhang
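
For the poll interval tuning mentioned in the thread, a minimal sketch of what such a configuration might look like. The keys `max.poll.interval.ms` and `max.poll.records` are standard Kafka consumer settings that can be passed through the Streams configuration. The values, the application id, and the broker address are assumptions chosen for illustration, not recommendations, and the class name is hypothetical.

```java
import java.util.Properties;

public class RemoteStoreTuning {

    /** Builds an example configuration; every value here is a placeholder to adjust. */
    static Properties tunedConfig() {
        Properties props = new Properties();
        props.setProperty("application.id", "remote-store-demo");  // hypothetical name
        props.setProperty("bootstrap.servers", "localhost:9092");  // hypothetical broker

        // Allow more time between poll() calls, since each record may wait
        // on a network round trip to the remote store.
        props.setProperty("max.poll.interval.ms", "600000");

        // Fetch fewer records per poll so one batch finishes within the interval.
        props.setProperty("max.poll.records", "100");
        return props;
    }

    public static void main(String[] args) {
        tunedConfig().forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```

The resulting `Properties` object would be handed to the `KafkaStreams` constructor together with the topology. Which values work depends on the measured latency of the remote store.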
