Thanks!

On Mon, Mar 15, 2021 at 8:38 PM Sophie Blee-Goldman <sop...@confluent.io.invalid> wrote:
Yep, that fell off my radar. Here we go:
https://issues.apache.org/jira/browse/KAFKA-12475

On Mon, Mar 15, 2021 at 8:09 PM Guozhang Wang <wangg...@gmail.com> wrote:

Hey Sophie,

Maybe we can first create a JIRA ticket for this?

On Mon, Mar 15, 2021 at 3:09 PM Sophie Blee-Goldman <sop...@confluent.io.invalid> wrote:

Sounds good! I meant anyone who is interested :)

Let me know if you have any questions after digging into this.

On Mon, Mar 15, 2021 at 2:39 PM Alex Craig <alexcrai...@gmail.com> wrote:

Hey Sophie, not sure if you meant me or not, but I'd be happy to take a stab at creating a KIP for this. I want to spend some time digging into more of how this works first, but will then try to gather my thoughts and get something created.

Alex

On Mon, Mar 15, 2021 at 1:48 PM Sophie Blee-Goldman <sop...@confluent.io.invalid> wrote:

This certainly does seem like a flaw in the Streams API, although of course Streams is just in general not designed for use with remote anything (API calls, stores, etc.).

That said, I don't see any reason why we *couldn't* have better support for remote state stores. Note that there's currently no deleteAll method on the store interface, and not all databases necessarily support that. But we could add a default implementation which just calls delete(key) on all keys in the state store, and for the RocksDB-based state stores we still wipe out the state as usual (and recommend the same for any custom StateStore which is local rather than remote).
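A minimal sketch of the fallback described above, assuming a simplified, hypothetical store interface. `SimpleKeyValueStore`, `allKeys()` and `FakeRemoteStore` are invented for illustration: the real Kafka Streams `KeyValueStore` has no `deleteAll()`, and it exposes its contents through an `all()` iterator (which must be closed) rather than a `List` of keys. The default method wipes the store with one `delete(key)` per entry; local stores could keep being wiped by deleting their state directory instead.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified store interface for illustration only.
// It is NOT org.apache.kafka.streams.state.KeyValueStore.
interface SimpleKeyValueStore<K, V> {
    V get(K key);

    void put(K key, V value);

    V delete(K key);

    // Snapshot of the keys currently in the store (hypothetical method).
    List<K> allKeys();

    // Proposed fallback: remove every entry with individual delete(key)
    // calls. Slow for large stores, but it works for any backend that
    // supports delete(key), so EOS cleanup is not silently skipped.
    default void deleteAll() {
        // Copy first so implementations may safely return a live view.
        for (K key : new ArrayList<>(allKeys())) {
            delete(key);
        }
    }
}

// Stand-in for a remote store such as Redis or MongoDB, backed by a HashMap.
class FakeRemoteStore<K, V> implements SimpleKeyValueStore<K, V> {
    private final Map<K, V> data = new HashMap<>();

    @Override
    public V get(K key) {
        return data.get(key);
    }

    @Override
    public void put(K key, V value) {
        data.put(key, value);
    }

    @Override
    public V delete(K key) {
        return data.remove(key);
    }

    @Override
    public List<K> allKeys() {
        return new ArrayList<>(data.keySet());
    }

    int size() {
        return data.size();
    }
}
```

The cost is one round trip per key, which is the "worse performance" trade-off mentioned in the thread; a backend with a native bulk delete could override `deleteAll()` instead.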
Obviously falling back on individual delete(key) operations for all the data in the entire store will have worse performance, but that's better than silently breaking EOS when deleteAll is not available on a remote store.

Would you be interested in doing a KIP for this? We should also file a JIRA with the above explanation, so that other users of remote storage are aware of this limitation. And definitely document this somewhere.

On Mon, Mar 15, 2021 at 8:04 AM Bruno Cadonna <br...@confluent.io.invalid> wrote:

Hi Alex,

I guess wiping out the state directory is easier code-wise, faster, and/or at the time of development the developers did not design for remote state stores. But I do not actually know the exact reason.

Off the top of my head, I do not know how to solve this for remote state stores. Using the uncaught exception handler is not good, because a computing node could fail without giving the JVM the opportunity to throw an exception.

In your tests, try increasing the commit interval to a high value and see if you get inconsistencies. You should get an inconsistency if the state store maintains counts for keys and, after the last commit before the failure, the Streams app puts an event with a new key K with value 1 into the state store. After failover, Streams would put the same event with key K again into the state store.
If the state store deleted all of its data, Streams would put value 1 again, but if the state store did not delete all data, Streams would put value 2, which is wrong because it would count the same event twice.

Best,
Bruno

On 15.03.21 15:20, Alex Craig wrote:

Bruno,

Thanks for the info! That makes sense. Of course now I have more questions. :) Do you know why this is being done outside of the state store API? I assume there are reasons why a "deleteAll()" type of function wouldn't work, thereby allowing a state store to purge itself? And maybe more importantly, is there a way to achieve similar behavior with a 3rd party store? Might hooking into the uncaught exception handler be a good way to purge/drop a state store in the event of a fatal error? I did set up a MongoDB state store recently as part of a POC and was testing it with EOS enabled (forcing crashes to occur and checking that the result of my aggregation was still accurate). I was unable to cause inconsistent data in the Mongo store (which is good!), though of course I may just have been getting lucky.
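Bruno's counting scenario can be reproduced without Kafka at all. The sketch below is a hypothetical simulation (`ReplaySimulation` and its methods are invented names, and a `HashMap` stands in for the remote store): an event for key K is written after the last commit, the app "crashes" before committing, and the same event is replayed after failover. Whether the count ends at 1 or 2 depends only on whether the store was wiped before the replay, which is also why a larger `commit.interval.ms` makes the window easier to hit in a crash test.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical simulation of the replay-after-failover scenario; no Kafka classes involved.
class ReplaySimulation {

    // Counts occurrences of each key into the given store.
    static void process(List<String> events, Map<String, Long> store) {
        for (String key : events) {
            store.merge(key, 1L, Long::sum);
        }
    }

    // Processes one event after the last commit, "crashes" before committing,
    // then reprocesses the same event after failover.
    // wipeOnFailover = true models what the thread says Streams does for local
    // RocksDB stores; false models a remote store that keeps uncommitted writes.
    static long countAfterFailover(String key, boolean wipeOnFailover) {
        Map<String, Long> remoteStore = new HashMap<>();
        List<String> uncommitted = List.of(key);

        process(uncommitted, remoteStore);   // written, but offsets never committed
        if (wipeOnFailover) {
            // Restoring from the changelog would only bring back committed
            // data, and nothing was committed for this key.
            remoteStore.clear();
        }
        process(uncommitted, remoteStore);   // the same event is replayed after failover
        return remoteStore.get(key);
    }
}
```

With the wipe the count is 1; without it the replayed event is counted twice and the result is 2.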
Thanks again for your help,

Alex

On Mon, Mar 15, 2021 at 8:59 AM Pushkar Deole <pdeole2...@gmail.com> wrote:

Bruno,

I tried to explain this in Kafka users' language through the scenario mentioned above; I hope I put it properly :-) Correct me if I am wrong.

On Mon, Mar 15, 2021 at 7:23 PM Pushkar Deole <pdeole2...@gmail.com> wrote:

This is what I understand could be the issue with an external state store:

The Kafka Streams application consumes the source topic, does processing, stores state to a Kafka state store (this is backed by a changelog topic), and before producing the event on the destination topic, the application fails with some issue. In this case the transaction has failed, so Kafka guarantees all or none: the source topic offsets being committed, the state being written to the state store topic, and the output being produced on the destination topic either all happen or none of them do, and in this failure scenario it is none of them.

Assume you have a Redis state store, you updated the state in Redis, and then the Streams application failed. Now the source topic and destination topic are consistent, i.e. the offset is not committed for the source topic and the output is not produced on the destination topic, but your Redis state store is inconsistent with that, since it is an external state store and Kafka can't guarantee rollback of state written there.

On Mon, Mar 15, 2021 at 6:30 PM Alex Craig <alexcrai...@gmail.com> wrote:

"Another issue with 3rd party state stores could be violation of exactly-once guarantee provided by kafka streams in the event of a failure of streams application instance"

I've heard this before but would love to know more about how a custom state store would be at any greater risk than RocksDB as far as exactly-once guarantees are concerned. They all implement the same interface, so as long as you're correctly implementing get(), put(), delete(), flush(), etc., you should be fine, right? In other words, I don't think there is any special "exactly once magic" baked into the RocksDB store code. I could be wrong though, so I'd love to hear people's thoughts. Thanks,

Alex C

On Sun, Mar 14, 2021 at 4:58 PM Parthasarathy, Mohan <mpart...@hpe.com> wrote:

Thanks for the responses. In the worst case, I might have to keep both RocksDB for the local store and an external store like Redis.
-mohan

On 3/13/21, 8:53 PM, "Pushkar Deole" <pdeole2...@gmail.com> wrote:

Another issue with 3rd party state stores could be violation of the exactly-once guarantee provided by Kafka Streams in the event of a failure of a Streams application instance. Kafka provides the exactly-once guarantee for consume -> process -> produce through transactions, and with the use of a state store like Redis, this guarantee is weaker.

On Sat, Mar 13, 2021 at 5:28 AM Guozhang Wang <wangg...@gmail.com> wrote:

Hello Mohan,

I think what you had in mind works with Redis: since it is a remote state store engine, it does not have the co-partitioning requirements that local state stores have.

One thing you'd need to tune in KS, though, is that with remote stores the processing latency may be larger, and since Kafka Streams processes all records of a single partition in order, synchronously, you may need to tune the poll interval configs etc. to make sure KS stays in the consumer group and does not trigger unnecessary rebalances.
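One way to act on the tuning advice above is to pass consumer settings through the Streams config with the `consumer.` prefix. This is an illustrative sketch only: the application id, broker address, and numeric values are placeholders, not recommendations, and `fitsPollInterval` is a hypothetical back-of-the-envelope check that assumes one synchronous remote-store call per record.

```java
import java.util.Properties;

class RemoteStoreTuning {

    static Properties streamsProps() {
        Properties props = new Properties();
        props.put("application.id", "remote-store-app");   // placeholder
        props.put("bootstrap.servers", "localhost:9092");  // placeholder
        // "consumer."-prefixed keys are applied to the consumers Streams creates.
        // Fewer records per poll() means less remote-store work between polls.
        props.put("consumer.max.poll.records", "100");
        // Max time allowed between poll() calls before the member is
        // considered failed and the group rebalances (10 minutes here).
        props.put("consumer.max.poll.interval.ms", "600000");
        return props;
    }

    // Rough check: worst-case time to process one poll batch, assuming one
    // remote call per record, must stay below max.poll.interval.ms.
    static boolean fitsPollInterval(int maxPollRecords, long remoteCallMs, long maxPollIntervalMs) {
        return (long) maxPollRecords * remoteCallMs < maxPollIntervalMs;
    }
}
```

For example, 100 records at 50 ms each is 5 seconds per batch, well inside a 10-minute interval, whereas 1000 records at 1 second each would exceed it and risk the rebalances described above.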
Guozhang

On Thu, Mar 11, 2021 at 6:41 PM Parthasarathy, Mohan <mpart...@hpe.com> wrote:

Hi,

I have a use case where messages come in with some key, get assigned some partition, and the state gets created. Later, the key changes (but the message still contains the old key) and the message gets sent to a different partition. I want to be able to grab the old state using the old key before creating the new state on this instance. Redis as a state store makes this easy to implement, since I can simply do a lookup before creating the state. I see an implementation here:

https://github.com/andreas-schroeder/redisks/tree/master/src/main/java/com/github/andreas_schroeder/redisks

Has anyone tried this? Any caveats?

Thanks
Mohan

--
Guozhang
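Mohan's re-keying use case (look up state under the old key before creating state under the new one) can be sketched as follows. Everything here is hypothetical: the `Event` shape with `key`, `oldKey`, and `amount` is invented, and a `HashMap` stands in for a shared remote store such as Redis, so the lookup works regardless of which partition or instance originally created the state.

```java
import java.util.HashMap;
import java.util.Map;

class KeyMigration {

    // Hypothetical message shape: the current key plus the key it had before.
    static final class Event {
        final String key;
        final String oldKey; // null if the key never changed
        final long amount;

        Event(String key, String oldKey, long amount) {
            this.key = key;
            this.oldKey = oldKey;
            this.amount = amount;
        }
    }

    // Stands in for a shared remote store such as Redis.
    private final Map<String, Long> remoteStore = new HashMap<>();

    long process(Event event) {
        long carried = 0L;
        if (event.oldKey != null && !event.oldKey.equals(event.key)) {
            // Move any state left under the old key, even if it was created
            // while that key was handled by another partition/instance.
            Long old = remoteStore.remove(event.oldKey);
            if (old != null) {
                carried = old;
            }
        }
        long updated = remoteStore.getOrDefault(event.key, 0L) + carried + event.amount;
        remoteStore.put(event.key, updated);
        return updated;
    }

    Long stateFor(String key) {
        return remoteStore.get(key);
    }
}
```

Because these reads and writes go to the external store rather than through Streams' transactional changelog, they are not rolled back on failure, so the exactly-once caveats discussed in this thread still apply to the remove-then-put step.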