Hey Sophie,

Maybe we can first create a JIRA ticket for this?

On Mon, Mar 15, 2021 at 3:09 PM Sophie Blee-Goldman
<sop...@confluent.io.invalid> wrote:

> Sounds good! I meant anyone who is interested :)
>
> Let me know if you have any questions after digging into this
>
> On Mon, Mar 15, 2021 at 2:39 PM Alex Craig <alexcrai...@gmail.com> wrote:
>
> > Hey Sophie, not sure if you meant me or not, but I'd be happy to take
> > a stab at creating a KIP for this.  I want to spend some time digging
> > into more of how this works first, but will then try to gather my
> > thoughts and get something created.
> >
> > Alex
> >
> > On Mon, Mar 15, 2021 at 1:48 PM Sophie Blee-Goldman
> > <sop...@confluent.io.invalid> wrote:
> >
> > > This certainly does seem like a flaw in the Streams API, although of
> > > course Streams is in general just not designed for use with remote
> > > anything (API calls, stores, etc.).
> > >
> > > That said, I don't see any reason why we *couldn't* have better
> > > support for remote state stores. Note that there's currently no
> > > deleteAll method on the store interface, and not all databases
> > > necessarily support that. But we could add a default implementation
> > > which just calls delete(key) on all keys in the state store, and for
> > > the RocksDB-based state stores we would still wipe out the state as
> > > usual (and recommend the same for any custom StateStore which is
> > > local rather than remote). Obviously, falling back on individual
> > > delete(key) operations for all the data in the entire store will
> > > have worse performance, but that's better than silently breaking EOS
> > > when deleteAll is not available on a remote store.
> > >
> > > Would you be interested in doing a KIP for this? We should also file
> > > a JIRA with the above explanation, so that other users of remote
> > > storage are aware of this limitation. And definitely document this
> > > somewhere.
> > >
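The per-key fallback Sophie proposes above can be sketched with a toy in-memory store (all names here are illustrative; the real interface is `org.apache.kafka.streams.state.KeyValueStore`, which has no `deleteAll` today):

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative stand-in for a key-value state store; not a Kafka Streams API.
class SimpleStore<K, V> {
    private final Map<K, V> data = new LinkedHashMap<>();

    void put(K key, V value) { data.put(key, value); }
    void delete(K key) { data.remove(key); }
    int approximateNumEntries() { return data.size(); }

    // The proposed default: fall back to per-key deletes when the backing
    // engine has no bulk wipe. Slower than dropping the whole store, but it
    // preserves EOS semantics for remote stores that lack a deleteAll.
    void deleteAll() {
        // Snapshot the keys first so we don't mutate the map while iterating.
        for (K key : new ArrayList<>(data.keySet())) {
            delete(key);
        }
    }
}
```

For a local RocksDB store the wipe would still be the usual state-directory deletion; this per-key loop is only the remote-store fallback.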
> > >
> > > On Mon, Mar 15, 2021 at 8:04 AM Bruno Cadonna
> <br...@confluent.io.invalid
> > >
> > > wrote:
> > >
> > > > Hi Alex,
> > > >
> > > > I guess wiping out the state directory is easier code-wise, faster,
> > > > and/or at the time of development the developers did not design for
> > > > remote state stores. But I actually do not know the exact reason.
> > > >
> > > > Off the top of my head, I do not know how to solve this for remote
> > > > state stores. Using the uncaught exception handler is not good,
> > > > because a computing node could fail without giving the JVM the
> > > > opportunity to throw an exception.
> > > >
> > > > In your tests, try to increase the commit interval to a high value
> > > > and see if you get inconsistencies. You should get an inconsistency
> > > > if the state store maintains counts for keys and, after the last
> > > > commit before the failure, the Streams app puts an event with a new
> > > > key K with value 1 into the state store. After failover, Streams
> > > > would put the same event with key K into the state store again. If
> > > > the state store deleted all of its data, Streams would again put
> > > > value 1, but if the state store did not delete all data, Streams
> > > > would put value 2, which is wrong because it would count the same
> > > > event twice.
> > > >
> > > > Best,
> > > > Bruno
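Bruno's scenario can be replayed with a toy counting store (illustrative names only, no Kafka APIs): reprocessing the same event against un-wiped state counts it twice, while wiping first restores the correct count.

```java
import java.util.HashMap;
import java.util.Map;

// Toy replay of the double-count scenario: a count store that survives a
// crash without being wiped counts the reprocessed event a second time.
class CountStore {
    final Map<String, Integer> counts = new HashMap<>();

    void process(String key) {
        counts.merge(key, 1, Integer::sum);
    }

    // What Streams does to local state on an EOS failover.
    void wipe() {
        counts.clear();
    }
}
```

Putting key K once before the crash and once on replay yields a count of 1 if the store was wiped in between, and 2 (wrong) if it was not.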
> > > >
> > > >
> > > > On 15.03.21 15:20, Alex Craig wrote:
> > > > > Bruno,
> > > > > Thanks for the info!  That makes sense.  Of course, now I have
> > > > > more questions.  :)  Do you know why this is being done outside
> > > > > of the state store API?  I assume there are reasons why a
> > > > > "deleteAll()" type of function wouldn't work, thereby allowing a
> > > > > state store to purge itself?  And maybe more importantly, is
> > > > > there a way to achieve a similar behavior with a 3rd-party
> > > > > store?  I'm not sure if hooking into the uncaught exception
> > > > > handler might be a good way to purge/drop a state store in the
> > > > > event of a fatal error?  I did set up a MongoDB state store
> > > > > recently as part of a POC and was testing it with EOS enabled
> > > > > (forcing crashes to occur and checking that the result of my
> > > > > aggregation was still accurate).  I was unable to cause
> > > > > inconsistent data in the mongo store (which is good!), though of
> > > > > course I may just have been getting lucky.  Thanks again for
> > > > > your help,
> > > > >
> > > > > Alex
> > > > >
> > > > > On Mon, Mar 15, 2021 at 8:59 AM Pushkar Deole <
> pdeole2...@gmail.com>
> > > > wrote:
> > > > >
> > > > >> Bruno,
> > > > >>
> > > > >> I tried to explain this in "Kafka user's" language through the
> > > > >> above-mentioned scenario; hope I put it properly :) and correct
> > > > >> me if I am wrong.
> > > > >>
> > > > >> On Mon, Mar 15, 2021 at 7:23 PM Pushkar Deole <
> pdeole2...@gmail.com
> > >
> > > > >> wrote:
> > > > >>
> > > > >>> This is what I understand could be the issue with an external
> > > > >>> state store:
> > > > >>>
> > > > >>> A Kafka Streams application consumes the source topic, does
> > > > >>> processing, stores state to the Kafka state store (this is
> > > > >>> backed by a topic), and before producing the event on the
> > > > >>> destination topic, the application fails with some issue. In
> > > > >>> this case, the transaction has failed, so Kafka guarantees
> > > > >>> all or none: the offset committed for the source topic, the
> > > > >>> state written to the state store topic, and the output
> > > > >>> produced on the destination topic all happen or none of them
> > > > >>> do, and in this failure scenario it is none of them.
> > > > >>>
> > > > >>> Assume you have a Redis state store, and you updated the state
> > > > >>> in Redis and the Streams application failed. Now the source
> > > > >>> topic and destination topic are consistent, i.e. the offset is
> > > > >>> not committed to the source topic and output is not produced
> > > > >>> on the destination topic, but your Redis state store is
> > > > >>> inconsistent with that, since it is an external state store
> > > > >>> and Kafka can't guarantee rollback of the state written there.
> > > > >>>
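The asymmetry described above can be sketched as a toy (no Kafka APIs; all names are illustrative): writes destined for Kafka are staged until the transaction commits and are discarded on a crash, while the write to the external store takes effect immediately and survives the abort.

```java
import java.util.HashMap;
import java.util.Map;

// Toy illustration of the rollback asymmetry: Kafka-side writes are staged
// until commit, while a write to an external store like Redis is immediate
// and is not rolled back when the transaction aborts.
class RollbackAsymmetry {
    final Map<String, Integer> externalStore = new HashMap<>();
    final Map<String, Integer> committedKafkaState = new HashMap<>();

    void processThenCrash(String key) {
        Map<String, Integer> staged = new HashMap<>();
        staged.put(key, 1);        // state-store and output writes, staged
        externalStore.put(key, 1); // external write is visible immediately
        // App crashes here: the commit never runs, so the staged writes
        // are discarded -- but the external write above already happened.
    }
}
```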
> > > > >>> On Mon, Mar 15, 2021 at 6:30 PM Alex Craig <
> alexcrai...@gmail.com>
> > > > >> wrote:
> > > > >>>
> > > > >>>> " Another issue with 3rd party state stores could be violation
> of
> > > > >>>> exactly-once guarantee provided by kafka streams in the event
> of a
> > > > >> failure
> > > > >>>> of streams application instance"
> > > > >>>>
> > > > >>>> I've heard this before but would love to know more about how a
> > > custom
> > > > >>>> state
> > > > >>>> store would be at any greater risk than RocksDB as far as
> > > exactly-once
> > > > >>>> guarantees are concerned.  They all implement the same
> interface,
> > so
> > > > as
> > > > >>>> long as you're correctly implementing get(), put(), delete(),
> > > flush(),
> > > > >>>> etc,
> > > > >>>> you should be fine right?  In other words, I don't think there
> is
> > > any
> > > > >>>> special "exactly once magic" that is baked into the RocksDB
> store
> > > > >> code.  I
> > > > >>>> could be wrong though so I'd love to hear people's thoughts,
> > thanks,
> > > > >>>>
> > > > >>>> Alex C
> > > > >>>>
> > > > >>>> On Sun, Mar 14, 2021 at 4:58 PM Parthasarathy, Mohan <
> > > > mpart...@hpe.com>
> > > > >>>> wrote:
> > > > >>>>
> > > > >>>>> Thanks for the responses. In the worst case, I might have to
> > > > >>>>> keep both RocksDB for the local store and an external store
> > > > >>>>> like Redis.
> > > > >>>>>
> > > > >>>>> -mohan
> > > > >>>>>
> > > > >>>>>
> > > > >>>>> On 3/13/21, 8:53 PM, "Pushkar Deole" <pdeole2...@gmail.com>
> > > wrote:
> > > > >>>>>
> > > > >>>>>      Another issue with 3rd-party state stores could be
> > > > >>>>>      violation of the exactly-once guarantee provided by
> > > > >>>>>      Kafka Streams in the event of a failure of a Streams
> > > > >>>>>      application instance.
> > > > >>>>>      Kafka provides an exactly-once guarantee for consume ->
> > > > >>>>>      process -> produce through transactions, and with the
> > > > >>>>>      use of a state store like Redis, this guarantee is
> > > > >>>>>      weaker.
> > > > >>>>>
> > > > >>>>>      On Sat, Mar 13, 2021 at 5:28 AM Guozhang Wang <
> > > > wangg...@gmail.com
> > > > >>>
> > > > >>>>> wrote:
> > > > >>>>>
> > > > >>>>>      > Hello Mohan,
> > > > >>>>>      >
> > > > >>>>>      > I think what you had in mind works with Redis: since
> > > > >>>>>      > it is a remote state store engine, it does not have
> > > > >>>>>      > the co-partitioning requirements that local state
> > > > >>>>>      > stores do.
> > > > >>>>>      >
> > > > >>>>>      > One thing you'd need to tune in KS, though: with
> > > > >>>>>      > remote stores the processing latency may be larger,
> > > > >>>>>      > and since Kafka Streams processes all records of a
> > > > >>>>>      > single partition in order, synchronously, you may
> > > > >>>>>      > need to tune the poll interval configs etc. to make
> > > > >>>>>      > sure KS stays in the consumer group and does not
> > > > >>>>>      > trigger unnecessary rebalances.
> > > > >>>>>      >
> > > > >>>>>      > Guozhang
> > > > >>>>>      >
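The tuning Guozhang mentions would land in the Streams configuration; a hedged sketch follows (the config keys are real consumer configs that Streams passes through, but the values are placeholders, not recommendations):

```java
import java.util.Properties;

// Illustrative configs for tolerating higher per-record latency with a
// remote store; the values here are placeholders to tune per workload.
class RemoteStoreConfig {
    static Properties props() {
        Properties p = new Properties();
        p.put("max.poll.interval.ms", "600000"); // allow longer gaps between polls
        p.put("max.poll.records", "100");        // fewer records per poll loop
        return p;
    }
}
```

The idea is to keep one slow poll loop iteration (fewer records, each with remote-store round trips) well under `max.poll.interval.ms`, so the consumer is not kicked out of the group.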
> > > > >>>>>      > On Thu, Mar 11, 2021 at 6:41 PM Parthasarathy, Mohan <
> > > > >>>>> mpart...@hpe.com>
> > > > >>>>>      > wrote:
> > > > >>>>>      >
> > > > >>>>>      > > Hi,
> > > > >>>>>      > >
> > > > >>>>>      > > I have a use case where messages come in with some
> > > > >>>>>      > > key, get assigned some partition, and the state
> > > > >>>>>      > > gets created. Later, the key changes (but the
> > > > >>>>>      > > message still contains the old key) and the message
> > > > >>>>>      > > gets sent to a different partition. I want to be
> > > > >>>>>      > > able to grab the old state using the old key before
> > > > >>>>>      > > creating the new state on this instance. Redis as a
> > > > >>>>>      > > state store makes it easy to implement this, where
> > > > >>>>>      > > I can simply do a lookup before creating the state.
> > > > >>>>>      > > I see an implementation here:
> > > > >>>>>      > >
> > > > >>>>>      > > https://github.com/andreas-schroeder/redisks/tree/master/src/main/java/com/github/andreas_schroeder/redisks
> > > > >>>>>      > >
> > > > >>>>>      > > Has anyone tried this? Any caveats?
> > > > >>>>>      > >
> > > > >>>>>      > > Thanks
> > > > >>>>>      > > Mohan
> > > > >>>>>      > >
> > > > >>>>>      > >
> > > > >>>>>      >
> > > > >>>>>      > --
> > > > >>>>>      > -- Guozhang
> > > > >>>>>      >
> > > > >>>>>
> > > > >>>>>
> > > > >>>>>
> > > > >>>>
> > > > >>>
> > > > >>
> > > > >
> > > >
> > >
> >
>


-- 
-- Guozhang
