I could, but the external REST call is made with an async operator, and I want to reduce the number of objects going to the async operator. It would also require that I store the state in the async operator too.
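For reference, here is a minimal sketch of the "do the calls and cache the results in one operator" idea, written in plain Java with no Flink dependency. The class name `CachingEnricher`, the `externalLookup` function and the call counter are hypothetical names made up for illustration. In a real job the cache would presumably live as a field inside the async function, and as far as I know an async function cannot use Flink keyed state, so this is an in-memory cache per parallel instance and not checkpointed state.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

/** Hypothetical helper: caches async lookups so each key reaches the external service once. */
final class CachingEnricher<K, V> {
    // Caching the future (not the value) also deduplicates lookups that are still in flight.
    private final Map<K, CompletableFuture<V>> cache = new ConcurrentHashMap<>();
    private final Function<K, CompletableFuture<V>> externalLookup;
    private final AtomicInteger externalCalls = new AtomicInteger();

    CachingEnricher(Function<K, CompletableFuture<V>> externalLookup) {
        this.externalLookup = externalLookup;
    }

    CompletableFuture<V> enrich(K key) {
        CompletableFuture<V> future = cache.computeIfAbsent(key, k -> {
            externalCalls.incrementAndGet();   // only reached on a cache miss
            return externalLookup.apply(k);    // assumed to return quickly with a future
        });
        // Drop failed lookups so a later element with the same key can retry.
        future.whenComplete((value, error) -> {
            if (error != null) {
                cache.remove(key, future);
            }
        });
        return future;
    }

    int externalCallCount() {
        return externalCalls.get();
    }
}

final class CachingEnricherDemo {
    public static void main(String[] args) {
        // Stand-in for the REST call; a real client would go here.
        CachingEnricher<String, String> enricher = new CachingEnricher<>(
                key -> CompletableFuture.supplyAsync(() -> "enriched-" + key));

        enricher.enrich("a").join();
        enricher.enrich("a").join();   // served from the cache
        enricher.enrich("b").join();

        System.out.println(enricher.externalCallCount()); // prints 2
    }
}
```

Note that the map in this sketch is unbounded and is lost on restart; a size limit or expiry would be needed for real data, and a restarted job would simply start with a cold cache.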
Med venlig hilsen / Best regards
Lasse Nedergaard

> On 3 May 2018, at 13:09, Aljoscha Krettek <aljos...@apache.org> wrote:
>
> Couldn't you do that in one operator then? I mean doing the calls and caching
> the results?
>
>> On 3 May 2018, at 12:28, Lasse Nedergaard <lassenederga...@gmail.com> wrote:
>>
>> Hi.
>>
>> The idea is to keep the latest enrichment data in a local cache in Flink and
>> reuse it, thereby limiting the number of external enrichment calls, as many
>> of our data objects are enriched with the same data.
>> An alternative solution could be to store the enriched data in Kafka and
>> then stream it into the Flink job that way, but if I could do it inside
>> Flink it would be easier.
>>
>> Med venlig hilsen / Best regards
>> Lasse Nedergaard
>>
>>
>>> On 3 May 2018, at 12:09, Aljoscha Krettek <aljos...@apache.org> wrote:
>>>
>>> Hi,
>>>
>>> Why do you want to do the enrichment downstream and send the data back up?
>>> The problem is that feedback edges (or iterations, they are the same in
>>> Flink) have some issues with fault-tolerance. Could you maybe outline in a
>>> bit more depth what you're doing and what the flow of data and enrichment
>>> is?
>>>
>>> Best,
>>> Aljoscha
>>>
>>>> On 2 May 2018, at 16:25, Lasse Nedergaard <lassenederga...@gmail.com>
>>>> wrote:
>>>>
>>>> Hi.
>>>>
>>>> Because the data that I will cache comes from a downstream operator, and
>>>> as far as I know iterations are the only way to loop data back to a
>>>> previous operator.
>>>>
>>>> Med venlig hilsen / Best regards
>>>> Lasse Nedergaard
>>>>
>>>>
>>>>> On 2 May 2018, at 15:35, Piotr Nowojski <pi...@data-artisans.com> wrote:
>>>>>
>>>>> Hi,
>>>>>
>>>>> Why can't you use a simple CoProcessFunction and handle cache updates
>>>>> within its processElement1 or processElement2 method?
>>>>>
>>>>> Piotrek
>>>>>
>>>>>> On 1 May 2018, at 10:20, Lasse Nedergaard <lassenederga...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>> Hi.
>>>>>>
>>>>>> I have a case where I have an input stream that I want to enrich with
>>>>>> external data. I want to cache some of the external lookup data to
>>>>>> improve overall performance.
>>>>>> To update my cache (a CoProcessFunction) I would use an iteration to send
>>>>>> the externally enriched information back to the cache and update a
>>>>>> MapState. I use a CoProcessFunction because the input stream and the
>>>>>> enrichment stream contain two different object types and I don't want to
>>>>>> mix them.
>>>>>> Because I use ConnectedIterativeStreams, I can't use state in my
>>>>>> CoProcessFunction: ConnectedIterativeStreams creates a DataStream based
>>>>>> on the feedback type and not on the stream I close the iteration with,
>>>>>> and it is not possible to provide a KeySelector in withFeedbackType.
>>>>>>
>>>>>> From the Flink source:
>>>>>> public ConnectedIterativeStreams(DataStream<I> input, TypeInformation<F>
>>>>>> feedbackType, long waitTime) {
>>>>>>     super(input.getExecutionEnvironment(), input, new
>>>>>>         DataStream(input.getExecutionEnvironment(), new
>>>>>>         CoFeedbackTransformation(input.getParallelism(), feedbackType,
>>>>>>         waitTime)));
>>>>>> }
>>>>>> and both streams need to be keyed before state can be assigned to the
>>>>>> operator.
>>>>>> Any ideas how to work around this problem?
>>>>>>
>>>>>> My pseudocode is below.
>>>>>>
>>>>>> IterativeStream.ConnectedIterativeStreams<InputObject, EnrichData> iteration = inputStream
>>>>>>     .keyBy(obj -> obj.getkey)
>>>>>>     .iterate(maxWaitForIterations)
>>>>>>     .withFeedbackType(TypeInformation.of(new TypeHint<EnrichData>() {}));
>>>>>>
>>>>>> DataStream<ReportMessageBase> enrichedStream = iteration
>>>>>>     .process(new EnrichFromState());
>>>>>>
>>>>>> // Elements that could not be enriched from the cache
>>>>>> DataStream<ReportMessageBase> notEnrichedOutput = enrichedStream
>>>>>>     .filter(obj -> !obj.enriched);
>>>>>>
>>>>>> EnrichService enrichService = new EnrichService();
>>>>>> DataStream<InputObject> enrichedFromApi =
>>>>>>     enrichService.parse(notEnrichedOutput);
>>>>>>
>>>>>> DataStream<EnrichData> newEnrich = enrichedFromApi
>>>>>>     .map(obj -> {
>>>>>>         EnrichData newData = new EnrichData();
>>>>>>         newData.xx = obj.xx();
>>>>>>         return newData;
>>>>>>     })
>>>>>>     .keyBy(obj -> obj.getkey);
>>>>>>
>>>>>> iteration.closeWith(newEnrich);
>>>>>> ....
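
The two-input cache discussed in the thread (cache lookups on one input, cache updates on the other) can be sketched roughly as below. This is a plain-Java stand-in and not Flink's CoProcessFunction API: the class name `EnrichFromCacheSketch`, the two output lists and the use of plain strings for the objects are all assumptions made for illustration. In a keyed Flink operator the `HashMap` would presumably be replaced by keyed MapState, which is exactly the part that requires both inputs to be keyed.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java stand-in for a two-input operator; the method names only mirror the thread. */
final class EnrichFromCacheSketch {
    // Hypothetical in-memory cache: enrichment key -> enrichment data.
    private final Map<String, String> cache = new HashMap<>();

    final List<String> enriched = new ArrayList<>();     // stands in for the main output
    final List<String> needsLookup = new ArrayList<>();  // stands in for the path to the external call

    /** Input 1: a data object, represented here only by its enrichment key. */
    void processElement1(String key) {
        String data = cache.get(key);
        if (data != null) {
            enriched.add(key + ":" + data);   // cache hit, no external call needed
        } else {
            needsLookup.add(key);             // cache miss, forward to the external service
        }
    }

    /** Input 2: enrichment data coming back from the external service. */
    void processElement2(String key, String data) {
        cache.put(key, data);
    }
}
```

How the second input is fed (an iteration, or a Kafka topic as mentioned in the thread) is left open here; the sketch only shows the lookup and update logic.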