Thanks Sophie... I was just thinking about which would be the better option for us, local state stores or a Redis state store, and it seems to me that local state stores might prove more performant in the event of the stream application failures mentioned above.
The main reason we are thinking of moving to Redis state stores is that we want each application instance to have access to the state saved by the other stream application instances. We tried using a GlobalKTable backed by a topic: each instance writes to that topic, which is then synchronized into the GlobalKTable in every instance. However, updating the GlobalKTable in each instance took around 100 ms, and within that window the next event might need to be processed by an instance that did not yet have the proper state. I was also looking today at the options available with local state stores, and came across a mention that Kafka also provides an RPC layer on top of state stores which allows a stream application to query state stored in the local state store of another stream application instance. Is that correct? If so, we can try that option instead of a Redis state store. Let me know what you think.
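Just to make it concrete, below is a rough sketch of how I imagine that would look. The store name "agent-state", the host/port, and the /state/<key> HTTP endpoint are only examples I made up; as far as I understand, Streams only tells us which instance owns a given key through its metadata API, and the actual RPC transport (plain HTTP here) is something each instance would have to expose itself:

import java.io.InputStream;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyQueryMetadata;
import org.apache.kafka.streams.StoreQueryParameters;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.state.HostInfo;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

public class StateLookup {

    private static final String STORE_NAME = "agent-state"; // made-up store name

    private final HostInfo self;
    private final KafkaStreams streams;

    public StateLookup(Topology topology, Properties props, HostInfo self) {
        this.self = self;
        // application.server must be set so the other instances can discover
        // where to send their queries for keys hosted on this instance
        props.put(StreamsConfig.APPLICATION_SERVER_CONFIG, self.host() + ":" + self.port());
        this.streams = new KafkaStreams(topology, props);
        this.streams.start();
    }

    public String lookup(String key) throws Exception {
        // ask Streams which instance currently owns the partition for this key
        KeyQueryMetadata metadata =
            streams.queryMetadataForKey(STORE_NAME, key, Serdes.String().serializer());
        HostInfo owner = metadata.activeHost();

        if (self.equals(owner)) {
            // the key lives in our own local store, so read it directly
            ReadOnlyKeyValueStore<String, String> store = streams.store(
                StoreQueryParameters.fromNameAndType(
                    STORE_NAME, QueryableStoreTypes.keyValueStore()));
            return store.get(key);
        }

        // the key lives on another instance, so forward the query over our own
        // RPC layer, e.g. a plain HTTP GET against a /state/<key> endpoint that
        // each instance exposes itself (Streams does not provide this part)
        URL url = new URL("http://" + owner.host() + ":" + owner.port() + "/state/" + key);
        try (InputStream in = url.openStream()) {
            return new String(in.readAllBytes(), StandardCharsets.UTF_8);
        }
    }
}

If querying the owning instance this way is reliable enough, it would let us keep RocksDB locally and drop Redis altogether.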
On Sun, Mar 21, 2021 at 6:38 AM Sophie Blee-Goldman <sop...@confluent.io.invalid> wrote:

> Yes, if you have to restore from the changelog from scratch then this will definitely impact the application's performance. This is the current state of things for EOS applications that use some kind of local storage such as the in-memory or rocksdb state stores. The point of EOS is to be 100% correct, not to maximize performance -- it's a tradeoff and you need to decide what characteristics are most important for the specific application and use case.
>
> That said, obviously better performance is always a good thing when it's possible to do without sacrificing processing semantics. That's why I proposed to buffer updates; if we can avoid dirtying the store in the first place, then there's no need to wipe out all the state and rebuild from the changelog from scratch. So yes, this was intended as an alternative proposal which would improve the performance for any EOS application regardless of whether it uses local or remote storage.
>
> But as with all things, this has tradeoffs of its own: for one thing it's probably a significantly larger effort to implement, so if we want to correct the EOS + remote storage situation quickly then this approach would not be the best way to go. Also, buffering updates of course requires additional resources (ie storage and/or memory), so some users may actually prefer to take an occasional performance hit to keep their app lightweight.
>
> Anyways, these are just some thoughts on how to improve the current situation. Maybe there are even more options to address this problem which haven't been considered yet. Let us know if you have a better idea :)
>
> On Fri, Mar 19, 2021 at 11:50 PM Pushkar Deole <pdeole2...@gmail.com> wrote:
>
> > Thanks Sophie... that answers my question, however still worried about some other aspects:
> >
> > 1. If redis is to be restored from changelog topic: what would happen if i have 3 stream applications and 1 instance went down ... will other 2 instances halt until entire existing state from redis is wiped out and entire state is restored back from changelog topic? If so then it would have a significant performance hit especially when this happens during heavy traffic hours
> >
> > 2. Will #1 be solved by the 2nd alternative that you mentioned in the comment i.e 'An alternative is to just start buffering updates in-memory (or in rocksdb, this could be configurable) and then avoid dirtying the remote storage in the first place as we would only flush the data out to it during a commit' It looks to me that this won't need rebuilding entire state store because changelog is disabled, and this alternative would avoid making the state store inconsistent in first place, thus saving wipe out and rebuild? If so then this also doesn't need to halt other stream applications and would prove much more better approach from performance point of view. Is that correct?
> >
> > On Sat, Mar 20, 2021 at 2:25 AM Sophie Blee-Goldman <sop...@confluent.io.invalid> wrote:
> >
> > > Hey Pushkar, yes, the data will still be backed by a changelog topic unless the user explicitly disables logging for that state store. The fault tolerance mechanism of Kafka Streams is based on changelogging, therefore there are no correctness guarantees if you decide to disable it.
> > >
> > > That said, I'm guessing many users do in fact disable the changelog when plugging in a remote store with it's own fault tolerance guarantees -- is that what you're getting at? We could definitely build in better support for that case, as either an additional optimization on top of KAFKA-12475 <https://issues.apache.org/jira/browse/KAFKA-12475> or as an alternative implementation to fix the underlying EOS problem. Check out my latest comment on the ticket here <https://issues.apache.org/jira/browse/KAFKA-12475?focusedCommentId=17305191&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17305191>
> > >
> > > Does that address your question?
> > >
> > > On Fri, Mar 19, 2021 at 10:14 AM Pushkar Deole <pdeole2...@gmail.com> wrote:
> > >
> > > > Hello Sophie,
> > > >
> > > > may be i am missing something here, however can you let me know how a redis based state store will be wiped off the inconsistent state in case stream application dies in the middle of processing e.g. stream application consumed from source topic, processed source event and saved state to redis and before producing event on destination topic, the stream application had error. If this occurs with a rocksDB or in-memory state store, it will be rebuilt from changelog topic, however for redis state store, how it will wiped off the state? are we saying here that the data stored in redis will still be backed by changelog topic and redis will be restored from backed topic in case of stream application error?
> > > >
> > > > On Tue, Mar 16, 2021 at 12:18 AM Sophie Blee-Goldman <sop...@confluent.io.invalid> wrote:
> > > >
> > > > > This certainly does seem like a flaw in the Streams API, although of course Streams is just in general not designed for use with remote anything (API calls, stores, etc)
> > > > >
> > > > > That said, I don't see any reason why we *couldn't* have better support for remote state stores.
> > > > > Note that there's currently no deleteAll method on the store interface, and not all databases necessarily support that. But we could add a default implementation which just calls delete(key) on all keys in the state store, and for the RocksDB-based state stores we still wipe out the state as usual (and recommend the same for any custom StateStore which is local rather than remote). Obviously falling back on individual delete(key) operations for all the data in the entire store will have worse performance, but that's better than silently breaking EOS when deleteAll is not available on a remote store.
> > > > >
> > > > > Would you be interested in doing a KIP for this? We should also file a JIRA with the above explanation, so that other users of remote storage are aware of this limitation. And definitely document this somewhere
> > > > >
> > > > > On Mon, Mar 15, 2021 at 8:04 AM Bruno Cadonna <br...@confluent.io.invalid> wrote:
> > > > >
> > > > > > Hi Alex,
> > > > > >
> > > > > > I guess wiping out the state directory is easier code-wise, faster, and/or at the time of development the developers did not design for remote state stores. But I do actually not know the exact reason.
> > > > > >
> > > > > > Off the top of my head, I do not know how to solve this for remote state stores. Using the uncaught exception handler is not good, because a computing node could fail without giving the JVM the opportunity to throw an exception.
> > > > > >
> > > > > > In your tests, try to increase the commit interval to a high value and see if you get inconsistencies. You should get an inconsistency if the state store maintains counts for keys and after the last commit before the failure, the Streams app puts an event with a new key K with value 1 into the state store. After failover, Streams would put the same event with key K again into the state store. If the state store deleted all of its data, Streams would put again value 1, but if the state store did not delete all data, Streams would put value 2 which is wrong because it would count the same event twice.
> > > > > >
> > > > > > Best,
> > > > > > Bruno
> > > > > >
> > > > > > On 15.03.21 15:20, Alex Craig wrote:
> > > > > > > Bruno,
> > > > > > >
> > > > > > > Thanks for the info! that makes sense. Of course now I have more questions. :) Do you know why this is being done outside of the state store API? I assume there are reasons why a "deleteAll()" type of function wouldn't work, thereby allowing a state store to purge itself? And maybe more importantly, is there a way to achieve a similar behavior with a 3rd party store? I'm not sure if hooking into the uncaught exception handler might be a good way to purge/drop a state store in the event of a fatal error? I did setup a MongoDB state store recently as part of a POC and was testing it with EOS enabled. (forcing crashes to occur and checking that the result of my aggregation was still accurate) I was unable to cause inconsistent data in the mongo store (which is good!), though of course I may just have been getting lucky. Thanks again for your help,
> > > > > > >
> > > > > > > Alex
> > > > > > >
> > > > > > > On Mon, Mar 15, 2021 at 8:59 AM Pushkar Deole <pdeole2...@gmail.com> wrote:
> > > > > > >
> > > > > > > > Bruno,
> > > > > > > >
> > > > > > > > i tried to explain this in 'kafka user's language through above mentioned scenario, hope i put it properly -:) and correct me if i am wrong
> > > > > > > >
> > > > > > > > On Mon, Mar 15, 2021 at 7:23 PM Pushkar Deole <pdeole2...@gmail.com> wrote:
> > > > > > > >
> > > > > > > > > This is what I understand could be the issue with external state store:
> > > > > > > > >
> > > > > > > > > kafka stream application consumes source topic, does processing, stores state to kafka state store (this is backed by topic) and before producing event on destination topic, the application fails with some issue. In this case, the transaction has failed, so kafka guarantees either all or none, means offset written to source topic, state written to state store topic, output produced on destination topic... all of these happen or none of these and in this failure scenario it is none of these.
> > > > > > > > >
> > > > > > > > > Assume you have redis state store, and you updated the state into redis and stream application failed. Now, you have source topic and destination topic consistent i.e. offset is not committed to source topic and output not produced on destination topic, but you redis state store is inconsistent with that since it is external state store and kafka can't guarantee rollback ot state written there
> > > > > > > > >
> > > > > > > > > On Mon, Mar 15, 2021 at 6:30 PM Alex Craig <alexcrai...@gmail.com> wrote:
> > > > > > > > > > " Another issue with 3rd party state stores could be violation of exactly-once guarantee provided by kafka streams in the event of a failure of streams application instance"
> > > > > > > > > >
> > > > > > > > > > I've heard this before but would love to know more about how a custom state store would be at any greater risk than RocksDB as far as exactly-once guarantees are concerned. They all implement the same interface, so as long as you're correctly implementing get(), put(), delete(), flush(), etc, you should be fine right? In other words, I don't think there is any special "exactly once magic" that is baked into the RocksDB store code. I could be wrong though so I'd love to hear people's thoughts, thanks,
> > > > > > > > > >
> > > > > > > > > > Alex C
> > > > > > > > > >
> > > > > > > > > > On Sun, Mar 14, 2021 at 4:58 PM Parthasarathy, Mohan <mpart...@hpe.com> wrote:
> > > > > > > > > >
> > > > > > > > > > > Thanks for the responses. In the worst case, I might have to keep both rocksdb for local store and keep an external store like Redis.
> > > > > > > > > > >
> > > > > > > > > > > -mohan
> > > > > > > > > > >
> > > > > > > > > > > On 3/13/21, 8:53 PM, "Pushkar Deole" <pdeole2...@gmail.com> wrote:
> > > > > > > > > > >
> > > > > > > > > > > > Another issue with 3rd party state stores could be violation of exactly-once guarantee provided by kafka streams in the event of a failure of streams application instance. Kafka provides exactly once guarantee for consumer -> process -> produce through transactions and with the use of state store like redis, this guarantee is weaker
> > > > > > > > > > > >
> > > > > > > > > > > > On Sat, Mar 13, 2021 at 5:28 AM Guozhang Wang <wangg...@gmail.com> wrote:
> > > > > > > > > > > >
> > > > > > > > > > > > > Hello Mohan,
> > > > > > > > > > > > >
> > > > > > > > > > > > > I think what you had in mind works with Redis, since it is a remote state store engine, it does not have the co-partitioning requirements as local state stores.
> > > > > > > > > > > > >
> > > > > > > > > > > > > One thing you'd need to tune KS though is that with remote stores, the processing latency may be larger, and since Kafka Streams process all records of a single partition in order, synchronously, you may need to tune the poll interval configs etc to make sure KS would stay in the consumer group and not trigger unnecessary rebalances.
> > > > > > > > > > > > >
> > > > > > > > > > > > > Guozhang
> > > > > > > > > > > > >
> > > > > > > > > > > > > On Thu, Mar 11, 2021 at 6:41 PM Parthasarathy, Mohan <mpart...@hpe.com> wrote:
> > > > > > > > > > > > > > Hi,
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > I have a use case where messages come in with some key gets assigned some partition and the state gets created. Later, key changes (but still contains the old key in the message) and gets sent to a different partition. I want to be able to grab the old state using the old key before creating the new state on this instance. Redis as a state store makes it easy to implement this where I can simply do a lookup before creating the state. I see an implementation here:
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > https://github.com/andreas-schroeder/redisks/tree/master/src/main/java/com/github/andreas_schroeder/redisks
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > Has anyone tried this? Any caveats.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > Thanks
> > > > > > > > > > > > > > Mohan
> > > > > > > > > > > > >
> > > > > > > > > > > > > --
> > > > > > > > > > > > > -- Guozhang