Sophie,

Continuing the discussion above: now that we are using Redis for storing
state in our application, which breaks the EOS guarantee of Kafka Streams,
I have one question about whether we can apply a workaround here. Consider
this scenario:

1. Event is consumed from the source topic
2. Event is processed and state is stored in redis
3. Before the event is sent to the sink topic, the node goes down
4. Partitions get rebalanced and another node consumes/processes the same
event again

Even though the redis state store could become inconsistent with the rest
of the streams infrastructure (source/sink topics), do we still get the
exactly-once guarantee across source and sink topics, i.e. the offset
commit to the source and the send to the sink either both happen atomically
or neither happens? If Kafka Streams still guarantees this, then we can
think of applying a workaround when reading state from redis, e.g. we could
apply some conditional logic when fetching the state based on the unique id
of the event: along with the state, we would store the id of the event
whose processing produced that state. When that same event is consumed
again from the source, the processor would use the state in redis as the
current state and not as the previous state, since the event has already
been processed and its state stored in redis.
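
A rough sketch of what I mean (using the Jedis client purely for
illustration; the class and key layout here are made up, not our actual
code):

    import java.util.Map;
    import redis.clients.jedis.Jedis;

    public class EventIdCheckedStore {
        private final Jedis jedis;

        public EventIdCheckedStore(Jedis jedis) {
            this.jedis = jedis;
        }

        // True if the state under stateKey was already produced by this
        // event, i.e. the event is a replay after a crash that happened
        // before the send to the sink topic.
        public boolean alreadyApplied(String stateKey, String eventId) {
            return eventId.equals(jedis.hget(stateKey, "lastEventId"));
        }

        public String currentState(String stateKey) {
            return jedis.hget(stateKey, "state");
        }

        // Store the new state together with the id of the event that
        // produced it, so a replayed event can be detected later.
        public void store(String stateKey, String eventId, String newState) {
            jedis.hset(stateKey, Map.of(
                "state", newState,
                "lastEventId", eventId));
        }
    }

The processor would call alreadyApplied() first and, on a replay, forward
based on currentState() instead of applying the update a second time. A
real version would also need the read-check-write to be atomic (e.g. a
MULTI/EXEC block or a Lua script), but this is the general idea.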

On Wed, Mar 24, 2021 at 4:15 AM Sophie Blee-Goldman
<sop...@confluent.io.invalid> wrote:

> >
> > it seems to me that local state stores might prove more performant in
> > the event of above mentioned failures of streams application
>
> Note, the scenario I was describing with deleting all records from the
> remote store in the event of
> failure is *not* what we do today -- I was describing a potential solution
> to the problem of EOS
> with remote storage. As of the current code, local storage is actually the
> one which would wipe
> out the state stores and need to restore from scratch. Granted, I'm
> actually not sure how this
> would be handled with EOS -- most likely we would still end up restoring
> from the changelog,
> but we just wouldn't wipe out the state stores.
>
> But the point here is that EOS with remote storage is broken -- if your use
> case requires
> exactly-once semantics, you *must* use local state stores. If you don't
> care about occasionally
> processing a record twice, or consider steady performance more important
> than this, then just
> don't use EOS in the first place.
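>
> (For reference, that choice is just the processing.guarantee config --
> e.g., in the app's properties:)
>
>     import java.util.Properties;
>     import org.apache.kafka.streams.StreamsConfig;
>
>     Properties props = new Properties();
>     // exactly-once: relies on local, changelog-backed state for correctness
>     props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG,
>               StreamsConfig.EXACTLY_ONCE);
>     // or keep the default if occasional duplicates are acceptable:
>     // props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG,
>     //           StreamsConfig.AT_LEAST_ONCE);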
>
> Note: Streams doesn't provide an RPC layer itself, it's up to you to
> implement, but yes Streams
> does support querying state stores through RPC. But this feature (Interactive
> Queries/IQ) is for
> querying the state from outside the Kafka Streams application, it's not
> intended to be used
> from within a processor. In general remote calls inside a processor are
> somewhat discouraged,
> since it's difficult to handle failed calls. And remote calls will also
> always break EOS, if that
> matters to you.
>
> All that said, I guess you *could* use IQ to query the state stores on
> another instance from within
> a processor, but I highly doubt this would solve the problem you're facing:
> if the GlobalKTable
> isn't syncing fast enough for you then I wouldn't expect updates in another
> instance to be
> available when you need them. This depends on what you're trying to do and
> the flow of data
> on the input topics, but I get the sense there's probably a better way to
> do what you're trying to
> achieve. I'm not sure why you would want a GlobalKTable for example -- why
> not just have
> one application write the results to an output topic, and then let the
> downstream application
> read in that topic as a table? You should be able to have full control over
> things that way
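>
> (Roughly, with hypothetical topic names:)
>
>     import org.apache.kafka.streams.StreamsBuilder;
>     import org.apache.kafka.streams.kstream.KStream;
>     import org.apache.kafka.streams.kstream.KTable;
>
>     // Upstream app: publish its results to a plain output topic
>     StreamsBuilder upstream = new StreamsBuilder();
>     KStream<String, String> results = upstream.stream("input-topic");
>     results.to("results-topic");
>
>     // Downstream app: read that same topic in as a table it fully controls
>     StreamsBuilder downstream = new StreamsBuilder();
>     KTable<String, String> resultsTable = downstream.table("results-topic");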
>
> On Sun, Mar 21, 2021 at 5:58 AM Pushkar Deole <pdeole2...@gmail.com>
> wrote:
>
> > Thanks Sophie... I was just thinking what would be a good option for us,
> > whether local state stores or a redis state store, and it seems to me that
> > local state stores might prove more performant in the event of the above
> > mentioned failures of the streams application.
> >
> > The main reason we are thinking of moving to redis state stores is because
> > we want each application instance to have access to state saved by other
> > stream application instances. We tried to use a GlobalKTable backed by a
> > topic: each instance would save to that topic, which is then synchronized
> > to the GlobalKTable in each application instance. However, synchronizing
> > the GlobalKTable in each application instance took around 100ms, and
> > before that time the next event might need to get processed by an
> > application instance, in which case it did not have the proper state.
> >
> > I was also looking at some options available today with local state stores
> > and came across the fact that kafka also provides an RPC layer on top of
> > the state store, which allows a stream application to query state stored
> > in the local state store of another stream application instance. Is that
> > correct? If so then we can try that option instead of the redis state
> > store. Let me know what you think.
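> >
> > (From the docs, I understand the lookup side would look roughly like
> > this -- the store name is hypothetical, and the RPC transport between
> > instances, fetchOverRpc here, is a placeholder we would have to build:)
> >
> >     import org.apache.kafka.common.serialization.Serdes;
> >     import org.apache.kafka.streams.KafkaStreams;
> >     import org.apache.kafka.streams.KeyQueryMetadata;
> >     import org.apache.kafka.streams.StoreQueryParameters;
> >     import org.apache.kafka.streams.state.HostInfo;
> >     import org.apache.kafka.streams.state.QueryableStoreTypes;
> >     import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
> >
> >     static String lookup(KafkaStreams streams, HostInfo self, String key) {
> >         // Ask Streams which instance hosts this key
> >         KeyQueryMetadata meta = streams.queryMetadataForKey(
> >             "my-store", key, Serdes.String().serializer());
> >         if (self.equals(meta.activeHost())) {
> >             // This instance owns the key: read the local store directly
> >             ReadOnlyKeyValueStore<String, String> store = streams.store(
> >                 StoreQueryParameters.fromNameAndType(
> >                     "my-store",
> >                     QueryableStoreTypes.<String, String>keyValueStore()));
> >             return store.get(key);
> >         }
> >         // Otherwise call the owning host over an RPC layer we implement
> >         // ourselves (Streams only locates the key, it has no transport)
> >         return fetchOverRpc(meta.activeHost(), key); // hypothetical helper
> >     }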
> >
> > On Sun, Mar 21, 2021 at 6:38 AM Sophie Blee-Goldman
> > <sop...@confluent.io.invalid> wrote:
> >
> > > Yes, if you have to restore from the changelog from scratch then this
> > > will definitely impact the application's performance. This is the
> > > current state of things for EOS applications that use some kind of
> > > local storage such as the in-memory or rocksdb state stores. The point
> > > of EOS is to be 100% correct, not to maximize performance -- it's a
> > > tradeoff and you need to decide which characteristics are most
> > > important for the specific application and use case.
> > >
> > > That said, obviously better performance is always a good thing when
> > > it's possible to do without sacrificing processing semantics. That's
> > > why I proposed to buffer updates; if we can avoid dirtying the store in
> > > the first place, then there's no need to wipe out all the state and
> > > rebuild from the changelog from scratch. So yes, this was intended as
> > > an alternative proposal which would improve the performance for any
> > > EOS application regardless of whether it uses local or remote storage.
> > >
> > > But as with all things, this has tradeoffs of its own: for one thing
> > > it's probably a significantly larger effort to implement, so if we
> > > want to correct the EOS + remote storage situation quickly then this
> > > approach would not be the best way to go. Also, buffering updates of
> > > course requires additional resources (i.e. storage and/or memory), so
> > > some users may actually prefer to take an occasional performance hit
> > > to keep their app lightweight.
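> > >
> > > (A minimal sketch of the buffering idea -- the store shape here is
> > > simplified for illustration, not the actual Streams interfaces:)
> > >
> > >     import java.util.HashMap;
> > >     import java.util.Map;
> > >
> > >     // Writes stay in an in-memory buffer and only reach the remote
> > >     // store on commit, so a crash between commits never dirties it.
> > >     public class CommitBufferedStore {
> > >         private final Map<String, String> buffer = new HashMap<>();
> > >         private final RemoteStore remote; // stand-in for e.g. a redis client
> > >
> > >         public CommitBufferedStore(RemoteStore remote) {
> > >             this.remote = remote;
> > >         }
> > >
> > >         public void put(String key, String value) {
> > >             buffer.put(key, value); // not yet visible remotely
> > >         }
> > >
> > >         public String get(String key) {
> > >             // reads must see uncommitted writes first
> > >             String v = buffer.get(key);
> > >             return v != null ? v : remote.get(key);
> > >         }
> > >
> > >         // Called from the commit path, once the Kafka transaction is
> > >         // otherwise ready to complete
> > >         public void flushOnCommit() {
> > >             remote.putAll(buffer);
> > >             buffer.clear();
> > >         }
> > >     }
> > >
> > >     interface RemoteStore {
> > >         String get(String key);
> > >         void putAll(Map<String, String> entries);
> > >     }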
> > >
> > > Anyways, these are just some thoughts on how to improve the current
> > > situation. Maybe there are
> > > even more options to address this problem which haven't been considered
> > > yet. Let us know if you
> > > have a better idea :)
> > >
> > > On Fri, Mar 19, 2021 at 11:50 PM Pushkar Deole <pdeole2...@gmail.com>
> > > wrote:
> > >
> > > > Thanks Sophie... that answers my question, however I'm still worried
> > > > about some other aspects:
> > > >
> > > > 1. If redis is to be restored from the changelog topic: what would
> > > > happen if I have 3 stream application instances and 1 instance goes
> > > > down? Will the other 2 instances halt until the entire existing
> > > > state in redis is wiped out and the entire state is restored back
> > > > from the changelog topic? If so, that would be a significant
> > > > performance hit, especially when this happens during heavy traffic
> > > > hours.
> > > >
> > > > 2. Would #1 be solved by the 2nd alternative that you mentioned in
> > > > the comment, i.e. 'An alternative is to just start buffering updates
> > > > in-memory (or in rocksdb, this could be configurable) and then avoid
> > > > dirtying the remote storage in the first place as we would only
> > > > flush the data out to it during a commit'? It looks to me that this
> > > > won't need rebuilding the entire state store because the changelog
> > > > is disabled, and this alternative would avoid making the state store
> > > > inconsistent in the first place, thus saving the wipe out and
> > > > rebuild. If so then this also doesn't need to halt the other stream
> > > > applications and would prove a much better approach from a
> > > > performance point of view. Is that correct?
> > > >
> > > > On Sat, Mar 20, 2021 at 2:25 AM Sophie Blee-Goldman
> > > > <sop...@confluent.io.invalid> wrote:
> > > >
> > > > > Hey Pushkar, yes, the data will still be backed by a changelog
> > > > > topic unless the user explicitly disables logging for that state
> > > > > store. The fault tolerance mechanism of Kafka Streams is based on
> > > > > changelogging, therefore there are no correctness guarantees if
> > > > > you decide to disable it.
> > > > >
> > > > > That said, I'm guessing many users do in fact disable the changelog
> > > > > when plugging in a remote store with its own fault tolerance
> > > > > guarantees -- is that what you're getting at? We could definitely
> > > > > build in better support for that case, as either an additional
> > > > > optimization on top of KAFKA-12475
> > > > > <https://issues.apache.org/jira/browse/KAFKA-12475> or as an
> > > > > alternative implementation to fix the underlying EOS problem.
> > > > > Check out my latest comment on the ticket here
> > > > > <https://issues.apache.org/jira/browse/KAFKA-12475?focusedCommentId=17305191&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17305191>
> > > > >
> > > > > Does that address your question?
> > > > >
> > > > > On Fri, Mar 19, 2021 at 10:14 AM Pushkar Deole
> > > > > <pdeole2...@gmail.com> wrote:
> > > > >
> > > > > > Hello Sophie,
> > > > > >
> > > > > > Maybe I am missing something here, however can you let me know
> > > > > > how a redis-based state store will be wiped of the inconsistent
> > > > > > state in case the stream application dies in the middle of
> > > > > > processing, e.g. the stream application consumed from the source
> > > > > > topic, processed the source event and saved state to redis, and
> > > > > > before producing the event on the destination topic, the stream
> > > > > > application hit an error.
> > > > > > If this occurs with a rocksDB or in-memory state store, it will
> > > > > > be rebuilt from the changelog topic; however, for a redis state
> > > > > > store, how will it be wiped of the state? Are we saying here
> > > > > > that the data stored in redis will still be backed by a
> > > > > > changelog topic and redis will be restored from the backing
> > > > > > topic in case of a stream application error?
> > > > > >
> > > > > > On Tue, Mar 16, 2021 at 12:18 AM Sophie Blee-Goldman
> > > > > > <sop...@confluent.io.invalid> wrote:
> > > > > >
> > > > > > > This certainly does seem like a flaw in the Streams API,
> > > > > > > although of course Streams is just in general not designed for
> > > > > > > use with remote anything (API calls, stores, etc).
> > > > > > >
> > > > > > > That said, I don't see any reason why we *couldn't* have
> > > > > > > better support for remote state stores.
> > > > > > > Note that there's currently no deleteAll method on the store
> > > > > > > interface, and not all databases necessarily support that. But
> > > > > > > we could add a default implementation which just calls
> > > > > > > delete(key) on all keys in the state store, and for the
> > > > > > > RocksDB-based state stores we still wipe out the state as
> > > > > > > usual (and recommend the same for any custom StateStore which
> > > > > > > is local rather than remote). Obviously falling back on
> > > > > > > individual delete(key) operations for all the data in the
> > > > > > > entire store will have worse performance, but that's better
> > > > > > > than silently breaking EOS when deleteAll is not available on
> > > > > > > a remote store.
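> > > > > > >
> > > > > > > (Sketching that default, with a simplified interface rather
> > > > > > > than the real KeyValueStore API:)
> > > > > > >
> > > > > > >     import org.apache.kafka.streams.KeyValue;
> > > > > > >     import org.apache.kafka.streams.state.KeyValueIterator;
> > > > > > >
> > > > > > >     public interface SimpleKeyValueStore<K, V> {
> > > > > > >         V delete(K key);
> > > > > > >         KeyValueIterator<K, V> all();
> > > > > > >
> > > > > > >         // Fallback for stores with no native deleteAll:
> > > > > > >         // slow, but better than silently breaking EOS
> > > > > > >         default void deleteAll() {
> > > > > > >             try (KeyValueIterator<K, V> iter = all()) {
> > > > > > >                 while (iter.hasNext()) {
> > > > > > >                     KeyValue<K, V> entry = iter.next();
> > > > > > >                     delete(entry.key);
> > > > > > >                 }
> > > > > > >             }
> > > > > > >         }
> > > > > > >     }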
> > > > > > >
> > > > > > > Would you be interested in doing a KIP for this? We should
> > > > > > > also file a JIRA with the above explanation, so that other
> > > > > > > users of remote storage are aware of this limitation. And
> > > > > > > definitely document this somewhere.
> > > > > > >
> > > > > > >
> > > > > > > On Mon, Mar 15, 2021 at 8:04 AM Bruno Cadonna
> > > > > > > <br...@confluent.io.invalid> wrote:
> > > > > > >
> > > > > > > > Hi Alex,
> > > > > > > >
> > > > > > > > I guess wiping out the state directory is easier code-wise,
> > > > > > > > faster, and/or at the time of development the developers did
> > > > > > > > not design for remote state stores. But I actually do not
> > > > > > > > know the exact reason.
> > > > > > > >
> > > > > > > > Off the top of my head, I do not know how to solve this for
> > > > > > > > remote state stores. Using the uncaught exception handler is
> > > > > > > > not good, because a computing node could fail without giving
> > > > > > > > the JVM the opportunity to throw an exception.
> > > > > > > >
> > > > > > > > In your tests, try to increase the commit interval to a high
> > > > > > > > value and see if you get inconsistencies. You should get an
> > > > > > > > inconsistency if the state store maintains counts for keys
> > > > > > > > and, after the last commit before the failure, the Streams
> > > > > > > > app puts an event with a new key K with value 1 into the
> > > > > > > > state store. After failover, Streams would put the same
> > > > > > > > event with key K again into the state store. If the state
> > > > > > > > store deleted all of its data, Streams would again put value
> > > > > > > > 1, but if the state store did not delete all data, Streams
> > > > > > > > would put value 2, which is wrong because it would count the
> > > > > > > > same event twice.
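> > > > > > > >
> > > > > > > > (For example, something like this in the test configuration
> > > > > > > > -- the values are arbitrary:)
> > > > > > > >
> > > > > > > >     import java.util.Properties;
> > > > > > > >     import org.apache.kafka.streams.StreamsConfig;
> > > > > > > >
> > > > > > > >     Properties props = new Properties();
> > > > > > > >     // stretch the commit interval so a crash is likely to
> > > > > > > >     // land between commits
> > > > > > > >     props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 300000);
> > > > > > > >     props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG,
> > > > > > > >               StreamsConfig.EXACTLY_ONCE);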
> > > > > > > >
> > > > > > > > Best,
> > > > > > > > Bruno
> > > > > > > >
> > > > > > > >
> > > > > > > > On 15.03.21 15:20, Alex Craig wrote:
> > > > > > > > > Bruno,
> > > > > > > > > Thanks for the info! That makes sense. Of course now I
> > > > > > > > > have more questions. :) Do you know why this is being done
> > > > > > > > > outside of the state store API? I assume there are reasons
> > > > > > > > > why a "deleteAll()" type of function wouldn't work, thereby
> > > > > > > > > allowing a state store to purge itself? And maybe more
> > > > > > > > > importantly, is there a way to achieve a similar behavior
> > > > > > > > > with a 3rd party store? I'm not sure if hooking into the
> > > > > > > > > uncaught exception handler might be a good way to
> > > > > > > > > purge/drop a state store in the event of a fatal error? I
> > > > > > > > > did set up a MongoDB state store recently as part of a POC
> > > > > > > > > and was testing it with EOS enabled (forcing crashes to
> > > > > > > > > occur and checking that the result of my aggregation was
> > > > > > > > > still accurate). I was unable to cause inconsistent data
> > > > > > > > > in the mongo store (which is good!), though of course I
> > > > > > > > > may just have been getting lucky. Thanks again for your
> > > > > > > > > help,
> > > > > > > > > Alex
> > > > > > > > >
> > > > > > > > > On Mon, Mar 15, 2021 at 8:59 AM Pushkar Deole
> > > > > > > > > <pdeole2...@gmail.com> wrote:
> > > > > > > > >
> > > > > > > > >> Bruno,
> > > > > > > > >>
> > > > > > > > >> I tried to explain this in 'kafka user' language through
> > > > > > > > >> the above mentioned scenario; hope I put it properly :-)
> > > > > > > > >> and correct me if I am wrong.
> > > > > > > > >>
> > > > > > > > >> On Mon, Mar 15, 2021 at 7:23 PM Pushkar Deole
> > > > > > > > >> <pdeole2...@gmail.com> wrote:
> > > > > > > > >>
> > > > > > > > >>> This is what I understand could be the issue with an
> > > > > > > > >>> external state store:
> > > > > > > > >>>
> > > > > > > > >>> The kafka streams application consumes the source topic,
> > > > > > > > >>> does processing, stores state to the kafka state store
> > > > > > > > >>> (which is backed by a topic), and before producing the
> > > > > > > > >>> event on the destination topic, the application fails
> > > > > > > > >>> with some issue. In this case the transaction has
> > > > > > > > >>> failed, so kafka guarantees either all or none: the
> > > > > > > > >>> offset committed to the source topic, the state written
> > > > > > > > >>> to the state store topic, the output produced on the
> > > > > > > > >>> destination topic... all of these happen or none of
> > > > > > > > >>> these, and in this failure scenario it is none of these.
> > > > > > > > >>>
> > > > > > > > >>> Assume you have a redis state store, and you updated the
> > > > > > > > >>> state in redis and the stream application failed. Now
> > > > > > > > >>> your source topic and destination topic are consistent,
> > > > > > > > >>> i.e. the offset is not committed to the source topic and
> > > > > > > > >>> the output is not produced on the destination topic, but
> > > > > > > > >>> your redis state store is inconsistent with that, since
> > > > > > > > >>> it is an external state store and kafka can't guarantee
> > > > > > > > >>> rollback of the state written there.
> > > > > > > > >>>
> > > > > > > > >>> On Mon, Mar 15, 2021 at 6:30 PM Alex Craig
> > > > > > > > >>> <alexcrai...@gmail.com> wrote:
> > > > > > > > >>>
> > > > > > > > >>>> " Another issue with 3rd party state stores could be
> > > violation
> > > > > of
> > > > > > > > >>>> exactly-once guarantee provided by kafka streams in the
> > > event
> > > > > of a
> > > > > > > > >> failure
> > > > > > > > >>>> of streams application instance"
> > > > > > > > >>>>
> > > > > > > > >>>> I've heard this before but would love to know more about
> > > how a
> > > > > > > custom
> > > > > > > > >>>> state
> > > > > > > > >>>> store would be at any greater risk than RocksDB as far
> as
> > > > > > > exactly-once
> > > > > > > > >>>> guarantees are concerned.  They all implement the same
> > > > > interface,
> > > > > > so
> > > > > > > > as
> > > > > > > > >>>> long as you're correctly implementing get(), put(),
> > > delete(),
> > > > > > > flush(),
> > > > > > > > >>>> etc,
> > > > > > > > >>>> you should be fine right?  In other words, I don't think
> > > there
> > > > > is
> > > > > > > any
> > > > > > > > >>>> special "exactly once magic" that is baked into the
> > RocksDB
> > > > > store
> > > > > > > > >> code.  I
> > > > > > > > >>>> could be wrong though so I'd love to hear people's
> > thoughts,
> > > > > > thanks,
> > > > > > > > >>>>
> > > > > > > > >>>> Alex C
> > > > > > > > >>>>
> > > > > > > > >>>> On Sun, Mar 14, 2021 at 4:58 PM Parthasarathy, Mohan
> > > > > > > > >>>> <mpart...@hpe.com> wrote:
> > > > > > > > >>>>
> > > > > > > > >>>>> Thanks for the responses. In the worst case, I might
> > > > > > > > >>>>> have to keep both rocksdb as the local store and an
> > > > > > > > >>>>> external store like Redis.
> > > > > > > > >>>>>
> > > > > > > > >>>>> -mohan
> > > > > > > > >>>>>
> > > > > > > > >>>>>
> > > > > > > > >>>>>      On 3/13/21, 8:53 PM, "Pushkar Deole"
> > > > > > > > >>>>>      <pdeole2...@gmail.com> wrote:
> > > > > > > > >>>>>
> > > > > > > > >>>>>      Another issue with 3rd party state stores could
> > > > > > > > >>>>>      be violation of the exactly-once guarantee
> > > > > > > > >>>>>      provided by kafka streams in the event of a
> > > > > > > > >>>>>      failure of the streams application instance.
> > > > > > > > >>>>>      Kafka provides an exactly-once guarantee for
> > > > > > > > >>>>>      consume -> process -> produce through
> > > > > > > > >>>>>      transactions, and with the use of a state store
> > > > > > > > >>>>>      like redis this guarantee is weaker
> > > > > > > > >>>>>
> > > > > > > > >>>>>      On Sat, Mar 13, 2021 at 5:28 AM Guozhang Wang
> > > > > > > > >>>>>      <wangg...@gmail.com> wrote:
> > > > > > > > >>>>>
> > > > > > > > >>>>>      > Hello Mohan,
> > > > > > > > >>>>>      >
> > > > > > > > >>>>>      > I think what you had in mind works with Redis:
> > > > > > > > >>>>>      > since it is a remote state store engine, it
> > > > > > > > >>>>>      > does not have the co-partitioning requirements
> > > > > > > > >>>>>      > of local state stores.
> > > > > > > > >>>>>      >
> > > > > > > > >>>>>      > One thing you'd need to tune KS for, though, is
> > > > > > > > >>>>>      > that with remote stores the processing latency
> > > > > > > > >>>>>      > may be larger, and since Kafka Streams
> > > > > > > > >>>>>      > processes all records of a single partition in
> > > > > > > > >>>>>      > order, synchronously, you may need to tune the
> > > > > > > > >>>>>      > poll interval configs etc. to make sure KS
> > > > > > > > >>>>>      > would stay in the consumer group and not
> > > > > > > > >>>>>      > trigger unnecessary rebalances.
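> > > > > > > > >>>>>      >
> > > > > > > > >>>>>      > (For instance -- these are the standard
> > > > > > > > >>>>>      > consumer configs, values purely illustrative:)
> > > > > > > > >>>>>      >
> > > > > > > > >>>>>      >     import java.util.Properties;
> > > > > > > > >>>>>      >     import org.apache.kafka.clients.consumer.ConsumerConfig;
> > > > > > > > >>>>>      >     import org.apache.kafka.streams.StreamsConfig;
> > > > > > > > >>>>>      >
> > > > > > > > >>>>>      >     Properties props = new Properties();
> > > > > > > > >>>>>      >     // allow more time between polls for slow remote calls
> > > > > > > > >>>>>      >     props.put(StreamsConfig.consumerPrefix(
> > > > > > > > >>>>>      >         ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG), 600000);
> > > > > > > > >>>>>      >     // and/or fetch fewer records per poll
> > > > > > > > >>>>>      >     props.put(StreamsConfig.consumerPrefix(
> > > > > > > > >>>>>      >         ConsumerConfig.MAX_POLL_RECORDS_CONFIG), 100);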
> > > > > > > > >>>>>      >
> > > > > > > > >>>>>      > Guozhang
> > > > > > > > >>>>>      >
> > > > > > > > >>>>>      > On Thu, Mar 11, 2021 at 6:41 PM Parthasarathy,
> > > > > > > > >>>>>      > Mohan <mpart...@hpe.com> wrote:
> > > > > > > > >>>>>      >
> > > > > > > > >>>>>      > > Hi,
> > > > > > > > >>>>>      > >
> > > > > > > > >>>>>      > > I have a use case where messages come in
> > > > > > > > >>>>>      > > with some key, get assigned some partition,
> > > > > > > > >>>>>      > > and the state gets created. Later, the key
> > > > > > > > >>>>>      > > changes (but the message still contains the
> > > > > > > > >>>>>      > > old key) and the message gets sent to a
> > > > > > > > >>>>>      > > different partition. I want to be able to
> > > > > > > > >>>>>      > > grab the old state using the old key before
> > > > > > > > >>>>>      > > creating the new state on this instance.
> > > > > > > > >>>>>      > > Redis as a state store makes it easy to
> > > > > > > > >>>>>      > > implement this, where I can simply do a
> > > > > > > > >>>>>      > > lookup before creating the state. I see an
> > > > > > > > >>>>>      > > implementation here:
> > > > > > > > >>>>>      > > https://github.com/andreas-schroeder/redisks/tree/master/src/main/java/com/github/andreas_schroeder/redisks
> > > > > > > > >>>>>      > >
> > > > > > > > >>>>>      > > Has anyone tried this? Any caveats?
> > > > > > > > >>>>>      > >
> > > > > > > > >>>>>      > > Thanks
> > > > > > > > >>>>>      > > Mohan
> > > > > > > > >>>>>      > >
> > > > > > > > >>>>>      > >
> > > > > > > > >>>>>      >
> > > > > > > > >>>>>      > --
> > > > > > > > >>>>>      > -- Guozhang
