Hello Mangat,

I am using KStreams. Does this make a difference?
Anyway, I've read about stateful stream processing here:
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Internal+Data+Management

I think I can solve my issue using changelog topics for my state store.
I'll use the KStream.transform() method with a state store, which is
backed by a changelog topic, to get the same result. I'm going to try to
change my implementation and I'll get back to you once I've done so.

In the meantime, could you tell me what the differences are between doing
that and my previous implementation? In that implementation I write my
state to a topic from a KStream (KStream.to()), create a GlobalKTable from
that topic (StreamsBuilder.globalTable()), and query its values through
the local state store.

I hope this makes sense.

Thanks in advance for your help,
Urko

On 2021/05/17 15:35:12, mangat rai <m...@gmail.com> wrote:
> Urko,
>
> You can enable changelog topics for your state store. This will enable the
> application to persist the data to a Kafka topic. The next time the
> application starts, it will first rebuild its state from this topic. Are
> you using KStreams or the low-level Processor API?
>
> Regards,
> Mangat
>
> On Mon, May 17, 2021 at 5:30 PM Urko Lekuona <ur...@arima.eu> wrote:
>
> > Hello,
> >
> > I have a question regarding the use of Kafka/Kafka Streams to store the
> > state of a stateful application.
> >
> > My application filters a stream based on a value from the previous
> > event of the stream. For example, if the previous car with the same model
> > was red, this car cannot be red. Previously, I was saving this state
> > in memory (in a map), where I could query the value of the previous event
> > and overwrite it if necessary.
> >
> > The problem with this implementation is that I lost the state if my
> > application crashed, which is something I can't afford. I thought about
> > storing it in a DB, but as I'm using Kafka, I've decided, and managed, to
> > store it there. Basically, I'm creating a GlobalKTable and using the
> > Streams facade (KafkaStreams.store) to query its contents. This is
> > working, but I'm not familiar enough with the Kafka environment to
> > understand its implications.
> >
> > So that's why I come for help. Am I doing this right? Should or shouldn't
> > I do this?
> >
> > P.S. Message retention time shouldn't be an issue in my case, as this
> > information expires after a day, so as long as I can retain the events
> > that long, I'll be fine.
> >
> > Thanks in advance,
> >
> > Urko
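
For reference, the filtering rule from the original question can be sketched in plain Java. This is only a minimal sketch under stated assumptions: the class name PreviousColorFilter and the method accept() are hypothetical, the rule is generalized from "red" to "same color as the previous car of that model", and a HashMap stands in for the state store. In a Kafka Streams application the same lookup-and-overwrite logic would presumably sit inside the transformer passed to KStream.transform(), with the map replaced by a key-value state store whose changelog topic lets the state survive a crash.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration of the "previous event" filter.
// The HashMap is a stand-in for a Kafka Streams key-value state store.
final class PreviousColorFilter {
    // Last accepted color per car model (key = model, value = color).
    private final Map<String, String> lastColorByModel = new HashMap<>();

    // Returns true if the event passes the filter, false if it is dropped.
    // An event is dropped when the previous car of the same model had the
    // same color; otherwise the stored color is overwritten.
    boolean accept(String model, String color) {
        String previous = lastColorByModel.get(model);
        if (color.equals(previous)) {
            return false; // same color as the previous car of this model
        }
        lastColorByModel.put(model, color);
        return true;
    }

    public static void main(String[] args) {
        PreviousColorFilter filter = new PreviousColorFilter();
        System.out.println(filter.accept("golf", "red"));  // first golf: passes
        System.out.println(filter.accept("golf", "red"));  // previous golf was red: dropped
        System.out.println(filter.accept("golf", "blue")); // different color: passes
    }
}
```

Because state is kept per key, this only behaves correctly if all events for a given model are processed by the same instance, which in Kafka Streams means keying the stream by model before the stateful step.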