Thanks Bruno.. yeah, I think I could figure it out... For dependencies such as the database, for which all events will be blocked, we are planning to put in a retry mechanism, so processing will wait until the database connection is back up. If the problem is with the incoming event itself, like a bad format etc., then we can skip that event and log it or add it to a dead letter queue topic.
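[Editor's note] The retry-versus-dead-letter split described above can be sketched language-agnostically as follows. This is a minimal sketch, not Kafka Streams API code; `handler`, `is_transient`, and `send_to_dlq` are hypothetical stand-ins for the real processing, error-classification, and DLQ-producer code.

```python
import time

def process_with_retry(event, handler, is_transient, send_to_dlq,
                       base_delay=0.5, max_delay=30.0):
    """Block on transient (dependency) failures, divert bad events to a DLQ."""
    delay = base_delay
    while True:
        try:
            return handler(event)
        except Exception as exc:
            if is_transient(exc):
                # Dependency outage (e.g. database down): wait and retry the
                # same event, so no event is skipped and order is preserved.
                time.sleep(delay)
                delay = min(delay * 2, max_delay)  # exponential backoff
            else:
                # Problem with the event itself (e.g. bad format): skip it
                # and record it on the dead letter queue topic.
                send_to_dlq(event, exc)
                return None
```

Retrying blocks the partition, which is exactly the intent here: later events for the same key must not overtake the failed one.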
On Wed, Sep 23, 2020 at 4:04 PM Bruno Cadonna <br...@confluent.io> wrote:

> Hi Pushkar,
>
> if you do not want to lose any event, you should cache the events
> somewhere (e.g. a state store) in case there is an issue with an
> external system you connect to (e.g. database issue). If the order of
> the events is important, you must ensure that the events in your cache
> are processed in the order they were written to the cache (i.e.
> first-in first-out).
>
> Maybe you can find some good hints in the links Gilles posted.
>
> Best,
> Bruno
>
> On 22.09.20 10:51, Pushkar Deole wrote:
> > Thank you Gilles.. will take a look..
> >
> > Bruno, thanks for your elaborate explanation as well... however it
> > basically exposes my application to certain issues..
> >
> > e.g. the application deals with agent states of a call center, where
> > the order of processing is important. When an agent is logged in, he
> > keeps rotating between Ready and Not Ready states, and at the end of
> > the day he becomes Logged Out... If, while processing the Ready event,
> > there is some temporary issue with the database/network, the event
> > processing gets an exception; the application does a few retries but
> > with no luck.
> > As per kafka polling, it will go ahead and poll the next record from
> > the partition for the same agent (the agent id being the key) and it
> > will process the Logged Out event. So, does this mean I lost the Ready
> > event in between due to the database issue? Even if I store this event
> > somewhere for processing later, processing the Ready event after
> > Logged Out doesn't make sense, since the order of states is important.
> > Is my understanding correct?
> >
> > On Tue, Sep 22, 2020 at 1:32 PM Gilles Philippart
> > <gilles.philipp...@fundingcircle.com.invalid> wrote:
> >
> >> Hi Pushkar,
> >>
> >> Uber has written about how they deal with failures and reprocessing
> >> here, it might help you achieve what you describe:
> >> https://eng.uber.com/reliable-reprocessing/.
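[Editor's note] Bruno's cache-and-process-FIFO suggestion, combined with the per-key ordering concern above (a Ready event must never be processed after Logged Out), can be sketched as follows. In-memory deques stand in for the state store, and `handler` (returning True on success) is a hypothetical processing callable; this is an illustration of the idea, not Kafka Streams API code.

```python
from collections import defaultdict, deque

class PerKeyBuffer:
    """Once an event for a key fails, later events for that key queue
    behind it, so per-key order is never violated."""

    def __init__(self, handler):
        self.handler = handler            # stand-in for real processing
        self.pending = defaultdict(deque)

    def on_event(self, key, event):
        if self.pending[key]:
            # An earlier event for this key is still blocked: queue behind it.
            self.pending[key].append(event)
        elif not self.handler(key, event):
            self.pending[key].append(event)

    def retry(self):
        """Call when the dependency recovers: drain each key's queue in
        arrival order, stopping at the first event that still fails."""
        for key, queue in self.pending.items():
            while queue and self.handler(key, queue[0]):
                queue.popleft()
```

In a real Streams topology the deques would live in a persistent state store so the buffer survives restarts.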
> >>
> >> Unfortunately, there isn't much written documentation about those
> >> patterns. There's also a good talk from Confluent's Antony Stubbs on
> >> how you can do certain things with the Processor API that you can't
> >> do with the Kafka Streams DSL:
> >>
> >> https://www.confluent.io/kafka-summit-lon19/beyond-dsl-unlocking-power-kafka-streams-processor-api
> >>
> >> Gilles Philippart
> >> Funding Circle Engineering
> >>
> >> On Tue, 22 Sep 2020 at 08:12, Bruno Cadonna <br...@confluent.io> wrote:
> >>
> >>> Hi Pushkar,
> >>>
> >>> I think there is a misunderstanding. If a consumer polls from a
> >>> partition, it will always poll the next event, independently of
> >>> whether the offset was committed or not. Committed offsets are used
> >>> for fault tolerance, i.e., when a consumer crashes, the consumer that
> >>> takes over the work of the crashed consumer will start polling
> >>> records from the offset the crashed consumer committed last. This is
> >>> not only true for Kafka Streams, but for all applications that use a
> >>> Kafka consumer with subscription.
> >>>
> >>> To be clear, my proposal is not a workaround. This is one approach to
> >>> solve your problem in Kafka Streams. You could have a look into
> >>> stream-stream joins if you can use a stream instead of a global
> >>> table. Another approach would be to use a plain Kafka consumer
> >>> instead of Kafka Streams, with which you would have more fine-grained
> >>> control over polls and commits. In any case, be aware that blocking
> >>> processing on an event indefinitely may result in your lag and/or
> >>> your state growing indefinitely.
> >>>
> >>> If you think there is something missing in Kafka Streams, you are
> >>> very welcome to search through the tickets in
> >>> https://issues.apache.org/jira/projects/KAFKA/issues and comment on
> >>> tickets that would solve your issue or create a new one if you cannot
> >>> find any.
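[Editor's note] The fault-tolerance point above (committed offsets only matter on restart) and the plain-consumer alternative can be illustrated without Kafka itself. The fake single-partition consumer below is purely a simulation showing the commit discipline: commit an offset only after the event is processed, so a restart resumes at the first unprocessed event rather than losing it.

```python
class FakeConsumer:
    """In-memory stand-in for a Kafka consumer on one partition."""
    def __init__(self, log, committed=0):
        self.log = log
        self.position = committed   # a restarted consumer resumes here
        self.committed = committed

    def poll(self):
        if self.position < len(self.log):
            rec = (self.position, self.log[self.position])
            self.position += 1
            return rec
        return None

    def commit(self, offset):
        self.committed = offset

def run(consumer, handler):
    """Process until the log is drained or an event fails."""
    while (rec := consumer.poll()) is not None:
        offset, event = rec
        if not handler(event):
            return consumer.committed  # stop without committing this offset
        consumer.commit(offset + 1)    # commit only after success
    return consumer.committed
```

With a real consumer this corresponds to disabling auto-commit and calling commitSync after each successfully processed record.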
> >>>
> >>> Best,
> >>> Bruno
> >>>
> >>> On 22.09.20 05:09, Pushkar Deole wrote:
> >>>> Bruno,
> >>>>
> >>>> So, essentially, we are just waiting on the processing of the first
> >>>> event that got an error before going ahead on to the next one.
> >>>>
> >>>> Second, if the application handles storing the events in a state
> >>>> store for retry, Kafka Streams would essentially commit the offset
> >>>> of those events, so the next event will be polled by the consumer,
> >>>> correct?
> >>>>
> >>>> Instead of this workaround, is there any provision in Kafka Streams
> >>>> for this scenario? e.g. in case the application registers
> >>>> application-level exceptions, then Kafka Streams will take care of
> >>>> it and do all this internally, and will not commit the offset of
> >>>> that event and hence will keep polling the same event again?
> >>>> Since this is a common scenario, could users achieve this in Kafka
> >>>> Streams internally using a particular configuration?
> >>>>
> >>>> On Mon, Sep 21, 2020 at 9:01 PM Bruno Cadonna <br...@confluent.io> wrote:
> >>>>
> >>>>> Hi Pushkar,
> >>>>>
> >>>>> If you want to keep the order, you could still use the state store
> >>>>> I suggested in my previous e-mail and implement a queue on top of
> >>>>> it. For that you need to put the events into the store with a key
> >>>>> that represents the arrival order of the events. Each time a record
> >>>>> is received from the input topic, the events are read in arrival
> >>>>> order from the state store and the data in the global table is
> >>>>> probed. If an event matches data from the global table, the event
> >>>>> is removed from the state store and emitted. If an event does not
> >>>>> match data from the global table, the processing is stopped and
> >>>>> nothing is emitted.
> >>>>>
> >>>>> Best,
> >>>>> Bruno
> >>>>>
> >>>>> On 21.09.20 14:21, Pushkar Deole wrote:
> >>>>>> Bruno,
> >>>>>>
> >>>>>> 1.
> >>>>>> the loading of the topic mapped to the GlobalKTable is done by
> >>>>>> some other service/application, so when my application starts up,
> >>>>>> it will just sync a GlobalKTable against that topic, and if that
> >>>>>> other service/application is still starting up then it may not
> >>>>>> have loaded that data on that topic, and that's the reason it is
> >>>>>> not available to my application through the GlobalKTable.
> >>>>>>
> >>>>>> 2. I don't want out-of-order processing to happen; that's the
> >>>>>> reason I want my streams application to keep trying the same event
> >>>>>> until the other application starts up and the required data
> >>>>>> becomes available in the GlobalKTable.
> >>>>>>
> >>>>>> On Mon, Sep 21, 2020 at 5:42 PM Bruno Cadonna <br...@confluent.io> wrote:
> >>>>>>
> >>>>>>> Thank you for clarifying! Now, I think I understand.
> >>>>>>>
> >>>>>>> You could put events for which required data in the global table
> >>>>>>> is not available into a state store, and each time an event from
> >>>>>>> the input topic is processed, you could look up all events in
> >>>>>>> your state store and see if required data is now available for
> >>>>>>> them.
> >>>>>>>
> >>>>>>> However, be aware that this can mix up the original order of the
> >>>>>>> events in your input topic if required data of later events is
> >>>>>>> available before required data of earlier events. Furthermore,
> >>>>>>> you need to consider the case when you have a huge amount of
> >>>>>>> events in the state store and suddenly all required data in the
> >>>>>>> global table is available, because processing all those events at
> >>>>>>> once might lead to exceeding max.poll.interval.ms and the stream
> >>>>>>> thread might be kicked out of the consumer group. To solve that,
> >>>>>>> you may want to limit the number of events processed at once.
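[Editor's note] The queue-on-a-state-store idea with arrival-order keys (from Bruno's earlier mail in this thread), plus the cap on events drained per pass suggested just above, can be sketched as follows. Plain dicts stand in for the state store and the GlobalKTable, and the field names (`ref`, `data`) are made up for illustration.

```python
class ArrivalOrderQueue:
    """Buffer events keyed by a monotonically increasing arrival sequence;
    replay them in arrival order against the global table, stopping at the
    first event whose required data is still missing, and draining at most
    `max_per_pass` events per input record so a sudden backlog cannot keep
    the thread busy past max.poll.interval.ms."""

    def __init__(self, global_table, max_per_pass=100):
        self.global_table = global_table  # stand-in for a GlobalKTable
        self.store = {}                   # seq -> event; stand-in for a state store
        self.seq = 0
        self.max_per_pass = max_per_pass

    def on_record(self, event, emit):
        self.store[self.seq] = event      # append with arrival-order key
        self.seq += 1
        drained = 0
        for seq in sorted(self.store):    # replay in arrival order
            ev = self.store[seq]
            if drained >= self.max_per_pass or ev["ref"] not in self.global_table:
                break                     # data still missing or cap hit: stop
            emit({**ev, "data": self.global_table[ev["ref"]]})
            del self.store[seq]
            drained += 1
```

Because every event funnels through the queue, nothing is ever emitted ahead of an older blocked event, which addresses the mix-up caveat above.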
> >>>>>>> You also need to avoid the state store growing indefinitely if
> >>>>>>> required data in the global table is not available for a long
> >>>>>>> time or not available at all. Maybe all these caveats do not
> >>>>>>> apply to your use case.
> >>>>>>>
> >>>>>>> Best,
> >>>>>>> Bruno
> >>>>>>>
> >>>>>>> On 21.09.20 13:45, Pushkar Deole wrote:
> >>>>>>>> Say the application-level exception is named:
> >>>>>>>> MeasureDefinitionNotAvaialbleException
> >>>>>>>>
> >>>>>>>> What I am trying to achieve is: in the above case, when the
> >>>>>>>> event processing fails due to required data not being available,
> >>>>>>>> the streams should not proceed on to the next event; it should
> >>>>>>>> wait for the processing of the current event to complete. If I
> >>>>>>>> just catch the MeasureDefinitionNotAvaialbleException in the
> >>>>>>>> processor and log it, then the stream will proceed onto the next
> >>>>>>>> event, considering the current event's processing to have been
> >>>>>>>> successful, right?
> >>>>>>>>
> >>>>>>>> On Mon, Sep 21, 2020 at 5:11 PM Pushkar Deole <pdeole2...@gmail.com> wrote:
> >>>>>>>>
> >>>>>>>>> It is not a Kafka Streams error, it is an application-level
> >>>>>>>>> error, e.g. say, some data required for processing an input
> >>>>>>>>> event is not available in the GlobalKTable since it is not yet
> >>>>>>>>> synced with the global topic.
> >>>>>>>>>
> >>>>>>>>> On Mon, Sep 21, 2020 at 4:54 PM Bruno Cadonna <br...@confluent.io> wrote:
> >>>>>>>>>
> >>>>>>>>>> Hi Pushkar,
> >>>>>>>>>>
> >>>>>>>>>> Is the error you are talking about one that is thrown by Kafka
> >>>>>>>>>> Streams or by your application? If it is thrown by Kafka
> >>>>>>>>>> Streams, could you please post the error?
> >>>>>>>>>>
> >>>>>>>>>> I do not completely understand what you are trying to achieve,
> >>>>>>>>>> but maybe max.task.idle.ms [1] is the configuration you are
> >>>>>>>>>> looking for.
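[Editor's note] For reference, the two Streams configurations mentioned in this thread (max.task.idle.ms here and commit.interval.ms in the next mail) might appear together like this. Shown as a plain map of config names to values; the values and the application id are arbitrary examples, not recommendations.

```python
# Illustrative Kafka Streams configuration fragment.
streams_config = {
    "application.id": "agent-state-app",   # hypothetical application id
    "bootstrap.servers": "localhost:9092",
    # How long a task waits for data on all its input partitions before
    # processing, which can help with ordering across joined inputs.
    "max.task.idle.ms": "2000",
    # How often Kafka Streams commits offsets (enable.auto.commit is
    # always false in Streams; this interval replaces it).
    "commit.interval.ms": "100",
}
```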
> >>>>>>>>>>
> >>>>>>>>>> I can assure you that enable.auto.commit is false in Kafka
> >>>>>>>>>> Streams. What you probably mean is that Kafka Streams
> >>>>>>>>>> periodically commits the offsets. The commit interval can be
> >>>>>>>>>> controlled with commit.interval.ms [2].
> >>>>>>>>>>
> >>>>>>>>>> Best,
> >>>>>>>>>> Bruno
> >>>>>>>>>>
> >>>>>>>>>> [1] https://kafka.apache.org/documentation/#max.task.idle.ms
> >>>>>>>>>> [2] https://kafka.apache.org/documentation/#commit.interval.ms
> >>>>>>>>>>
> >>>>>>>>>> On 21.09.20 12:38, Pushkar Deole wrote:
> >>>>>>>>>>> Hi,
> >>>>>>>>>>>
> >>>>>>>>>>> I would like to know how to handle the following scenario
> >>>>>>>>>>> while processing events in a Kafka Streams application:
> >>>>>>>>>>>
> >>>>>>>>>>> 1. the streams application needs data from a GlobalKTable,
> >>>>>>>>>>> which loads it from a topic that is populated by some other
> >>>>>>>>>>> service/application. So, if the streams application starts
> >>>>>>>>>>> getting events from the input source topic but doesn't find
> >>>>>>>>>>> the required data in the GlobalKTable, since that other
> >>>>>>>>>>> application/service hasn't yet loaded it, then the Kafka
> >>>>>>>>>>> Streams application gets an error while processing the event,
> >>>>>>>>>>> and the application handles the exception by logging an error
> >>>>>>>>>>> and goes on to process other events. Since auto.commit is
> >>>>>>>>>>> true, polling will go on fetching the next batch and will
> >>>>>>>>>>> probably commit the offset of the previous batch, causing
> >>>>>>>>>>> loss of the events that hit an exception while processing.
> >>>>>>>>>>>
> >>>>>>>>>>> I want to halt the processing here if an error occurs while
> >>>>>>>>>>> processing the event, so instead of going on to the next
> >>>>>>>>>>> event, the processing should keep retrying the previous event
> >>>>>>>>>>> until the application-level error is resolved. How can I
> >>>>>>>>>>> achieve this?
> >>>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>
> >>>>>>>>
> >>>>>>>
> >>>>>>
> >>>>>
> >>>>
> >>>
> >>
> >>
> >> --
> >> Gilles Philippart
> >> *Originations - Principal Engineer*
> >> t: +44 7498 544 150
> >> e: gilles.philipp...@fundingcircle.com
> >> a: 71 Queen Victoria Street, London EC4V 4AY
> >>
> >> Funding Circle Limited is authorised and regulated by the Financial
> >> Conduct Authority under firm registration number 722513. Funding
> >> Circle is not covered by the Financial Services Compensation Scheme.
> >> Registered in England (Co. No. 06968588) with registered office at
> >> 71 Queen Victoria Street, London EC4V 4AY.