Hey Sophie, Maybe we can first create a JIRA ticket for this?
On Mon, Mar 15, 2021 at 3:09 PM Sophie Blee-Goldman
<sop...@confluent.io.invalid> wrote:

> Sounds good! I meant anyone who is interested :)
>
> Let me know if you have any questions after digging in to this
>
> On Mon, Mar 15, 2021 at 2:39 PM Alex Craig <alexcrai...@gmail.com> wrote:
>
> > Hey Sophie, not sure if you meant me or not but I'd be happy to take a
> > stab at creating a KIP for this. I want to spend some time digging into
> > more of how this works first, but will then try to gather my thoughts
> > and get something created.
> >
> > Alex
> >
> > On Mon, Mar 15, 2021 at 1:48 PM Sophie Blee-Goldman
> > <sop...@confluent.io.invalid> wrote:
> >
> > > This certainly does seem like a flaw in the Streams API, although of
> > > course Streams is just in general not designed for use with remote
> > > anything (API calls, stores, etc.)
> > >
> > > That said, I don't see any reason why we *couldn't* have better
> > > support for remote state stores. Note that there's currently no
> > > deleteAll method on the store interface, and not all databases
> > > necessarily support that. But we could add a default implementation
> > > which just calls delete(key) on all keys in the state store, and for
> > > the RocksDB-based state stores we still wipe out the state as usual
> > > (and recommend the same for any custom StateStore which is local
> > > rather than remote). Obviously falling back on individual delete(key)
> > > operations for all the data in the entire store will have worse
> > > performance, but that's better than silently breaking EOS when
> > > deleteAll is not available on a remote store.
> > >
> > > Would you be interested in doing a KIP for this? We should also file
> > > a JIRA with the above explanation, so that other users of remote
> > > storage are aware of this limitation.
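Sophie's proposed fallback can be sketched with a simplified, hypothetical store interface (NOT the real Kafka Streams StateStore API; all names here are illustrative): a default deleteAll() that just iterates the keys and calls delete(key) on each.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical simplified interface, not the actual Kafka Streams API;
// it only illustrates the proposed default deleteAll() fallback.
interface SimpleKeyValueStore<K, V> {
    void put(K key, V value);
    V get(K key);
    void delete(K key);
    List<K> keys(); // snapshot of the current keys

    // Default fallback for stores with no native truncate: delete every
    // key individually. Slower than a wholesale wipe, but avoids silently
    // breaking EOS on a remote store that cannot be wiped from the outside.
    default void deleteAll() {
        for (K key : new ArrayList<>(keys())) { // copy so deletes are safe
            delete(key);
        }
    }
}

// Toy in-memory implementation standing in for a remote store.
class InMemoryStore<K, V> implements SimpleKeyValueStore<K, V> {
    private final Map<K, V> data = new HashMap<>();
    @Override public void put(K key, V value) { data.put(key, value); }
    @Override public V get(K key) { return data.get(key); }
    @Override public void delete(K key) { data.remove(key); }
    @Override public List<K> keys() { return new ArrayList<>(data.keySet()); }
    public int size() { return data.size(); }
}
```

Per the proposal, a RocksDB-backed store would keep the existing wipe-the-state-directory behavior; only remote stores would fall back to the per-key default.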
> > > And definitely document this somewhere
> > >
> > > On Mon, Mar 15, 2021 at 8:04 AM Bruno Cadonna
> > > <br...@confluent.io.invalid> wrote:
> > >
> > > > Hi Alex,
> > > >
> > > > I guess wiping out the state directory is easier code-wise, faster,
> > > > and/or at the time of development the developers did not design for
> > > > remote state stores. But I actually do not know the exact reason.
> > > >
> > > > Off the top of my head, I do not know how to solve this for remote
> > > > state stores. Using the uncaught exception handler is not good,
> > > > because a computing node could fail without giving the JVM the
> > > > opportunity to throw an exception.
> > > >
> > > > In your tests, try to increase the commit interval to a high value
> > > > and see if you get inconsistencies. You should get an inconsistency
> > > > if the state store maintains counts for keys and, after the last
> > > > commit before the failure, the Streams app puts an event with a new
> > > > key K with value 1 into the state store. After failover, Streams
> > > > would put the same event with key K again into the state store. If
> > > > the state store deleted all of its data, Streams would again put
> > > > value 1, but if the state store did not delete all data, Streams
> > > > would put value 2, which is wrong because it would count the same
> > > > event twice.
> > > >
> > > > Best,
> > > > Bruno
> > > >
> > > > On 15.03.21 15:20, Alex Craig wrote:
> > > > > Bruno,
> > > > > Thanks for the info! That makes sense. Of course now I have more
> > > > > questions. :) Do you know why this is being done outside of the
> > > > > state store API? I assume there are reasons why a "deleteAll()"
> > > > > type of function wouldn't work, thereby allowing a state store to
> > > > > purge itself? And maybe more importantly, is there a way to
> > > > > achieve a similar behavior with a 3rd-party store?
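Bruno's counting scenario above can be reproduced in miniature with plain Java (names are illustrative, nothing here is a Kafka API): the same event is applied once before the crash and once after failover, and only wiping the store keeps the count correct.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of the double-count scenario: a count store receives an event
// for a brand-new key "K" after the last commit, the app crashes before
// the transaction commits, and the same event is reprocessed on failover.
class DoubleCountDemo {
    static int countAfterFailover(boolean storeWipedOnFailure) {
        Map<String, Integer> store = new HashMap<>();

        // Pre-crash processing: the event is counted, but the commit
        // never happens.
        store.merge("K", 1, Integer::sum);

        if (storeWipedOnFailure) {
            // EOS behavior for local stores: wipe the state and rebuild
            // from the changelog, which contains nothing for "K" yet.
            store.clear();
        }

        // Failover: the uncommitted input record is reprocessed.
        store.merge("K", 1, Integer::sum);
        return store.get("K");
    }
}
```

With the wipe the count ends up at 1 (correct); without it, the same event is counted twice and the store holds 2.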
> > > > > I'm not sure if hooking into the uncaught exception handler might
> > > > > be a good way to purge/drop a state store in the event of a fatal
> > > > > error? I did set up a MongoDB state store recently as part of a
> > > > > POC and was testing it with EOS enabled (forcing crashes to occur
> > > > > and checking that the result of my aggregation was still
> > > > > accurate). I was unable to cause inconsistent data in the mongo
> > > > > store (which is good!), though of course I may just have been
> > > > > getting lucky. Thanks again for your help,
> > > > >
> > > > > Alex
> > > > >
> > > > > On Mon, Mar 15, 2021 at 8:59 AM Pushkar Deole
> > > > > <pdeole2...@gmail.com> wrote:
> > > > >
> > > > >> Bruno,
> > > > >>
> > > > >> I tried to explain this in 'kafka user's language' through the
> > > > >> above-mentioned scenario; hope I put it properly :-) and correct
> > > > >> me if I am wrong
> > > > >>
> > > > >> On Mon, Mar 15, 2021 at 7:23 PM Pushkar Deole
> > > > >> <pdeole2...@gmail.com> wrote:
> > > > >>
> > > > >>> This is what I understand could be the issue with an external
> > > > >>> state store:
> > > > >>>
> > > > >>> The kafka streams application consumes the source topic, does
> > > > >>> processing, stores state to the kafka state store (this is
> > > > >>> backed by a topic), and before producing the event on the
> > > > >>> destination topic, the application fails with some issue. In
> > > > >>> this case, the transaction has failed, so kafka guarantees
> > > > >>> either all or none, meaning the offset written to the source
> > > > >>> topic, the state written to the state store topic, and the
> > > > >>> output produced on the destination topic... all of these happen
> > > > >>> or none of these, and in this failure scenario it is none of
> > > > >>> these.
> > > > >>>
> > > > >>> Assume you have a redis state store, and you updated the state
> > > > >>> in redis and the stream application failed.
> > > > >>> Now, you have the source topic and destination topic
> > > > >>> consistent, i.e. the offset is not committed to the source
> > > > >>> topic and output is not produced on the destination topic, but
> > > > >>> your redis state store is inconsistent with that, since it is
> > > > >>> an external state store and kafka can't guarantee rollback of
> > > > >>> state written there
> > > > >>>
> > > > >>> On Mon, Mar 15, 2021 at 6:30 PM Alex Craig
> > > > >>> <alexcrai...@gmail.com> wrote:
> > > > >>>
> > > > >>>> "Another issue with 3rd party state stores could be violation
> > > > >>>> of exactly-once guarantee provided by kafka streams in the
> > > > >>>> event of a failure of streams application instance"
> > > > >>>>
> > > > >>>> I've heard this before but would love to know more about how a
> > > > >>>> custom state store would be at any greater risk than RocksDB
> > > > >>>> as far as exactly-once guarantees are concerned. They all
> > > > >>>> implement the same interface, so as long as you're correctly
> > > > >>>> implementing get(), put(), delete(), flush(), etc., you should
> > > > >>>> be fine, right? In other words, I don't think there is any
> > > > >>>> special "exactly once magic" that is baked into the RocksDB
> > > > >>>> store code. I could be wrong though, so I'd love to hear
> > > > >>>> people's thoughts. Thanks,
> > > > >>>>
> > > > >>>> Alex C
> > > > >>>>
> > > > >>>> On Sun, Mar 14, 2021 at 4:58 PM Parthasarathy, Mohan
> > > > >>>> <mpart...@hpe.com> wrote:
> > > > >>>>
> > > > >>>>> Thanks for the responses. In the worst case, I might have to
> > > > >>>>> keep both rocksdb for the local store and an external store
> > > > >>>>> like Redis.
> > > > >>>>>
> > > > >>>>> -mohan
> > > > >>>>>
> > > > >>>>> On 3/13/21, 8:53 PM, "Pushkar Deole" <pdeole2...@gmail.com>
> > > > >>>>> wrote:
> > > > >>>>>
> > > > >>>>>     Another issue with 3rd party state stores could be
> > > > >>>>>     violation of the exactly-once guarantee provided by kafka
> > > > >>>>>     streams in the event of a failure of a streams
> > > > >>>>>     application instance.
> > > > >>>>>     Kafka provides the exactly once guarantee for consumer ->
> > > > >>>>>     process -> produce through transactions, and with the use
> > > > >>>>>     of a state store like redis this guarantee is weaker
> > > > >>>>>
> > > > >>>>>     On Sat, Mar 13, 2021 at 5:28 AM Guozhang Wang
> > > > >>>>>     <wangg...@gmail.com> wrote:
> > > > >>>>>
> > > > >>>>> > Hello Mohan,
> > > > >>>>> >
> > > > >>>>> > I think what you had in mind works with Redis: since it is
> > > > >>>>> > a remote state store engine, it does not have the
> > > > >>>>> > co-partitioning requirements of local state stores.
> > > > >>>>> >
> > > > >>>>> > One thing you'd need to tune in KS though is that with
> > > > >>>>> > remote stores, the processing latency may be larger, and
> > > > >>>>> > since Kafka Streams processes all records of a single
> > > > >>>>> > partition in order, synchronously, you may need to tune the
> > > > >>>>> > poll interval configs etc. to make sure KS would stay in
> > > > >>>>> > the consumer group and not trigger unnecessary rebalances.
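Guozhang's tuning advice maps onto standard Kafka consumer configs that Streams passes through to its consumers; the config names below are real consumer settings, but the values are placeholders to experiment with, not recommendations.

```java
import java.util.Properties;

// Illustrative poll-loop headroom for a topology backed by a slow remote
// store. "max.poll.interval.ms" and "max.poll.records" are real Kafka
// consumer configs; the values here are made-up starting points.
class RemoteStoreTuningSketch {
    static Properties pollTuning() {
        Properties props = new Properties();
        // Allow more time between poll() calls before the group
        // coordinator considers the member dead and rebalances.
        props.put("max.poll.interval.ms", "600000");
        // Fetch fewer records per poll so each batch finishes well within
        // the interval, even with a remote round trip per record.
        props.put("max.poll.records", "100");
        return props;
    }
}
```

The trade-off: larger intervals delay detection of genuinely stuck instances, so these two knobs are usually tuned together against the observed per-record remote-store latency.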
> > > > >>>>> >
> > > > >>>>> > Guozhang
> > > > >>>>> >
> > > > >>>>> > On Thu, Mar 11, 2021 at 6:41 PM Parthasarathy, Mohan
> > > > >>>>> > <mpart...@hpe.com> wrote:
> > > > >>>>> >
> > > > >>>>> > > Hi,
> > > > >>>>> > >
> > > > >>>>> > > I have a use case where messages come in with some key,
> > > > >>>>> > > get assigned some partition, and the state gets created.
> > > > >>>>> > > Later, the key changes (but the message still contains
> > > > >>>>> > > the old key) and gets sent to a different partition. I
> > > > >>>>> > > want to be able to grab the old state using the old key
> > > > >>>>> > > before creating the new state on this instance. Redis as
> > > > >>>>> > > a state store makes it easy to implement this, where I
> > > > >>>>> > > can simply do a lookup before creating the state. I see
> > > > >>>>> > > an implementation here:
> > > > >>>>> > >
> > > > >>>>> > > https://github.com/andreas-schroeder/redisks/tree/master/src/main/java/com/github/andreas_schroeder/redisks
> > > > >>>>> > >
> > > > >>>>> > > Has anyone tried this? Any caveats?
> > > > >>>>> > >
> > > > >>>>> > > Thanks
> > > > >>>>> > > Mohan
> > > > >>>>> > >
> > > > >>>>> >
> > > > >>>>> > --
> > > > >>>>> > -- Guozhang

--
-- Guozhang
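Mohan's lookup-before-create flow can be sketched against a plain Map standing in for the shared Redis client (all names here are hypothetical, not from the redisks project): because the store is remote and shared, the old key is reachable from whichever instance receives the re-keyed message.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of re-keying with a shared remote store: state created under the
// old key is fetched (and removed) before state is created under the new
// key, regardless of which partition or instance handles the new key.
class RekeyMigration {
    static String migrate(Map<String, String> remoteStore,
                          String oldKey, String newKey) {
        // Lookup before create: the old entry may have been written by a
        // different instance, which a local store could not see.
        String carried = remoteStore.remove(oldKey);
        String state = (carried != null) ? carried : "fresh-state";
        remoteStore.put(newKey, state);
        return state;
    }
}
```

As the thread notes, this convenience trades away the rollback-on-failure behavior of local stores, so writes here are not covered by the Streams transaction.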