Couldn't you do that in one operator then? I mean doing the calls and caching the results?
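
To make that concrete, here is a minimal, Flink-free sketch of the logic I have in mind. It is only an illustration under assumptions: CachingEnricher, externalLookup and externalCalls are hypothetical names, the HashMap stands in for keyed MapState, and the Function stands in for your external enrichment call (which in a real job you might want to make through async I/O rather than blocking).

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

/**
 * Hypothetical sketch: one "operator" that both calls the external
 * service and caches the result, so no feedback edge is needed.
 */
final class CachingEnricher<K, V> {
    // Stands in for keyed MapState inside a single Flink operator (assumption).
    private final Map<K, V> cache = new HashMap<>();
    // Stands in for the external enrichment call (assumption).
    private final Function<K, V> externalLookup;
    // Counts cache misses, only to show how many external calls were made.
    private int externalCalls = 0;

    CachingEnricher(Function<K, V> externalLookup) {
        this.externalLookup = externalLookup;
    }

    /** Returns cached data if present, otherwise looks it up once and caches it. */
    V enrich(K key) {
        V cached = cache.get(key);
        if (cached != null) {
            return cached;
        }
        externalCalls++;
        V fresh = externalLookup.apply(key);
        if (fresh != null) {
            cache.put(key, fresh); // later records with the same key skip the call
        }
        return fresh;
    }

    int externalCalls() {
        return externalCalls;
    }
}
```

Since the lookup and the cache update happen in the same place, the enriched data never has to travel back upstream. This sketch leaves out cache expiry and fault tolerance, which you would get from Flink state rather than from a plain map.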
> On 3. May 2018, at 12:28, Lasse Nedergaard <lassenederga...@gmail.com> wrote:
>
> Hi.
>
> The idea is to cache the latest enrichment data in a local cache in Flink so
> that it can be reused, thereby limiting the number of external enrichment
> calls, as many of our data objects are enriched with the same data.
> An alternative solution could be to store the enriched data in Kafka and then
> stream it into the Flink job that way, but if I could do it inside Flink it
> would be easier.
>
> Best regards
> Lasse Nedergaard
>
>
> On 3 May 2018, at 12:09, Aljoscha Krettek <aljos...@apache.org> wrote:
>
>> Hi,
>>
>> Why do you want to do the enrichment downstream and send the data back up?
>> The problem is that feedback edges (or iterations, they are the same in
>> Flink) have some issues with fault-tolerance. Could you maybe outline in a bit
>> more depth what you're doing and what the flow of data and enrichment is?
>>
>> Best,
>> Aljoscha
>>
>>> On 2. May 2018, at 16:25, Lasse Nedergaard <lassenederga...@gmail.com> wrote:
>>>
>>> Hi.
>>>
>>> Because the data that I will cache comes from a downstream operator, and
>>> iterations were the only way I know of to loop data back to a previous
>>> operator.
>>>
>>> Best regards
>>> Lasse Nedergaard
>>>
>>>
>>> On 2 May 2018, at 15:35, Piotr Nowojski <pi...@data-artisans.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> Why can't you use a simple CoProcessFunction and handle cache updates
>>>> within its processElement1 or processElement2 method?
>>>>
>>>> Piotrek
>>>>
>>>>> On 1 May 2018, at 10:20, Lasse Nedergaard <lassenederga...@gmail.com> wrote:
>>>>>
>>>>> Hi.
>>>>>
>>>>> I have a case where I have an input stream that I want to enrich with
>>>>> external data. I want to cache some of the external lookup data to
>>>>> improve the overall performance.
>>>>> To update my cache (a CoProcessFunction) I would use iteration to send
>>>>> the externally enriched information back to the cache and update a
>>>>> MapState. I use a CoProcessFunction because the input stream and the
>>>>> enrich stream contain two different object types and I don't want to mix
>>>>> them. Because I use ConnectedIterativeStreams I can't use state in my
>>>>> CoProcessFunction: ConnectedIterativeStreams creates a DataStream based
>>>>> on the feedback signature and not on the stream I close the iteration
>>>>> with, and it is not possible to provide a KeySelector in
>>>>> withFeedbackType.
>>>>>
>>>>> From the Flink source:
>>>>>
>>>>> public ConnectedIterativeStreams(DataStream<I> input, TypeInformation<F> feedbackType, long waitTime) {
>>>>>     super(input.getExecutionEnvironment(), input,
>>>>>         new DataStream(input.getExecutionEnvironment(),
>>>>>             new CoFeedbackTransformation(input.getParallelism(), feedbackType, waitTime)));
>>>>> }
>>>>>
>>>>> Both streams need to be keyed before state can be assigned to the
>>>>> operator.
>>>>> Any ideas how to work around this problem?
>>>>>
>>>>> My pseudocode is below.
>>>>>
>>>>> IterativeStream.ConnectedIterativeStreams<InputObject, EnrichData> iteration = inputStream
>>>>>     .keyBy(obj -> obj.getkey())
>>>>>     .iterate(maxWaitForIterations)
>>>>>     .withFeedbackType(TypeInformation.of(new TypeHint<EnrichData>() {}));
>>>>>
>>>>> DataStream<ReportMessageBase> enrichedStream = iteration
>>>>>     .process(new EnrichFromState());
>>>>>
>>>>> // Records the cache could not enrich go on to the external service.
>>>>> DataStream<ReportMessageBase> notEnrichedOutput = enrichedStream
>>>>>     .filter(obj -> !obj.enriched);
>>>>>
>>>>> EnrichService enrichService = new EnrichService();
>>>>> DataStream<InputObject> enrichedFromApi =
>>>>>     enrichService.parse(notEnrichedOutput);
>>>>>
>>>>> DataStream<EnrichData> newEnrich = enrichedFromApi
>>>>>     .map(obj -> {
>>>>>         EnrichData newData = new EnrichData();
>>>>>         newData.xx = obj.xx();
>>>>>         return newData;
>>>>>     })
>>>>>     .keyBy(obj -> obj.getkey());
>>>>>
>>>>> // Feed the newly fetched enrichment data back into the cache.
>>>>> iteration.closeWith(newEnrich);
>>>>> ....
>>>>
>>