Yep, that fell off my radar.  Here we go:
https://issues.apache.org/jira/browse/KAFKA-12475

On Mon, Mar 15, 2021 at 8:09 PM Guozhang Wang <wangg...@gmail.com> wrote:

> Hey Sophie,
>
> Maybe we can first create a JIRA ticket for this?
>
> On Mon, Mar 15, 2021 at 3:09 PM Sophie Blee-Goldman
> <sop...@confluent.io.invalid> wrote:
>
> > Sounds good! I meant anyone who is interested :)
> >
> > Let me know if you have any questions after digging into this.
> >
> > On Mon, Mar 15, 2021 at 2:39 PM Alex Craig <alexcrai...@gmail.com>
> wrote:
> >
> > > Hey Sophie, not sure if you meant me or not, but I'd be happy to take
> > > a stab at creating a KIP for this.  I want to spend some time digging
> > > into more of how this works first, but will then try to gather my
> > > thoughts and get something created.
> > >
> > > Alex
> > >
> > > On Mon, Mar 15, 2021 at 1:48 PM Sophie Blee-Goldman
> > > <sop...@confluent.io.invalid> wrote:
> > >
> > > > This certainly does seem like a flaw in the Streams API, although of
> > > > course Streams in general is not designed for use with remote
> > > > anything (API calls, stores, etc.)
> > > >
> > > > That said, I don't see any reason why we *couldn't* have better
> > > > support for remote state stores. Note that there's currently no
> > > > deleteAll method on the store interface, and not all databases
> > > > necessarily support that. But we could add a default implementation
> > > > which just calls delete(key) on all keys in the state store, and for
> > > > the RocksDB-based state stores we still wipe out the state as usual
> > > > (and recommend the same for any custom StateStore which is local
> > > > rather than remote). Obviously falling back on individual
> > > > delete(key) operations for all the data in the entire store will
> > > > have worse performance, but that's better than silently breaking EOS
> > > > when deleteAll is not available on a remote store.
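A minimal sketch of the fallback described above, using a hypothetical simplified store interface (not Kafka's actual StateStore API) and an in-memory stand-in for a remote backend:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical store interface sketching the proposed fallback: if a
// backend has no native deleteAll/truncate, a default method iterates
// the keys and calls delete(key) one by one (slower, but keeps EOS
// semantics intact).
interface SimpleKeyValueStore<K, V> {
    void put(K key, V value);
    V get(K key);
    void delete(K key);
    Iterable<K> allKeys();

    // Default fallback: remote stores without a native deleteAll inherit
    // this; local stores like RocksDB would override it with the usual
    // "wipe the state directory" behavior instead.
    default void deleteAll() {
        List<K> keys = new ArrayList<>();
        for (K k : allKeys()) {
            keys.add(k);   // snapshot first, so we don't mutate while iterating
        }
        for (K k : keys) {
            delete(k);
        }
    }
}

// In-memory stand-in for a remote store, used only to exercise the default.
class InMemoryStore implements SimpleKeyValueStore<String, Long> {
    private final Map<String, Long> data = new HashMap<>();
    public void put(String key, Long value) { data.put(key, value); }
    public Long get(String key) { return data.get(key); }
    public void delete(String key) { data.remove(key); }
    public Iterable<String> allKeys() { return new ArrayList<>(data.keySet()); }
    public int size() { return data.size(); }
}
```

All names here are illustrative; the real KIP would add the default method to Kafka's own store interfaces.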
> > > >
> > > > Would you be interested in doing a KIP for this? We should also
> > > > file a JIRA with the above explanation, so that other users of
> > > > remote storage are aware of this limitation. And definitely document
> > > > this somewhere.
> > > >
> > > >
> > > > On Mon, Mar 15, 2021 at 8:04 AM Bruno Cadonna
> > <br...@confluent.io.invalid
> > > >
> > > > wrote:
> > > >
> > > > > Hi Alex,
> > > > >
> > > > > I guess wiping out the state directory is easier code-wise,
> > > > > faster, and/or at the time of development the developers did not
> > > > > design for remote state stores. But I actually do not know the
> > > > > exact reason.
> > > > >
> > > > > Off the top of my head, I do not know how to solve this for remote
> > > > > state stores. Using the uncaught exception handler is not good,
> > > > > because a computing node could fail without giving the JVM the
> > > > > opportunity to throw an exception.
> > > > >
> > > > > In your tests, try to increase the commit interval to a high
> > > > > value and see if you get inconsistencies. You should get an
> > > > > inconsistency if the state store maintains counts for keys and,
> > > > > after the last commit before the failure, the Streams app puts an
> > > > > event with a new key K with value 1 into the state store. After
> > > > > failover, Streams would put the same event with key K into the
> > > > > state store again. If the state store deleted all of its data,
> > > > > Streams would again put value 1, but if the state store did not
> > > > > delete all data, Streams would put value 2, which is wrong because
> > > > > it would count the same event twice.
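The double-counting scenario above can be simulated with a toy in-memory count store standing in for an external store that survives a crash (all names here are illustrative, not Kafka APIs):

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of the failover scenario: a count store that survives a
// crash (like an external store Kafka cannot wipe) counts the
// reprocessed event twice; a wiped store would not.
public class FailoverCountDemo {

    // Apply the same event for `key` twice, modeling reprocessing after
    // a crash that happened before the transaction committed.
    static long countAfterFailover(Map<String, Long> store, String key) {
        // First attempt: event applied, but the transaction never commits.
        store.merge(key, 1L, Long::sum);
        // Crash happens here. A local RocksDB store would be wiped:
        // store.clear();  // <-- this is what restores consistency
        // Failover: the uncommitted input record is consumed again.
        store.merge(key, 1L, Long::sum);
        return store.get(key);
    }

    public static void main(String[] args) {
        Map<String, Long> external = new HashMap<>(); // not wiped on crash
        // Prints 2: the same event was counted twice, which is wrong.
        System.out.println(countAfterFailover(external, "K"));
    }
}
```

Uncommenting the `store.clear()` line models the local-store wipe and brings the final count back to the correct value of 1.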
> > > > >
> > > > > Best,
> > > > > Bruno
> > > > >
> > > > >
> > > > > On 15.03.21 15:20, Alex Craig wrote:
> > > > > > Bruno,
> > > > > > Thanks for the info!  That makes sense.  Of course now I have
> > > > > > more questions.  :)  Do you know why this is being done outside
> > > > > > of the state store API?  I assume there are reasons why a
> > > > > > "deleteAll()" type of function wouldn't work, thereby allowing a
> > > > > > state store to purge itself?  And maybe more importantly, is
> > > > > > there a way to achieve a similar behavior with a 3rd-party
> > > > > > store?  I'm not sure if hooking into the uncaught exception
> > > > > > handler might be a good way to purge/drop a state store in the
> > > > > > event of a fatal error?  I did set up a MongoDB state store
> > > > > > recently as part of a POC and was testing it with EOS enabled
> > > > > > (forcing crashes to occur and checking that the result of my
> > > > > > aggregation was still accurate).  I was unable to cause
> > > > > > inconsistent data in the Mongo store (which is good!), though of
> > > > > > course I may just have been getting lucky.  Thanks again for
> > > > > > your help,
> > > > > >
> > > > > > Alex
> > > > > >
> > > > > > On Mon, Mar 15, 2021 at 8:59 AM Pushkar Deole <
> > pdeole2...@gmail.com>
> > > > > wrote:
> > > > > >
> > > > > >> Bruno,
> > > > > >>
> > > > > >> I tried to explain this in "Kafka user" language through the
> > > > > >> above-mentioned scenario; hope I put it properly :-) and
> > > > > >> correct me if I am wrong.
> > > > > >>
> > > > > >> On Mon, Mar 15, 2021 at 7:23 PM Pushkar Deole <
> > pdeole2...@gmail.com
> > > >
> > > > > >> wrote:
> > > > > >>
> > > > > >>> This is what I understand could be the issue with an external
> > > > > >>> state store:
> > > > > >>>
> > > > > >>> A Kafka Streams application consumes the source topic, does
> > > > > >>> its processing, stores state to the Kafka state store (which
> > > > > >>> is backed by a topic), and before producing the event on the
> > > > > >>> destination topic, the application fails with some issue. In
> > > > > >>> this case the transaction has failed, so Kafka guarantees all
> > > > > >>> or none: the offset committed for the source topic, the state
> > > > > >>> written to the state store topic, and the output produced on
> > > > > >>> the destination topic all happen or none of them do, and in
> > > > > >>> this failure scenario it is none of them.
> > > > > >>>
> > > > > >>> Assume you have a Redis state store, you updated the state in
> > > > > >>> Redis, and the Streams application failed. Now your source
> > > > > >>> topic and destination topic are consistent, i.e. the offset is
> > > > > >>> not committed to the source topic and the output is not
> > > > > >>> produced on the destination topic, but your Redis state store
> > > > > >>> is inconsistent with that, since it is an external state store
> > > > > >>> and Kafka can't guarantee rollback of the state written there.
> > > > > >>>
> > > > > >>> On Mon, Mar 15, 2021 at 6:30 PM Alex Craig <
> > alexcrai...@gmail.com>
> > > > > >> wrote:
> > > > > >>>
> > > > > >>>> " Another issue with 3rd party state stores could be violation
> > of
> > > > > >>>> exactly-once guarantee provided by kafka streams in the event
> > of a
> > > > > >> failure
> > > > > >>>> of streams application instance"
> > > > > >>>>
> > > > > >>>> I've heard this before but would love to know more about how a
> > > > custom
> > > > > >>>> state
> > > > > >>>> store would be at any greater risk than RocksDB as far as
> > > > exactly-once
> > > > > >>>> guarantees are concerned.  They all implement the same
> > interface,
> > > so
> > > > > as
> > > > > >>>> long as you're correctly implementing get(), put(), delete(),
> > > > flush(),
> > > > > >>>> etc,
> > > > > >>>> you should be fine right?  In other words, I don't think there
> > is
> > > > any
> > > > > >>>> special "exactly once magic" that is baked into the RocksDB
> > store
> > > > > >> code.  I
> > > > > >>>> could be wrong though so I'd love to hear people's thoughts,
> > > thanks,
> > > > > >>>>
> > > > > >>>> Alex C
> > > > > >>>>
> > > > > >>>> On Sun, Mar 14, 2021 at 4:58 PM Parthasarathy, Mohan <
> > > > > mpart...@hpe.com>
> > > > > >>>> wrote:
> > > > > >>>>
> > > > > >>>>> Thanks for the responses. In the worst case, I might have
> > > > > >>>>> to keep both RocksDB for the local store and an external
> > > > > >>>>> store like Redis.
> > > > > >>>>>
> > > > > >>>>> -mohan
> > > > > >>>>>
> > > > > >>>>>
> > > > > >>>>> On 3/13/21, 8:53 PM, "Pushkar Deole" <pdeole2...@gmail.com>
> > > > wrote:
> > > > > >>>>>
> > > > > >>>>>      Another issue with 3rd-party state stores could be
> > > > > >>>>>      violation of the exactly-once guarantee provided by
> > > > > >>>>>      Kafka Streams in the event of a failure of a streams
> > > > > >>>>>      application instance.
> > > > > >>>>>      Kafka provides an exactly-once guarantee for consume ->
> > > > > >>>>>      process -> produce through transactions, and with the
> > > > > >>>>>      use of a state store like Redis, this guarantee is
> > > > > >>>>>      weaker.
> > > > > >>>>>
> > > > > >>>>>      On Sat, Mar 13, 2021 at 5:28 AM Guozhang Wang <
> > > > > wangg...@gmail.com
> > > > > >>>
> > > > > >>>>> wrote:
> > > > > >>>>>
> > > > > >>>>>      > Hello Mohan,
> > > > > >>>>>      >
> > > > > >>>>>      > I think what you had in mind works with Redis:
> > > > > >>>>>      > since it is a remote state store engine, it does not
> > > > > >>>>>      > have the co-partitioning requirements of local state
> > > > > >>>>>      > stores.
> > > > > >>>>>      >
> > > > > >>>>>      > One thing you'd need to tune in KS, though, is that
> > > > > >>>>>      > with remote stores the processing latency may be
> > > > > >>>>>      > larger, and since Kafka Streams processes all records
> > > > > >>>>>      > of a single partition in order, synchronously, you
> > > > > >>>>>      > may need to tune the poll interval configs etc. to
> > > > > >>>>>      > make sure KS would stay in the consumer group and not
> > > > > >>>>>      > trigger unnecessary rebalances.
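As a rough illustration of that tuning advice, the consumer config keys below are real Kafka settings, but the values are placeholder assumptions and should be sized against the remote store's actual latency:

```java
import java.util.Properties;

// Sketch of poll-related tuning for a Streams app backed by a slow
// remote store. Plain config-key strings are used so no Kafka classes
// are needed here; values are illustrative, not recommendations.
public class RemoteStoreTuning {
    public static Properties streamsProps() {
        Properties props = new Properties();
        // Fewer records per poll() so one batch of slow remote calls
        // still finishes within the poll interval.
        props.put("max.poll.records", "100");
        // More headroom between polls before the consumer is considered
        // stuck and a rebalance is triggered.
        props.put("max.poll.interval.ms", "600000"); // 10 minutes
        // Liveness is covered by background heartbeats, so the session
        // timeout can stay moderate.
        props.put("session.timeout.ms", "30000");
        return props;
    }

    public static void main(String[] args) {
        streamsProps().forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

These properties would be merged into the usual StreamsConfig alongside application id, bootstrap servers, etc.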
> > > > > >>>>>      >
> > > > > >>>>>      > Guozhang
> > > > > >>>>>      >
> > > > > >>>>>      > On Thu, Mar 11, 2021 at 6:41 PM Parthasarathy, Mohan <
> > > > > >>>>> mpart...@hpe.com>
> > > > > >>>>>      > wrote:
> > > > > >>>>>      >
> > > > > >>>>>      > > Hi,
> > > > > >>>>>      > >
> > > > > >>>>>      > > I have a use case where a message comes in with
> > > > > >>>>>      > > some key, gets assigned some partition, and the
> > > > > >>>>>      > > state gets created. Later, the key changes (but the
> > > > > >>>>>      > > message still contains the old key) and the message
> > > > > >>>>>      > > gets sent to a different partition. I want to be
> > > > > >>>>>      > > able to grab the old state using the old key before
> > > > > >>>>>      > > creating the new state on this instance. Redis as a
> > > > > >>>>>      > > state store makes it easy to implement this, since
> > > > > >>>>>      > > I can simply do a lookup before creating the state.
> > > > > >>>>>      > > I see an implementation here:
> > > > > >>>>>      > > https://github.com/andreas-schroeder/redisks/tree/master/src/main/java/com/github/andreas_schroeder/redisks
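A stripped-down sketch of the lookup-before-create pattern described above, with a HashMap standing in for the Redis client (a real version would use e.g. Jedis; all names here are hypothetical):

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of seeding new state from an old key. Because a remote store
// is not partition-local, any instance can read state that was written
// under the old key on a different partition.
public class KeyMigrationDemo {
    private final Map<String, String> remoteStore = new HashMap<>();

    public void putState(String key, String state) {
        remoteStore.put(key, state);
    }

    // When a message arrives under a new key but still carries the old
    // key, seed the new state from the old one if it exists.
    public String migrate(String oldKey, String newKey) {
        String old = remoteStore.get(oldKey);        // cross-partition lookup
        String seeded = (old != null) ? old : "fresh";
        remoteStore.put(newKey, seeded);
        return seeded;
    }
}
```

Whether to also delete the old key after migration depends on whether late messages may still arrive under it.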
> > > > > >>>>>      > >
> > > > > >>>>>      > > Has anyone tried this? Any caveats?
> > > > > >>>>>      > >
> > > > > >>>>>      > > Thanks
> > > > > >>>>>      > > Mohan
> > > > > >>>>>      > >
> > > > > >>>>>      > >
> > > > > >>>>>      >
> > > > > >>>>>      > --
> > > > > >>>>>      > -- Guozhang
> > > > > >>>>>      >
> > > > > >>>>>
> > > > > >>>>>
> > > > > >>>>>
> > > > > >>>>
> > > > > >>>
> > > > > >>
> > > > > >
> > > > >
> > > >
> > >
> >
>
>
> --
> -- Guozhang
>
