Thanks!

On Mon, Mar 15, 2021 at 8:38 PM Sophie Blee-Goldman
<sop...@confluent.io.invalid> wrote:

> Yep, that fell off my radar.  Here we go:
> https://issues.apache.org/jira/browse/KAFKA-12475
>
> On Mon, Mar 15, 2021 at 8:09 PM Guozhang Wang <wangg...@gmail.com> wrote:
>
> > Hey Sophie,
> >
> > Maybe we can first create a JIRA ticket for this?
> >
> > On Mon, Mar 15, 2021 at 3:09 PM Sophie Blee-Goldman
> > <sop...@confluent.io.invalid> wrote:
> >
> > > Sounds good! I meant anyone who is interested :)
> > >
> > > Let me know if you have any questions after digging into this.
> > >
> > > On Mon, Mar 15, 2021 at 2:39 PM Alex Craig <alexcrai...@gmail.com> wrote:
> > >
> > > > Hey Sophie, not sure if you meant me or not, but I'd be happy to take a
> > > > stab at creating a KIP for this. I want to spend some time digging
> > > > into more of how this works first, but will then try to gather my
> > > > thoughts and get something created.
> > > >
> > > > Alex
> > > >
> > > > On Mon, Mar 15, 2021 at 1:48 PM Sophie Blee-Goldman
> > > > <sop...@confluent.io.invalid> wrote:
> > > >
> > > > > This certainly does seem like a flaw in the Streams API, although of
> > > > > course Streams is in general not designed for use with remote anything
> > > > > (API calls, stores, etc.).
> > > > >
> > > > > That said, I don't see any reason why we *couldn't* have better support
> > > > > for remote state stores. Note that there's currently no deleteAll method
> > > > > on the store interface, and not all databases necessarily support that.
> > > > > But we could add a default implementation which just calls delete(key)
> > > > > on all keys in the state store, and for the RocksDB-based state stores
> > > > > we still wipe out the state as usual (and recommend the same for any
> > > > > custom StateStore which is local rather than remote). Obviously,
> > > > > falling back on individual delete(key) operations for all the data in
> > > > > the entire store will have worse performance, but that's better than
> > > > > silently breaking EOS when deleteAll is not available on a remote
> > > > > store.
> > > > >
> > > > > Would you be interested in doing a KIP for this? We should also file a
> > > > > JIRA with the above explanation, so that other users of remote storage
> > > > > are aware of this limitation. And definitely document this somewhere.
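Sophie's proposed fallback can be sketched as below. This is a minimal illustration under stated assumptions: `WipeableStore`, `RemoteStoreStub` and `LocalStoreStub` are hypothetical names, and the interface is NOT the real Kafka Streams `KeyValueStore` API (which, as noted above, has no `deleteAll` method). The default method deletes key by key; a local store overrides it with a bulk wipe.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Hypothetical store interface for illustration only; not the Kafka Streams API.
interface WipeableStore<K, V> {
    V get(K key);
    void put(K key, V value);
    V delete(K key);
    Iterator<K> keys();

    // Proposed fallback: wipe the store by deleting every key individually.
    // Slow for large stores, but works on any backend that supports delete(key).
    default void deleteAll() {
        // Snapshot the keys first so the store is not modified mid-iteration.
        List<K> snapshot = new ArrayList<>();
        for (Iterator<K> it = keys(); it.hasNext(); ) {
            snapshot.add(it.next());
        }
        for (K key : snapshot) {
            delete(key);
        }
    }
}

// In-memory stand-in for a remote store (e.g. Redis or MongoDB) that relies
// on the default, key-by-key deleteAll().
class RemoteStoreStub<K, V> implements WipeableStore<K, V> {
    protected final Map<K, V> data = new HashMap<>();
    int deleteCalls = 0; // counts individual delete(key) round trips

    public V get(K key) { return data.get(key); }
    public void put(K key, V value) { data.put(key, value); }
    public V delete(K key) { deleteCalls++; return data.remove(key); }
    public Iterator<K> keys() { return data.keySet().iterator(); }
    int size() { return data.size(); }
}

// A local store can override the default with a cheap bulk wipe, analogous
// to Streams deleting the RocksDB state directory.
class LocalStoreStub<K, V> extends RemoteStoreStub<K, V> {
    @Override
    public void deleteAll() { data.clear(); }
}
```

The remote stub pays one `delete(key)` call per key, which is the performance cost mentioned above; the local stub keeps the single bulk wipe.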
> > > > >
> > > > >
> > > > > On Mon, Mar 15, 2021 at 8:04 AM Bruno Cadonna <br...@confluent.io.invalid>
> > > > > wrote:
> > > > >
> > > > > > Hi Alex,
> > > > > >
> > > > > > I guess wiping out the state directory is easier code-wise, faster,
> > > > > > and/or at the time of development the developers did not design for
> > > > > > remote state stores. But I actually do not know the exact reason.
> > > > > >
> > > > > > Off the top of my head, I do not know how to solve this for remote
> > > > > > state stores. Using the uncaught exception handler is not good,
> > > > > > because a computing node could fail without giving the JVM the
> > > > > > opportunity to throw an exception.
> > > > > >
> > > > > > In your tests, try to increase the commit interval to a high value
> > > > > > and see if you get inconsistencies. You should get an inconsistency
> > > > > > if the state store maintains counts for keys and, after the last
> > > > > > commit before the failure, the Streams app puts an event with a new
> > > > > > key K with value 1 into the state store. After failover, Streams
> > > > > > would put the same event with key K into the state store again. If
> > > > > > the state store had deleted all of its data, Streams would again put
> > > > > > value 1, but if the state store did not delete all data, Streams
> > > > > > would put value 2, which is wrong because it would count the same
> > > > > > event twice.
> > > > > >
> > > > > > Best,
> > > > > > Bruno
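Bruno's counting example can be reduced to a small model. This is a sketch, not Kafka Streams code: `ReplayScenario` is a hypothetical name, a `HashMap` stands in for the state store, and `store.clear()` stands in for wiping local state and restoring the last committed state (empty here, because K is a new key). In a real test, a larger `commit.interval.ms` only widens the window between commits in which this can happen.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of the failover scenario: a count() store receives an event with
// a new key K after the last commit, the instance crashes before the next
// commit, and the uncommitted event is processed again after failover.
class ReplayScenario {

    // Increment the count for a key, as a count() aggregation would.
    static void countEvent(Map<String, Long> store, String key) {
        store.merge(key, 1L, Long::sum);
    }

    // Returns the count for K after failover and replay.
    // wipeOnFailover = true models discarding uncommitted local state.
    static long countAfterFailover(boolean wipeOnFailover) {
        Map<String, Long> store = new HashMap<>(); // state as of the last commit
        countEvent(store, "K");                     // processed, but never committed
        // Crash: the input offset for this event was not committed.
        if (wipeOnFailover) {
            store.clear();
        }
        countEvent(store, "K");                     // the same event is processed again
        return store.get("K");
    }
}
```

With the wipe the count for K ends at 1; without it the replayed event is counted a second time and the count ends at 2.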
> > > > > >
> > > > > >
> > > > > > On 15.03.21 15:20, Alex Craig wrote:
> > > > > > > Bruno,
> > > > > > > Thanks for the info! That makes sense. Of course now I have more
> > > > > > > questions. :) Do you know why this is being done outside of the
> > > > > > > state store API? I assume there are reasons why a "deleteAll()"
> > > > > > > type of function, which would allow a state store to purge itself,
> > > > > > > wouldn't work? And maybe more importantly, is there a way to achieve
> > > > > > > similar behavior with a 3rd party store? I'm not sure if hooking
> > > > > > > into the uncaught exception handler might be a good way to
> > > > > > > purge/drop a state store in the event of a fatal error? I did set up
> > > > > > > a MongoDB state store recently as part of a POC and was testing it
> > > > > > > with EOS enabled (forcing crashes to occur and checking that the
> > > > > > > result of my aggregation was still accurate). I was unable to cause
> > > > > > > inconsistent data in the mongo store (which is good!), though of
> > > > > > > course I may just have been getting lucky. Thanks again for your
> > > > > > > help,
> > > > > > >
> > > > > > > Alex
> > > > > > >
> > > > > > > On Mon, Mar 15, 2021 at 8:59 AM Pushkar Deole <pdeole2...@gmail.com>
> > > > > > > wrote:
> > > > > > >
> > > > > > >> Bruno,
> > > > > > >>
> > > > > > >> I tried to explain this in Kafka users' language through the
> > > > > > >> above-mentioned scenario. I hope I put it properly :) and correct me
> > > > > > >> if I am wrong.
> > > > > > >>
> > > > > > >> On Mon, Mar 15, 2021 at 7:23 PM Pushkar Deole <pdeole2...@gmail.com>
> > > > > > >> wrote:
> > > > > > >>
> > > > > > >>> This is what I understand could be the issue with an external
> > > > > > >>> state store:
> > > > > > >>>
> > > > > > >>> A Kafka Streams application consumes the source topic, does its
> > > > > > >>> processing, stores state in a Kafka state store (backed by a
> > > > > > >>> changelog topic) and, before producing the event to the destination
> > > > > > >>> topic, fails with some issue. In this case the transaction has
> > > > > > >>> failed, and Kafka guarantees all or none: the source offset commit,
> > > > > > >>> the state written to the state store topic, and the output produced
> > > > > > >>> to the destination topic either all happen or none of them do. In
> > > > > > >>> this failure scenario, none of them happen.
> > > > > > >>>
> > > > > > >>> Now assume you have a Redis state store: you updated the state in
> > > > > > >>> Redis and then the stream application failed. The source and
> > > > > > >>> destination topics are consistent, i.e. the source offset is not
> > > > > > >>> committed and the output is not produced to the destination topic,
> > > > > > >>> but your Redis state store is inconsistent with them, since it is
> > > > > > >>> an external state store and Kafka can't guarantee rollback of state
> > > > > > >>> written there.
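Pushkar's all-or-nothing argument can be illustrated with a toy model. This is a hypothetical sketch, not Kafka client code: `TransactionModel` and its fields are illustrative names, transactional writes are buffered and applied only on commit, and a `HashMap` named `redis` stands in for an external store whose writes take effect immediately.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model: writes inside a "Kafka transaction" become visible only on
// commit, while the external store write is applied at once and never undone.
class TransactionModel {
    final Map<String, Long> committedOffsets = new HashMap<>();
    final Map<String, String> changelog = new HashMap<>();
    final List<String> outputTopic = new ArrayList<>();
    final Map<String, String> redis = new HashMap<>(); // external, non-transactional

    // Writes buffered in the open transaction.
    private final Map<String, Long> pendingOffsets = new HashMap<>();
    private final Map<String, String> pendingChangelog = new HashMap<>();
    private final List<String> pendingOutput = new ArrayList<>();

    void process(String partition, long offset, String key, String value, boolean failBeforeCommit) {
        redis.put(key, value);                     // external write: applied immediately
        pendingChangelog.put(key, value);          // state store changelog write
        pendingOutput.add(key + "=" + value);      // record for the destination topic
        pendingOffsets.put(partition, offset + 1); // next offset to consume
        if (failBeforeCommit) {
            abort();
        } else {
            commit();
        }
    }

    private void commit() {
        committedOffsets.putAll(pendingOffsets);
        changelog.putAll(pendingChangelog);
        outputTopic.addAll(pendingOutput);
        clearPending();
    }

    private void abort() {
        clearPending(); // all-or-nothing: none of the transactional writes survive
    }

    private void clearPending() {
        pendingOffsets.clear();
        pendingChangelog.clear();
        pendingOutput.clear();
    }
}
```

After a failed run, the offsets, changelog and output are all empty, but the `redis` map still holds the update, which is exactly the inconsistency described above.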
> > > > > > >>>
> > > > > > >>> On Mon, Mar 15, 2021 at 6:30 PM Alex Craig <alexcrai...@gmail.com>
> > > > > > >>> wrote:
> > > > > > >>>
> > > > > > >>>> "Another issue with 3rd party state stores could be violation of
> > > > > > >>>> exactly-once guarantee provided by kafka streams in the event of a
> > > > > > >>>> failure of streams application instance"
> > > > > > >>>>
> > > > > > >>>> I've heard this before but would love to know more about how a
> > > > > > >>>> custom state store would be at any greater risk than RocksDB as far
> > > > > > >>>> as exactly-once guarantees are concerned. They all implement the
> > > > > > >>>> same interface, so as long as you're correctly implementing get(),
> > > > > > >>>> put(), delete(), flush(), etc., you should be fine, right? In other
> > > > > > >>>> words, I don't think there is any special "exactly-once magic" baked
> > > > > > >>>> into the RocksDB store code. I could be wrong though, so I'd love to
> > > > > > >>>> hear people's thoughts. Thanks,
> > > > > > >>>>
> > > > > > >>>> Alex C
> > > > > > >>>>
> > > > > > >>>> On Sun, Mar 14, 2021 at 4:58 PM Parthasarathy, Mohan <mpart...@hpe.com>
> > > > > > >>>> wrote:
> > > > > > >>>>
> > > > > > >>>>> Thanks for the responses. In the worst case, I might have to keep
> > > > > > >>>>> both RocksDB for the local store and an external store like Redis.
> > > > > > >>>>>
> > > > > > >>>>> -mohan
> > > > > > >>>>>
> > > > > > >>>>>
> > > > > > >>>>> On 3/13/21, 8:53 PM, "Pushkar Deole" <pdeole2...@gmail.com> wrote:
> > > > > > >>>>>
> > > > > > >>>>>      Another issue with 3rd party state stores could be violation of
> > > > > > >>>>>      exactly-once guarantee provided by kafka streams in the event of a
> > > > > > >>>>>      failure of streams application instance.
> > > > > > >>>>>      Kafka provides an exactly-once guarantee for consume -> process ->
> > > > > > >>>>>      produce through transactions, and with the use of a state store
> > > > > > >>>>>      like Redis, this guarantee is weaker.
> > > > > > >>>>>
> > > > > > >>>>>      On Sat, Mar 13, 2021 at 5:28 AM Guozhang Wang <wangg...@gmail.com>
> > > > > > >>>>>      wrote:
> > > > > > >>>>>
> > > > > > >>>>>      > Hello Mohan,
> > > > > > >>>>>      >
> > > > > > >>>>>      > I think what you had in mind works with Redis: since it is a
> > > > > > >>>>>      > remote state store engine, it does not have the co-partitioning
> > > > > > >>>>>      > requirements that local state stores have.
> > > > > > >>>>>      >
> > > > > > >>>>>      > One thing you'd need to tune in KS, though, is that with remote
> > > > > > >>>>>      > stores the processing latency may be larger, and since Kafka
> > > > > > >>>>>      > Streams processes all records of a single partition in order,
> > > > > > >>>>>      > synchronously, you may need to tune the poll interval configs
> > > > > > >>>>>      > etc. to make sure KS stays in the consumer group and does not
> > > > > > >>>>>      > trigger unnecessary rebalances.
> > > > > > >>>>>      >
> > > > > > >>>>>      > Guozhang
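The poll-interval tuning Guozhang mentions might look like the following. This is a hedged sketch: `RemoteStoreTuning`, the application id, the broker address and all numeric values are assumptions to be tuned against measured remote-store latency. Plain string keys are used so it compiles without Kafka on the classpath; the `consumer.` prefix is the same one `StreamsConfig.consumerPrefix(...)` produces, and `max.poll.interval.ms` / `max.poll.records` are standard consumer configs.

```java
import java.util.Properties;

// Hypothetical starting point for a Streams app whose processors call a
// remote store; every value below is an assumption, not a recommendation.
class RemoteStoreTuning {
    static Properties streamsProps() {
        Properties props = new Properties();
        props.put("application.id", "remote-store-demo");  // hypothetical app id
        props.put("bootstrap.servers", "localhost:9092");  // assumed broker address
        // The "consumer." prefix routes a setting to the consumers Streams creates.
        // Allow more time between poll() calls, since each record may now wait on
        // a network round trip to the remote store.
        props.put("consumer.max.poll.interval.ms", "600000");
        // Fetch fewer records per poll() so one batch finishes well inside that interval.
        props.put("consumer.max.poll.records", "100");
        return props;
    }
}
```

The two settings pull in the same direction: a longer allowed gap between polls and a smaller batch per poll both reduce the chance that slow remote lookups push a member out of the group.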
> > > > > > >>>>>      >
> > > > > > >>>>>      > On Thu, Mar 11, 2021 at 6:41 PM Parthasarathy, Mohan <mpart...@hpe.com>
> > > > > > >>>>>      > wrote:
> > > > > > >>>>>      >
> > > > > > >>>>>      > > Hi,
> > > > > > >>>>>      > >
> > > > > > >>>>>      > > I have a use case where messages come in with some key, get
> > > > > > >>>>>      > > assigned some partition, and the state gets created. Later, the
> > > > > > >>>>>      > > key changes (but the message still contains the old key) and the
> > > > > > >>>>>      > > message gets sent to a different partition. I want to be able to
> > > > > > >>>>>      > > grab the old state using the old key before creating the new
> > > > > > >>>>>      > > state on this instance. Redis as a state store makes this easy
> > > > > > >>>>>      > > to implement: I can simply do a lookup before creating the
> > > > > > >>>>>      > > state. I see an implementation here:
> > > > > > >>>>>      > > https://github.com/andreas-schroeder/redisks/tree/master/src/main/java/com/github/andreas_schroeder/redisks
> > > > > > >>>>>      > >
> > > > > > >>>>>      > > Has anyone tried this? Any caveats?
> > > > > > >>>>>      > >
> > > > > > >>>>>      > > Thanks
> > > > > > >>>>>      > > Mohan
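Mohan's lookup-before-create idea can be sketched as processor logic against a shared store. This is an illustration under assumptions: `RekeyingAggregator` and its `Event` class are hypothetical, a shared `Map` stands in for Redis (readable by every instance regardless of partition ownership), and the aggregate is a simple running sum.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical processor logic for the re-keying case. A shared Map stands in
// for a remote store that every Streams instance can read.
class RekeyingAggregator {

    // Hypothetical event shape: the current key plus the key it replaced.
    static final class Event {
        final String key;
        final String oldKey; // null if the key never changed
        final long amount;

        Event(String key, String oldKey, long amount) {
            this.key = key;
            this.oldKey = oldKey;
            this.amount = amount;
        }
    }

    private final Map<String, Long> remoteStore;

    RekeyingAggregator(Map<String, Long> remoteStore) {
        this.remoteStore = remoteStore;
    }

    // Returns the updated aggregate stored under event.key.
    long process(Event event) {
        Long current = remoteStore.get(event.key);
        if (current == null && event.oldKey != null) {
            // First record under the new key: look up the state created under the
            // old key, possibly by an instance owning a different partition.
            current = remoteStore.get(event.oldKey);
        }
        long updated = (current == null ? 0L : current) + event.amount;
        remoteStore.put(event.key, updated);
        return updated;
    }
}
```

Two caveats apply to this sketch: the `get` and `put` are separate calls, so concurrent writers to the old key are not coordinated, and, as discussed earlier in the thread, these writes sit outside the Kafka transaction. Leaving the old key's state in place (rather than deleting it) is a design choice made here for simplicity.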
> > > > > > >>>>>      > >
> > > > > > >>>>>      > >
> > > > > > >>>>>      >
> > > > > > >>>>>      > --
> > > > > > >>>>>      > -- Guozhang
> > > > > > >>>>>      >
> > > > > > >>>>>
> > > > > > >>>>>
> > > > > > >>>>>
> > > > > > >>>>
> > > > > > >>>
> > > > > > >>
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> >
> > --
> > -- Guozhang
> >
>


-- 
-- Guozhang
