Thanks Sophie... that answers my question; however, I'm still worried about
some other aspects:

1. If Redis is to be restored from the changelog topic: what would happen if
I have 3 instances of the stream application and 1 instance goes down... will
the other 2 instances halt until the entire existing state in Redis is wiped
out and the entire state is restored from the changelog topic? If so, that
would be a significant performance hit, especially if this happens during
heavy-traffic hours.

2. Would #1 be solved by the 2nd alternative that you mentioned in the
comment, i.e. 'An alternative is to just start buffering updates in-memory
(or in rocksdb, this could be configurable) and then avoid dirtying the
remote storage in the first place as we would only flush the data out to it
during a commit'? It looks to me that this wouldn't need to rebuild the
entire state store because the changelog is disabled, and this alternative
would avoid making the state store inconsistent in the first place, thus
saving the wipe-out and rebuild. If so, then it also wouldn't need to halt
the other instances and would be a much better approach from a performance
point of view. Is that correct? (A rough sketch of how I picture this
alternative is below.)
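
Just to check my understanding of that alternative, here is roughly what I
have in mind (only a sketch; 'RemoteClient' is a made-up stand-in for an
actual Redis client, not a real API):

import java.util.HashMap;
import java.util.Map;

// Sketch of "buffer locally, flush to the remote store only on commit".
interface RemoteClient {
    String get(String key);
    void putAll(Map<String, String> entries);
}

class CommitBufferedStore {
    private final RemoteClient remote;
    private final Map<String, String> dirty = new HashMap<>();

    CommitBufferedStore(final RemoteClient remote) {
        this.remote = remote;
    }

    // During processing, writes only touch the local buffer.
    void put(final String key, final String value) {
        dirty.put(key, value);
    }

    // Reads check the local buffer first, then fall back to the remote store.
    String get(final String key) {
        final String buffered = dirty.get(key);
        return buffered != null ? buffered : remote.get(key);
    }

    // On commit: push all buffered updates out in one shot.
    void flushOnCommit() {
        remote.putAll(dirty);
        dirty.clear();
    }

    // On failure/abort: just drop the buffer; Redis was never dirtied,
    // so nothing needs to be wiped or restored.
    void discardOnAbort() {
        dirty.clear();
    }
}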

On Sat, Mar 20, 2021 at 2:25 AM Sophie Blee-Goldman
<sop...@confluent.io.invalid> wrote:

> Hey Pushkar, yes, the data will still be backed by a changelog topic unless
> the user explicitly disables logging for that state store. The fault
> tolerance mechanism of Kafka Streams is based on changelogging, therefore
> there are no correctness guarantees if you decide to disable it.
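>
> (For reference, disabling the changelog is something the user does when
> defining the store, e.g. roughly like the snippet below -- just to
> illustrate the option, not a complete example:)
>
> import org.apache.kafka.common.serialization.Serdes;
> import org.apache.kafka.streams.state.KeyValueStore;
> import org.apache.kafka.streams.state.StoreBuilder;
> import org.apache.kafka.streams.state.Stores;
>
> StoreBuilder<KeyValueStore<String, String>> builder =
>     Stores.keyValueStoreBuilder(
>             Stores.inMemoryKeyValueStore("my-store"),  // or a custom/remote supplier
>             Serdes.String(),
>             Serdes.String())
>         .withLoggingDisabled();  // no changelog topic -> no restore, no EOS wipe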
>
> That said, I'm guessing many users do in fact disable the changelog when
> plugging in a remote store with its own fault tolerance guarantees -- is
> that what you're getting at? We could definitely build in better support
> for that case, as either an additional optimization on top of KAFKA-12475
> <https://issues.apache.org/jira/browse/KAFKA-12475> or as an alternative
> implementation to fix the underlying EOS problem. Check out my latest
> comment on the ticket here:
> <https://issues.apache.org/jira/browse/KAFKA-12475?focusedCommentId=17305191&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17305191>
>
> Does that address your question?
>
> On Fri, Mar 19, 2021 at 10:14 AM Pushkar Deole <pdeole2...@gmail.com>
> wrote:
>
> > Hello Sophie,
> >
> > maybe I am missing something here, however can you let me know how a
> > Redis-based state store will be wiped of its inconsistent state in case
> > the stream application dies in the middle of processing? E.g. the stream
> > application consumed from the source topic, processed the source event
> > and saved state to Redis, and then hit an error before producing the
> > event on the destination topic.
> > If this occurs with a RocksDB or in-memory state store, it will be
> > rebuilt from the changelog topic; however, for a Redis state store, how
> > will the state be wiped? Are we saying here that the data stored in Redis
> > will still be backed by a changelog topic and Redis will be restored from
> > that topic in case of a stream application error?
> >
> > On Tue, Mar 16, 2021 at 12:18 AM Sophie Blee-Goldman
> > <sop...@confluent.io.invalid> wrote:
> >
> > > This certainly does seem like a flaw in the Streams API, although of
> > > course Streams is just in general not designed for use with remote
> > > anything (API calls, stores, etc).
> > >
> > > That said, I don't see any reason why we *couldn't* have better support
> > > for remote state stores. Note that there's currently no deleteAll method
> > > on the store interface, and not all databases necessarily support that.
> > > But we could add a default implementation which just calls delete(key)
> > > on all keys in the state store, and for the RocksDB-based state stores
> > > we still wipe out the state as usual (and recommend the same for any
> > > custom StateStore which is local rather than remote).
> > > Obviously falling back on individual delete(key) operations for all the
> > > data in the entire store will have worse performance, but that's better
> > > than silently breaking EOS when deleteAll is not available on a remote
> > > store.
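> > >
> > > (As a very rough sketch, such a default on the KeyValueStore interface
> > > might look something like the following -- just illustrative, not actual
> > > Streams code, and ignoring the question of deleting while iterating:)
> > >
> > >     default void deleteAll() {
> > >         // Fall back to deleting every key individually; remote stores
> > >         // without a native "truncate" could rely on this default.
> > >         try (final KeyValueIterator<K, V> iter = all()) {
> > >             while (iter.hasNext()) {
> > >                 delete(iter.next().key);
> > >             }
> > >         }
> > >     }
> > >
> > > (A local store like RocksDB would override this and keep wiping the
> > > state directory as it does today.)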
> > >
> > > Would you be interested in doing a KIP for this? We should also file a
> > > JIRA with the above explanation, so that other users of remote storage
> > > are aware of this limitation. And definitely document this somewhere.
> > >
> > >
> > > On Mon, Mar 15, 2021 at 8:04 AM Bruno Cadonna <br...@confluent.io.invalid>
> > > wrote:
> > >
> > > > Hi Alex,
> > > >
> > > > I guess wiping out the state directory is easier code-wise, faster,
> > > > and/or at the time of development the developers did not design for
> > > > remote state stores. But I actually do not know the exact reason.
> > > >
> > > > Off the top of my head, I do not know how to solve this for remote
> > > > state stores. Using the uncaught exception handler is not good,
> > > > because a computing node could fail without giving the JVM the
> > > > opportunity to throw an exception.
> > > >
> > > > In your tests, try to increase the commit interval to a high value
> > > > and see if you get inconsistencies. You should get an inconsistency
> > > > if the state store maintains counts for keys and, after the last
> > > > commit before the failure, the Streams app puts an event with a new
> > > > key K with value 1 into the state store. After failover, Streams
> > > > would put the same event with key K again into the state store. If
> > > > the state store deleted all of its data, Streams would put again
> > > > value 1, but if the state store did not delete all data, Streams
> > > > would put value 2, which is wrong because it would count the same
> > > > event twice.
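> > > >
> > > > (In code terms, the count update in the processor is roughly the
> > > > following -- just a sketch to illustrate the double counting:)
> > > >
> > > >     final Long current = store.get(key);  // null before the crash
> > > >     store.put(key, current == null ? 1L : current + 1L);
> > > >
> > > > (If the uncommitted put of value 1 survives the failover in the
> > > > remote store, reprocessing the same event yields 2 instead of 1.)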
> > > >
> > > > Best,
> > > > Bruno
> > > >
> > > >
> > > > On 15.03.21 15:20, Alex Craig wrote:
> > > > > Bruno,
> > > > > Thanks for the info! That makes sense. Of course now I have more
> > > > > questions. :) Do you know why this is being done outside of the
> > > > > state store API? I assume there are reasons why a "deleteAll()" type
> > > > > of function wouldn't work, thereby allowing a state store to purge
> > > > > itself? And maybe more importantly, is there a way to achieve a
> > > > > similar behavior with a 3rd-party store? I'm not sure if hooking
> > > > > into the uncaught exception handler might be a good way to
> > > > > purge/drop a state store in the event of a fatal error? I did set up
> > > > > a MongoDB state store recently as part of a POC and was testing it
> > > > > with EOS enabled (forcing crashes to occur and checking that the
> > > > > result of my aggregation was still accurate). I was unable to cause
> > > > > inconsistent data in the Mongo store (which is good!), though of
> > > > > course I may just have been getting lucky. Thanks again for your
> > > > > help,
> > > > >
> > > > > Alex
> > > > >
> > > > > On Mon, Mar 15, 2021 at 8:59 AM Pushkar Deole <pdeole2...@gmail.com>
> > > > > wrote:
> > > > >
> > > > >> Bruno,
> > > > >>
> > > > >> I tried to explain this in 'Kafka user's language' through the
> > > > >> above-mentioned scenario; hope I put it properly :-) and correct me
> > > > >> if I am wrong
> > > > >>
> > > > >> On Mon, Mar 15, 2021 at 7:23 PM Pushkar Deole <pdeole2...@gmail.com>
> > > > >> wrote:
> > > > >>
> > > > >>> This is what I understand could be the issue with an external
> > > > >>> state store:
> > > > >>>
> > > > >>> The Kafka Streams application consumes the source topic, does
> > > > >>> processing, stores state to the Kafka state store (which is backed
> > > > >>> by a topic), and before producing the event on the destination
> > > > >>> topic, the application fails with some issue. In this case, the
> > > > >>> transaction has failed, so Kafka guarantees either all or none:
> > > > >>> the offset committed to the source topic, the state written to the
> > > > >>> state store topic, the output produced on the destination topic...
> > > > >>> either all of these happen or none of these, and in this failure
> > > > >>> scenario it is none of these.
> > > > >>>
> > > > >>> Now assume you have a Redis state store, and you updated the state
> > > > >>> in Redis and the stream application failed. The source topic and
> > > > >>> destination topic are consistent, i.e. the offset is not committed
> > > > >>> to the source topic and the output is not produced on the
> > > > >>> destination topic, but your Redis state store is inconsistent with
> > > > >>> them, since it is an external state store and Kafka can't
> > > > >>> guarantee a rollback of the state written there.
> > > > >>>
> > > > >>> On Mon, Mar 15, 2021 at 6:30 PM Alex Craig <alexcrai...@gmail.com>
> > > > >>> wrote:
> > > > >>>
> > > > >>>> " Another issue with 3rd party state stores could be violation
> of
> > > > >>>> exactly-once guarantee provided by kafka streams in the event
> of a
> > > > >> failure
> > > > >>>> of streams application instance"
> > > > >>>>
> > > > >>>> I've heard this before but would love to know more about how a
> > > custom
> > > > >>>> state
> > > > >>>> store would be at any greater risk than RocksDB as far as
> > > exactly-once
> > > > >>>> guarantees are concerned.  They all implement the same
> interface,
> > so
> > > > as
> > > > >>>> long as you're correctly implementing get(), put(), delete(),
> > > flush(),
> > > > >>>> etc,
> > > > >>>> you should be fine right?  In other words, I don't think there
> is
> > > any
> > > > >>>> special "exactly once magic" that is baked into the RocksDB
> store
> > > > >> code.  I
> > > > >>>> could be wrong though so I'd love to hear people's thoughts,
> > thanks,
> > > > >>>>
> > > > >>>> Alex C
> > > > >>>>
> > > > >>>> On Sun, Mar 14, 2021 at 4:58 PM Parthasarathy, Mohan <mpart...@hpe.com>
> > > > >>>> wrote:
> > > > >>>>
> > > > >>>>> Thanks for the responses. In the worst case, I might have to keep
> > > > >>>>> both RocksDB as the local store and an external store like Redis.
> > > > >>>>>
> > > > >>>>> -mohan
> > > > >>>>>
> > > > >>>>>
> > > > >>>>> On 3/13/21, 8:53 PM, "Pushkar Deole" <pdeole2...@gmail.com>
> > > wrote:
> > > > >>>>>
> > > > >>>>>      Another issue with 3rd party state stores could be violation
> > > > >>>>>      of the exactly-once guarantee provided by Kafka Streams in
> > > > >>>>>      the event of a failure of a streams application instance.
> > > > >>>>>      Kafka provides an exactly-once guarantee for consumer ->
> > > > >>>>>      process -> produce through transactions, and with the use of
> > > > >>>>>      a state store like Redis, this guarantee is weaker.
> > > > >>>>>
> > > > >>>>>      On Sat, Mar 13, 2021 at 5:28 AM Guozhang Wang <wangg...@gmail.com>
> > > > >>>>>      wrote:
> > > > >>>>>
> > > > >>>>>      > Hello Mohan,
> > > > >>>>>      >
> > > > >>>>>      > I think what you had in mind works with Redis: since it is
> > > > >>>>>      > a remote state store engine, it does not have the
> > > > >>>>>      > co-partitioning requirements of local state stores.
> > > > >>>>>      >
> > > > >>>>>      > One thing you'd need to tune KS for, though, is that with
> > > > >>>>>      > remote stores the processing latency may be larger, and
> > > > >>>>>      > since Kafka Streams processes all records of a single
> > > > >>>>>      > partition in order, synchronously, you may need to tune the
> > > > >>>>>      > poll interval configs etc. to make sure KS would stay in
> > > > >>>>>      > the consumer group and not trigger unnecessary rebalances.
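> > > > >>>>>      >
> > > > >>>>>      > (E.g., something along these lines in the Streams config --
> > > > >>>>>      > the values here are just illustrative:
> > > > >>>>>      >   props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 600000);
> > > > >>>>>      >   props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);
> > > > >>>>>      > since Streams passes these through to its consumers.)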
> > > > >>>>>      >
> > > > >>>>>      > Guozhang
> > > > >>>>>      >
> > > > >>>>>      > On Thu, Mar 11, 2021 at 6:41 PM Parthasarathy, Mohan <
> > > > >>>>> mpart...@hpe.com>
> > > > >>>>>      > wrote:
> > > > >>>>>      >
> > > > >>>>>      > > Hi,
> > > > >>>>>      > >
> > > > >>>>>      > > I have a use case where messages come in with some key,
> > > > >>>>>      > > get assigned some partition, and the state gets created.
> > > > >>>>>      > > Later, the key changes (but the message still contains
> > > > >>>>>      > > the old key) and gets sent to a different partition. I
> > > > >>>>>      > > want to be able to grab the old state using the old key
> > > > >>>>>      > > before creating the new state on this instance. Redis as
> > > > >>>>>      > > a state store makes it easy to implement this, where I
> > > > >>>>>      > > can simply do a lookup before creating the state. I see
> > > > >>>>>      > > an implementation here:
> > > > >>>>>      > > https://github.com/andreas-schroeder/redisks/tree/master/src/main/java/com/github/andreas_schroeder/redisks
> > > > >>>>>      > >
> > > > >>>>>      > > Has anyone tried this? Any caveats?
> > > > >>>>>      > >
> > > > >>>>>      > > Thanks
> > > > >>>>>      > > Mohan
> > > > >>>>>      > >
> > > > >>>>>      > >
> > > > >>>>>      >
> > > > >>>>>      > --
> > > > >>>>>      > -- Guozhang
> > > > >>>>>      >
> > > > >>>>>
> > > > >>>>>
> > > > >>>>>
> > > > >>>>
> > > > >>>
> > > > >>
> > > > >
> > > >
> > >
> >
>
