Hey Pushkar, yes, the data will still be backed by a changelog topic unless the
user explicitly disables logging for that state store. The fault tolerance
mechanism of Kafka Streams is based on changelogging, so there are no
correctness guarantees if you decide to disable it.
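
To make that concrete, here is a toy sketch in plain Java. It does not use any
Kafka Streams APIs, and every name in it (ChangelogSketch,
countAfterCrashAndRetry, committedChangelog) is made up for illustration. It
only models the idea that a write which reached the store before a crash is
replayed after restart, and that rebuilding the store from committed changelog
data is what discards the uncommitted write.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model only: none of these names are Kafka Streams APIs.
class ChangelogSketch {

    // Processes one event for key "K", crashes before the commit, then
    // reprocesses the same event after restart and returns the final count.
    // restoreFromChangelog = true:  the store is wiped and rebuilt from
    //                               committed changelog data on restart.
    // restoreFromChangelog = false: the store keeps the uncommitted write
    //                               (e.g. a remote store with logging disabled).
    static long countAfterCrashAndRetry(boolean restoreFromChangelog) {
        Map<String, Long> committedChangelog = new HashMap<>(); // committed writes only
        Map<String, Long> store = new HashMap<>();              // state store contents

        // First attempt: the write reaches the store, but the app dies before
        // the transaction commits, so nothing lands in committedChangelog.
        store.merge("K", 1L, Long::sum);

        // Restart after the crash.
        if (restoreFromChangelog) {
            store.clear();
            store.putAll(committedChangelog);
        }

        // Second attempt: the source offset was never committed, so the same
        // event is processed again, and this time the commit succeeds.
        store.merge("K", 1L, Long::sum);
        committedChangelog.put("K", store.get("K"));

        return store.get("K");
    }

    public static void main(String[] args) {
        System.out.println("restored from changelog: " + countAfterCrashAndRetry(true));
        System.out.println("dirty store kept:        " + countAfterCrashAndRetry(false));
    }
}
```

With restoration the event is counted once; with the dirty store kept it is
counted twice, which is the EOS violation discussed further down the thread.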

That said, I'm guessing many users do in fact disable the changelog when
plugging in a remote store with its own fault tolerance guarantees -- is that
what you're getting at? We could definitely build in better support for that
case, either as an additional optimization on top of KAFKA-12475
<https://issues.apache.org/jira/browse/KAFKA-12475> or as an alternative
implementation to fix the underlying EOS problem. Check out my latest comment
on the ticket here:
<https://issues.apache.org/jira/browse/KAFKA-12475?focusedCommentId=17305191&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17305191>

Does that address your question?

On Fri, Mar 19, 2021 at 10:14 AM Pushkar Deole <pdeole2...@gmail.com> wrote:

> Hello Sophie,
>
> Maybe I am missing something here, but can you let me know how a Redis-based
> state store will be wiped of inconsistent state if the stream application
> dies in the middle of processing? For example, the stream application
> consumed from the source topic, processed the source event, and saved state
> to Redis, and then hit an error before producing the event to the
> destination topic.
> If this occurs with a RocksDB or in-memory state store, it will be rebuilt
> from the changelog topic. For a Redis state store, however, how will the
> state be wiped? Are we saying here that the data stored in Redis will still
> be backed by a changelog topic, and Redis will be restored from that topic
> in case of a stream application error?
>
> On Tue, Mar 16, 2021 at 12:18 AM Sophie Blee-Goldman
> <sop...@confluent.io.invalid> wrote:
>
> > This certainly does seem like a flaw in the Streams API, although of course
> > Streams is just in general not designed for use with remote anything (API
> > calls, stores, etc).
> >
> > That said, I don't see any reason why we *couldn't* have better support for
> > remote state stores. Note that there's currently no deleteAll method on the
> > store interface, and not all databases necessarily support that. But we
> > could add a default implementation which just calls delete(key) on all keys
> > in the state store, and for the RocksDB-based state stores we still wipe
> > out the state as usual (and recommend the same for any custom StateStore
> > which is local rather than remote). Obviously falling back on individual
> > delete(key) operations for all the data in the entire store will have worse
> > performance, but that's better than silently breaking EOS when deleteAll is
> > not available on a remote store.
> >
> > Would you be interested in doing a KIP for this? We should also file a JIRA
> > with the above explanation, so that other users of remote storage are aware
> > of this limitation. And definitely document this somewhere.
> >
> >
> > On Mon, Mar 15, 2021 at 8:04 AM Bruno Cadonna <br...@confluent.io.invalid>
> > wrote:
> >
> > > Hi Alex,
> > >
> > > I guess wiping out the state directory is easier code-wise, faster,
> > > and/or at the time of development the developers did not design for
> > > remote state stores. But I actually do not know the exact reason.
> > >
> > > Off the top of my head, I do not know how to solve this for remote state
> > > stores. Using the uncaught exception handler is not good, because a
> > > computing node could fail without giving the JVM the opportunity to
> > > throw an exception.
> > >
> > > In your tests, try to increase the commit interval to a high value and
> > > see if you get inconsistencies. You should get an inconsistency if the
> > > state store maintains counts for keys and, after the last commit before
> > > the failure, the Streams app puts an event with a new key K with value 1
> > > into the state store. After failover, Streams would put the same event
> > > with key K again into the state store. If the state store deleted all of
> > > its data, Streams would again put value 1, but if the state store did
> > > not delete all data, Streams would put value 2, which is wrong because it
> > > would count the same event twice.
> > >
> > > Best,
> > > Bruno
> > >
> > >
> > > On 15.03.21 15:20, Alex Craig wrote:
> > > > Bruno,
> > > > Thanks for the info! That makes sense. Of course now I have more
> > > > questions. :) Do you know why this is being done outside of the state
> > > > store API? I assume there are reasons why a "deleteAll()" type of
> > > > function wouldn't work, thereby allowing a state store to purge itself?
> > > > And maybe more importantly, is there a way to achieve a similar behavior
> > > > with a 3rd party store? I'm not sure if hooking into the uncaught
> > > > exception handler might be a good way to purge/drop a state store in the
> > > > event of a fatal error? I did set up a MongoDB state store recently as
> > > > part of a POC and was testing it with EOS enabled (forcing crashes to
> > > > occur and checking that the result of my aggregation was still
> > > > accurate). I was unable to cause inconsistent data in the Mongo store
> > > > (which is good!), though of course I may just have been getting lucky.
> > > > Thanks again for your help,
> > > >
> > > > Alex
> > > >
> > > > On Mon, Mar 15, 2021 at 8:59 AM Pushkar Deole <pdeole2...@gmail.com>
> > > > wrote:
> > > >
> > > >> Bruno,
> > > >>
> > > >> I tried to explain this in Kafka users' language through the scenario
> > > >> mentioned above. I hope I put it properly :) and correct me if I am
> > > >> wrong.
> > > >>
> > > >> On Mon, Mar 15, 2021 at 7:23 PM Pushkar Deole <pdeole2...@gmail.com>
> > > >> wrote:
> > > >>
> > > >>> This is what I understand could be the issue with an external state
> > > >>> store:
> > > >>>
> > > >>> The Kafka Streams application consumes from the source topic, does its
> > > >>> processing, stores state to the Kafka state store (this is backed by a
> > > >>> topic), and before producing the event on the destination topic, the
> > > >>> application fails with some issue. In this case the transaction has
> > > >>> failed, and Kafka guarantees all or nothing: the offset committed for
> > > >>> the source topic, the state written to the state store topic, and the
> > > >>> output produced on the destination topic either all happen or none of
> > > >>> them do, and in this failure scenario it is none of them.
> > > >>>
> > > >>> Assume you have a Redis state store, you updated the state in Redis,
> > > >>> and the stream application failed. Now the source topic and destination
> > > >>> topic are consistent, i.e. the offset is not committed for the source
> > > >>> topic and the output is not produced on the destination topic, but your
> > > >>> Redis state store is inconsistent with them, since it is an external
> > > >>> state store and Kafka can't guarantee rollback of the state written
> > > >>> there.
> > > >>>
> > > >>> On Mon, Mar 15, 2021 at 6:30 PM Alex Craig <alexcrai...@gmail.com>
> > > >>> wrote:
> > > >>>
> > > >>>> " Another issue with 3rd party state stores could be violation of
> > > >>>> exactly-once guarantee provided by kafka streams in the event of a
> > > >> failure
> > > >>>> of streams application instance"
> > > >>>>
> > > >>>> I've heard this before but would love to know more about how a custom
> > > >>>> state store would be at any greater risk than RocksDB as far as
> > > >>>> exactly-once guarantees are concerned. They all implement the same
> > > >>>> interface, so as long as you're correctly implementing get(), put(),
> > > >>>> delete(), flush(), etc, you should be fine, right? In other words, I
> > > >>>> don't think there is any special "exactly once magic" that is baked
> > > >>>> into the RocksDB store code. I could be wrong though so I'd love to
> > > >>>> hear people's thoughts, thanks,
> > > >>>>
> > > >>>> Alex C
> > > >>>>
> > > >>>> On Sun, Mar 14, 2021 at 4:58 PM Parthasarathy, Mohan <mpart...@hpe.com>
> > > >>>> wrote:
> > > >>>>
> > > >>>>> Thanks for the responses. In the worst case, I might have to keep
> > > >>>>> both RocksDB as a local store and an external store like Redis.
> > > >>>>>
> > > >>>>> -mohan
> > > >>>>>
> > > >>>>>
> > > >>>>> On 3/13/21, 8:53 PM, "Pushkar Deole" <pdeole2...@gmail.com> wrote:
> > > >>>>>
> > > >>>>>      Another issue with 3rd party state stores could be violation of
> > > >>>>>      the exactly-once guarantee provided by Kafka Streams in the event
> > > >>>>>      of a failure of a Streams application instance.
> > > >>>>>      Kafka provides an exactly-once guarantee for consume -> process ->
> > > >>>>>      produce through transactions, and with the use of a state store
> > > >>>>>      like Redis, this guarantee is weaker.
> > > >>>>>
> > > >>>>>      On Sat, Mar 13, 2021 at 5:28 AM Guozhang Wang <wangg...@gmail.com>
> > > >>>>>      wrote:
> > > >>>>>
> > > >>>>>      > Hello Mohan,
> > > >>>>>      >
> > > >>>>>      > I think what you had in mind works with Redis: since it is a
> > > >>>>>      > remote state store engine, it does not have the co-partitioning
> > > >>>>>      > requirements that local state stores have.
> > > >>>>>      >
> > > >>>>>      > One thing to keep in mind when tuning KS though is that with
> > > >>>>>      > remote stores, the processing latency may be larger, and since
> > > >>>>>      > Kafka Streams processes all records of a single partition in
> > > >>>>>      > order, synchronously, you may need to tune the poll interval
> > > >>>>>      > configs etc to make sure KS would stay in the consumer group and
> > > >>>>>      > not trigger unnecessary rebalances.
> > > >>>>>      >
> > > >>>>>      > Guozhang
> > > >>>>>      >
> > > >>>>>      > On Thu, Mar 11, 2021 at 6:41 PM Parthasarathy, Mohan <mpart...@hpe.com>
> > > >>>>>      > wrote:
> > > >>>>>      >
> > > >>>>>      > > Hi,
> > > >>>>>      > >
> > > >>>>>      > > I have a use case where messages come in with some key, get
> > > >>>>>      > > assigned to some partition, and the state gets created. Later,
> > > >>>>>      > > the key changes (but the message still contains the old key)
> > > >>>>>      > > and the message gets sent to a different partition. I want to
> > > >>>>>      > > be able to grab the old state using the old key before creating
> > > >>>>>      > > the new state on this instance. Redis as a state store makes it
> > > >>>>>      > > easy to implement this, since I can simply do a lookup before
> > > >>>>>      > > creating the state. I see an implementation here:
> > > >>>>>      > > https://github.com/andreas-schroeder/redisks/tree/master/src/main/java/com/github/andreas_schroeder/redisks
> > > >>>>>      > >
> > > >>>>>      > > Has anyone tried this? Any caveats?
> > > >>>>>      > >
> > > >>>>>      > > Thanks
> > > >>>>>      > > Mohan
> > > >>>>>      > >
> > > >>>>>      > >
> > > >>>>>      >
> > > >>>>>      > --
> > > >>>>>      > -- Guozhang
> > > >>>>>      >
> > > >>>>>
> > > >>>>>
> > > >>>>>
> > > >>>>
> > > >>>
> > > >>
> > > >
> > >
> >
>
