I had a similar use case.

What we did was decide that the enrichment data must be versioned. For
example, our enrichment data was "refreshed" once a day and we kept the
old versions. During enrichment we look up the data for the given
version based on the record's metadata, so a replay always sees the same
reference data. A minimal sketch of the idea is below.
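
For reference, here is a rough sketch of that lookup (not our exact
code): it assumes Jedis as the Redis client, hypothetical Event /
EnrichedEvent record types, and an "enrichmentVersion" field that is
stamped on each record when it first enters the job.

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import redis.clients.jedis.JedisPooled;

// Hypothetical record types, for illustration only.
class Event {
    public String id;
    public String enrichmentVersion; // e.g. the daily snapshot date, set at ingest
}

class EnrichedEvent {
    public final Event event;
    public final String refStatus;
    public EnrichedEvent(Event event, String refStatus) {
        this.event = event;
        this.refStatus = refStatus;
    }
}

public class VersionedEnrichment extends RichMapFunction<Event, EnrichedEvent> {

    private transient JedisPooled redis;

    @Override
    public void open(Configuration parameters) {
        // One Redis client per task; host and port are placeholders.
        redis = new JedisPooled("redis-host", 6379);
    }

    @Override
    public EnrichedEvent map(Event event) {
        // Keys are versioned (e.g. "ref_status:2022-10-05:<id>") and old
        // versions are retained, so a replay after a restart reads exactly
        // the reference data that was current when the record was received.
        String key = "ref_status:" + event.enrichmentVersion + ":" + event.id;
        return new EnrichedEvent(event, redis.get(key));
    }

    @Override
    public void close() {
        if (redis != null) {
            redis.close();
        }
    }
}

The important part is that the version is assigned once, at ingestion,
and travels with the record, so reprocessing after a failure is
deterministic even if the cache has been refreshed in the meantime.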

Regards.
Krzysztof Chmielewski

On Wed, 5 Oct 2022 at 10:25, Great Info <gubt...@gmail.com> wrote:

> I have a Flink job and the current flow looks like below:
>
> Source_Kafka -> *operator-1* (key by partition) -> *operator-2* (enrich the
> record) -> *Sink1-Operator* & *Sink2-Operator*
>
> With this flow the current problem is at operator-2. The core logic that
> runs here is to look up some reference-status data from a Redis cache and
> enrich the stream. This works fine while the job is running, but recently
> I saw that if the job fails at this operator or at the sink operators, the
> entire job restarts and the stream gets reprocessed from the source. That
> causes a different reference status in the enrichment (if the reference
> status in the cache changed during the restart); per the business
> requirement, I need to enrich with the reference status as of when the
> stream was received by my job.
>
> 1. Is there any way to reprocess only the Sink1 and Sink2 operators?
> 2. How can I resume just Sink2 in cases where Sink1 was successful but
> Sink2 failed?
>
