This certainly does seem like a flaw in the Streams API, although of course Streams
in general just isn't designed for use with anything remote (API calls, stores, etc).

That said, I don't see any reason why we *couldn't* have better support for remote
state stores. Note that there's currently no deleteAll method on the store interface,
and not all databases necessarily support one. But we could add a default
implementation which just calls delete(key) on every key in the state store, while for
the RocksDB-based state stores we would still wipe out the state as usual (and
recommend the same for any custom StateStore which is local rather than remote).
Obviously falling back on individual delete(key) operations for all the data in the
entire store will have worse performance, but that's better than silently breaking EOS
when deleteAll is not available on a remote store.
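Something along these lines is roughly what I have in mind. This is just an untested
sketch against the existing KeyValueStore interface, and the exact method name and
placement would of course be hashed out in the KIP:

import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;

// Hypothetical extension, purely for illustration; in the KIP this would presumably
// become a default method on the store interface itself.
public interface KeyValueStoreWithDeleteAll<K, V> extends KeyValueStore<K, V> {

    // Naive fallback for stores (e.g. remote ones) that have no cheap "wipe everything"
    // operation: iterate every key and delete it individually. The RocksDB-based stores
    // would keep the current behavior of wiping the state directory instead, and any
    // store with a native truncate/flushAll could override this.
    default void deleteAll() {
        try (final KeyValueIterator<K, V> iter = all()) {
            while (iter.hasNext()) {
                delete(iter.next().key);
            }
        }
    }
}

Deleting while iterating might need a closer look for some store implementations, but
you get the idea.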
Would you be interested in doing a KIP for this? We should also file a JIRA with the
above explanation, so that other users of remote storage are aware of this limitation.
And we should definitely document this somewhere.

On Mon, Mar 15, 2021 at 8:04 AM Bruno Cadonna <br...@confluent.io.invalid> wrote:

> Hi Alex,
>
> I guess wiping out the state directory is easier code-wise, faster, and/or at the
> time of development the developers did not design for remote state stores. But I
> actually do not know the exact reason.
>
> Off the top of my head, I do not know how to solve this for remote state stores.
> Using the uncaught exception handler is not good, because a computing node could
> fail without giving the JVM the opportunity to throw an exception.
>
> In your tests, try to increase the commit interval to a high value and see if you
> get inconsistencies. You should get an inconsistency if the state store maintains
> counts for keys and, after the last commit before the failure, the Streams app puts
> an event with a new key K with value 1 into the state store. After failover, Streams
> would put the same event with key K into the state store again. If the state store
> deleted all of its data, Streams would again put value 1, but if the state store did
> not delete all data, Streams would put value 2, which is wrong because it would
> count the same event twice.
>
> Best,
> Bruno
>
> On 15.03.21 15:20, Alex Craig wrote:
> > Bruno,
> >
> > Thanks for the info! That makes sense. Of course now I have more questions. :)
> > Do you know why this is being done outside of the state store API? I assume there
> > are reasons why a "deleteAll()" type of function wouldn't work, thereby allowing a
> > state store to purge itself? And maybe more importantly, is there a way to achieve
> > a similar behavior with a 3rd party store? I'm not sure if hooking into the
> > uncaught exception handler might be a good way to purge/drop a state store in the
> > event of a fatal error? I did set up a MongoDB state store recently as part of a
> > POC and was testing it with EOS enabled (forcing crashes to occur and checking
> > that the result of my aggregation was still accurate). I was unable to cause
> > inconsistent data in the mongo store (which is good!), though of course I may
> > just have been getting lucky.
> >
> > Thanks again for your help,
> >
> > Alex
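(As a concrete reference point for the test Bruno describes above: it boils down to a
plain count plus a large commit interval, and then force-killing the app partway
through. Rough sketch only, with made-up topic/store names and values:)

import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;

public class DoubleCountRepro {
    public static void main(final String[] args) {
        // A plain count keyed by the event key; the store behind "event-counts" is the
        // one that would be MongoDB/Redis-backed in the scenario discussed above.
        final StreamsBuilder builder = new StreamsBuilder();
        builder.<String, String>stream("events")
               .groupByKey()
               .count(Materialized.as("event-counts"))
               .toStream()
               .to("event-counts-out", Produced.with(Serdes.String(), Serdes.Long()));

        // Widen the gap between commits so a forced crash is likely to land after the
        // store write but before the commit (the EOS default is only 100 ms).
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "eos-remote-store-test");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
        props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 60_000L);

        new KafkaStreams(builder.build(), props).start();
    }
}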
> > On Mon, Mar 15, 2021 at 8:59 AM Pushkar Deole <pdeole2...@gmail.com> wrote:
> >
> >> Bruno,
> >>
> >> I tried to explain this in "Kafka user's" language through the above-mentioned
> >> scenario, hope I put it properly :-) and correct me if I am wrong.
> >>
> >> On Mon, Mar 15, 2021 at 7:23 PM Pushkar Deole <pdeole2...@gmail.com> wrote:
> >>
> >>> This is what I understand could be the issue with an external state store:
> >>>
> >>> The Kafka Streams application consumes the source topic, does processing, stores
> >>> state in the Kafka state store (which is backed by a topic), and before producing
> >>> the event on the destination topic the application fails with some issue. In this
> >>> case the transaction has failed, so Kafka guarantees either all or none: the
> >>> offset written to the source topic, the state written to the state store topic,
> >>> and the output produced on the destination topic all happen or none of them do,
> >>> and in this failure scenario it is none of them.
> >>>
> >>> Assume you have a Redis state store, and you updated the state in Redis and the
> >>> streams application failed. Now your source topic and destination topic are
> >>> consistent, i.e. the offset is not committed to the source topic and the output
> >>> is not produced on the destination topic, but your Redis state store is
> >>> inconsistent with that, since it is an external state store and Kafka can't
> >>> guarantee rollback of the state written there.
> >>>
> >>> On Mon, Mar 15, 2021 at 6:30 PM Alex Craig <alexcrai...@gmail.com> wrote:
> >>>
> >>>> "Another issue with 3rd party state stores could be violation of the
> >>>> exactly-once guarantee provided by kafka streams in the event of a failure of a
> >>>> streams application instance"
> >>>>
> >>>> I've heard this before but would love to know more about how a custom state
> >>>> store would be at any greater risk than RocksDB as far as exactly-once
> >>>> guarantees are concerned. They all implement the same interface, so as long as
> >>>> you're correctly implementing get(), put(), delete(), flush(), etc., you should
> >>>> be fine, right? In other words, I don't think there is any special "exactly
> >>>> once magic" that is baked into the RocksDB store code. I could be wrong though,
> >>>> so I'd love to hear people's thoughts. Thanks,
> >>>>
> >>>> Alex C
> >>>>
> >>>> On Sun, Mar 14, 2021 at 4:58 PM Parthasarathy, Mohan <mpart...@hpe.com> wrote:
> >>>>
> >>>>> Thanks for the responses. In the worst case, I might have to keep both rocksdb
> >>>>> for the local store and an external store like Redis.
> >>>>>
> >>>>> -mohan
> >>>>>
> >>>>> On 3/13/21, 8:53 PM, "Pushkar Deole" <pdeole2...@gmail.com> wrote:
> >>>>>
> >>>>> Another issue with 3rd party state stores could be violation of the
> >>>>> exactly-once guarantee provided by kafka streams in the event of a failure of a
> >>>>> streams application instance.
> >>>>> Kafka provides the exactly-once guarantee for consume -> process -> produce
> >>>>> through transactions, and with the use of a state store like Redis this
> >>>>> guarantee is weaker.
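(To make that point concrete: whether the app calls the remote client directly or wraps
it in a custom StateStore, the write happens while the record is being processed and
outside the Kafka transaction, so an aborted transaction can't undo it. A rough,
hypothetical sketch, where RemoteClient stands in for whatever Redis/Mongo client would
actually be used:)

import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;

// Stand-in for a real remote client (Jedis, the Mongo driver, etc.); purely hypothetical.
interface RemoteClient {
    long incrementAndGet(String key);
}

// The remote write happens inside process(), but only the Kafka side of the work
// (input offsets, changelog and output records) is covered by the EOS transaction.
// If the transaction aborts after process() has run, nothing undoes incrementAndGet(),
// which is exactly how the remote store drifts from the topics.
class RemoteCountProcessor implements Processor<String, String, String, Long> {
    private final RemoteClient remote;
    private ProcessorContext<String, Long> context;

    RemoteCountProcessor(final RemoteClient remote) {
        this.remote = remote;
    }

    @Override
    public void init(final ProcessorContext<String, Long> context) {
        this.context = context;
    }

    @Override
    public void process(final Record<String, String> record) {
        final long count = remote.incrementAndGet(record.key()); // not part of the Kafka transaction
        context.forward(record.withValue(count));
    }
}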
> >>>>> On Sat, Mar 13, 2021 at 5:28 AM Guozhang Wang <wangg...@gmail.com> wrote:
> >>>>>
> >>>>> > Hello Mohan,
> >>>>> >
> >>>>> > I think what you had in mind works with Redis: since it is a remote state
> >>>>> > store engine, it does not have the co-partitioning requirements of local
> >>>>> > state stores.
> >>>>> >
> >>>>> > One thing you'd need to tune KS for, though, is that with remote stores the
> >>>>> > processing latency may be larger, and since Kafka Streams processes all
> >>>>> > records of a single partition in order, synchronously, you may need to tune
> >>>>> > the poll interval configs etc. to make sure KS stays in the consumer group
> >>>>> > and does not trigger unnecessary rebalances.
> >>>>> >
> >>>>> > Guozhang
> >>>>> >
> >>>>> > On Thu, Mar 11, 2021 at 6:41 PM Parthasarathy, Mohan <mpart...@hpe.com> wrote:
> >>>>> >
> >>>>> > > Hi,
> >>>>> > >
> >>>>> > > I have a use case where a message comes in with some key, gets assigned to
> >>>>> > > some partition, and the state gets created. Later, the key changes (but the
> >>>>> > > message still contains the old key) and the message gets sent to a
> >>>>> > > different partition. I want to be able to grab the old state using the old
> >>>>> > > key before creating the new state on this instance. Redis as a state store
> >>>>> > > makes it easy to implement this, where I can simply do a lookup before
> >>>>> > > creating the state. I see an implementation here:
> >>>>> > >
> >>>>> > > https://github.com/andreas-schroeder/redisks/tree/master/src/main/java/com/github/andreas_schroeder/redisks
> >>>>> > >
> >>>>> > > Has anyone tried this? Any caveats?
> >>>>> > >
> >>>>> > > Thanks
> >>>>> > > Mohan
> >>>>> >
> >>>>> > --
> >>>>> > -- Guozhang
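(For reference, the poll-related knobs Guozhang mentions are roughly the ones below,
i.e. plain consumer configs passed through the Streams consumer prefix. The values are
made up and would need tuning against the actual remote-store latency:)

import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.streams.StreamsConfig;

// Illustrative values only; the right numbers depend on the observed remote-store latency.
public class RemoteStoreConsumerTuning {
    static Properties pollOverrides() {
        final Properties props = new Properties();
        // More headroom per poll loop before the member is kicked from the group,
        // since each record now involves at least one remote round trip (default: 300000).
        props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG), 600_000);
        // Fewer records per poll so a single batch can't blow through that interval.
        props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), 100);
        return props; // merge into the Properties used to build the KafkaStreams instance
    }
}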