I had a similar use case. We decided that the data used for enrichment must be versioned: our enrichment data was "refreshed" once a day, and we kept the old versions. During the enrichment process we look up the data for a given version based on the record's metadata.
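To make that concrete, here is a minimal sketch of the idea (names like `SNAPSHOTS`, `version_for`, and `enrich` are hypothetical, not from any real Flink/Redis API): the snapshot store keeps every daily version, and the version to read is derived from the record's own event time, so a replayed record resolves the same status it saw originally, even after a restart.

```python
from datetime import datetime, timezone

# Hypothetical versioned enrichment store: snapshots are keyed by
# (reference_key, version), where version is the snapshot date.
# Old versions are kept instead of overwritten, so reprocessing a
# record after a job restart yields the SAME reference status.
SNAPSHOTS = {
    ("user-42", "2022-10-04"): "ACTIVE",
    ("user-42", "2022-10-05"): "SUSPENDED",  # status changed on the 5th
}

def version_for(event_time_ms: int) -> str:
    """Derive the snapshot version from the record's own event time."""
    dt = datetime.fromtimestamp(event_time_ms / 1000, tz=timezone.utc)
    return dt.strftime("%Y-%m-%d")

def enrich(key: str, event_time_ms: int) -> str:
    """Look up the reference status that was valid when the record was produced."""
    return SNAPSHOTS[(key, version_for(event_time_ms))]

# A record produced on Oct 4 always enriches with the Oct 4 status,
# no matter when (or how many times) it is reprocessed.
oct4_ms = int(datetime(2022, 10, 4, 12, 0, tzinfo=timezone.utc).timestamp() * 1000)
print(enrich("user-42", oct4_ms))
```

In a real deployment the `SNAPSHOTS` dict would be Redis keys that include the version (e.g. `status:user-42:2022-10-04`) with a TTL or cleanup job bounding how far back replays can reach.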
Regards,
Krzysztof Chmielewski

On Wed, 5 Oct 2022 at 10:25, Great Info <gubt...@gmail.com> wrote:

> I have a Flink job, and the current flow looks like this:
>
> Source_Kafka -> *operator-1* (key by partition) -> *operator-2* (enrich the record) -> *Sink1-Operator* & *Sink2-Operator*
>
> With this flow, the current problem is at operator-2. The core logic that runs here looks up reference status data in a Redis cache and enriches the stream. This works fine while the job runs well, but recently I saw that if the job fails at this operator or at the sink operators, the entire job restarts and the stream is reprocessed from the source. That causes a different reference status in the enrichment (if the reference status in the cache changed during the restart). Per the business requirement, I need to enrich with the reference status as of when the stream was received by my job.
>
> 1. Is there any way to reprocess only the sink1/sink2 operators?
> 2. How can I resume only Sink2 in cases where Sink-1 succeeded but Sink2 failed?