Couldn't you do that in one operator then? I mean doing the calls and caching 
the results?
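
A minimal sketch of what I mean, in plain Java with no Flink dependency. The names CachingEnricher, externalLookup and maxEntries are made up for illustration and are not Flink APIs. I am assuming this logic would sit inside a single function in the job, where a plain in-memory map like this is local to each parallel instance and is not checkpointed.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

/**
 * Sketch of "do the calls and cache the results in one operator".
 * Hypothetical helper, not part of Flink.
 */
class CachingEnricher<K, V> {
    private final Function<K, V> externalLookup; // stands in for the external enrichment call
    private final Map<K, V> cache;
    private int externalCalls = 0;

    CachingEnricher(Function<K, V> externalLookup, int maxEntries) {
        this.externalLookup = externalLookup;
        // Access-ordered LinkedHashMap that evicts the least recently used entry
        // once the cache grows beyond maxEntries.
        this.cache = new LinkedHashMap<K, V>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
                return size() > maxEntries;
            }
        };
    }

    /** Returns the cached enrichment if present, otherwise calls out and caches the result. */
    V enrich(K key) {
        V cached = cache.get(key);
        if (cached != null) {
            return cached;
        }
        externalCalls++;
        V fresh = externalLookup.apply(key);
        if (fresh != null) {
            cache.put(key, fresh);
        }
        return fresh;
    }

    /** Number of times the external lookup was actually invoked. */
    int externalCalls() {
        return externalCalls;
    }
}
```

With something like this there is no feedback edge at all: the same operator that needs the enrichment data also makes the call and keeps the result.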

> On 3. May 2018, at 12:28, Lasse Nedergaard <lassenederga...@gmail.com> wrote:
> 
> Hi. 
> 
> The idea is to keep the latest enrichment data in a local cache in Flink so 
> it can be reused, which limits the number of external enrichment calls, as 
> many of our data objects are enriched with the same data. 
> An alternative solution could be to store the enriched data in Kafka and then 
> stream it into the Flink job that way, but if I could do it inside Flink it 
> would be easier.
> 
> Med venlig hilsen / Best regards
> Lasse Nedergaard
> 
> 
> On 3. May 2018, at 12:09, Aljoscha Krettek <aljos...@apache.org 
> <mailto:aljos...@apache.org>> wrote:
> 
>> Hi,
>> 
>> Why do you want to do the enrichment downstream and send the data back up? 
>> The problem is that feedback edges (or iterations, they are the same in 
>> Flink) have some issues with fault-tolerance. Could you outline in a bit 
>> more depth what you're doing and what the flow of data and enrichment is?
>> 
>> Best,
>> Aljoscha
>> 
>>> On 2. May 2018, at 16:25, Lasse Nedergaard <lassenederga...@gmail.com 
>>> <mailto:lassenederga...@gmail.com>> wrote:
>>> 
>>> Hi. 
>>> 
>>> Because the data that I want to cache comes from a downstream operator, and 
>>> iterations were the only way I know of to loop data back to a previous operator.
>>> 
>>> Med venlig hilsen / Best regards
>>> Lasse Nedergaard
>>> 
>>> 
>>> On 2. May 2018, at 15:35, Piotr Nowojski <pi...@data-artisans.com 
>>> <mailto:pi...@data-artisans.com>> wrote:
>>> 
>>>> Hi,
>>>> 
>>>> Why can't you use a simple CoProcessFunction and handle cache updates 
>>>> within its processElement1 or processElement2 method?
>>>> 
>>>> Piotrek
>>>> 
>>>>> On 1 May 2018, at 10:20, Lasse Nedergaard <lassenederga...@gmail.com 
>>>>> <mailto:lassenederga...@gmail.com>> wrote:
>>>>> 
>>>>> Hi.
>>>>> 
>>>>> I have a case where I have an input stream that I want to enrich with 
>>>>> external data. I want to cache some of the external lookup data to 
>>>>> improve the overall performance.
>>>>> To update my cache (a CoProcessFunction) I would use iteration to send 
>>>>> the externally enriched information back to the cache and update a 
>>>>> MapState. I use a CoProcessFunction because the input stream and the 
>>>>> enrich stream contain two different object types and I don't want to mix 
>>>>> them. 
>>>>> Because I use a ConnectedIterativeStreams I can't use state in my 
>>>>> CoProcessFunction: ConnectedIterativeStreams creates a DataStream based 
>>>>> on the feedback signature and not on the stream I close the iteration 
>>>>> with, and it is not possible to provide a KeySelector in 
>>>>> withFeedbackType.
>>>>> 
>>>>> From the Flink source:
>>>>> public ConnectedIterativeStreams(DataStream<I> input,
>>>>>         TypeInformation<F> feedbackType, long waitTime) {
>>>>>     super(input.getExecutionEnvironment(), input,
>>>>>             new DataStream(input.getExecutionEnvironment(),
>>>>>                     new CoFeedbackTransformation(input.getParallelism(),
>>>>>                             feedbackType, waitTime)));
>>>>> }
>>>>> Both streams need to be keyed before state can be assigned to the 
>>>>> operator.
>>>>> Any ideas how to work around this problem?
>>>>> 
>>>>> My pseudocode is below.
>>>>> 
>>>>> IterativeStream.ConnectedIterativeStreams<InputObject, EnrichData> iteration =
>>>>>         inputStream
>>>>>                 .keyBy(obj -> obj.getkey())
>>>>>                 .iterate(maxWaitForIterations)
>>>>>                 .withFeedbackType(TypeInformation.of(new TypeHint<EnrichData>() {}));
>>>>> 
>>>>> DataStream<ReportMessageBase> enrichedStream = iteration
>>>>>         .process(new EnrichFromState());
>>>>> 
>>>>> // Records that could not be enriched from state go to the external service.
>>>>> DataStream<ReportMessageBase> notEnrichedOutput = enrichedStream
>>>>>         .filter(obj -> !obj.enriched);
>>>>> 
>>>>> EnrichService enrichService = new EnrichService();
>>>>> DataStream<InputObject> enrichedFromApi = 
>>>>> enrichService.parse(notEnrichedOutput);
>>>>> 
>>>>> // Turn the API results into cache updates, keyed like the input stream.
>>>>> DataStream<EnrichData> newEnrich = enrichedFromApi
>>>>>         .map(obj -> {
>>>>>             EnrichData newData = new EnrichData();
>>>>>             newData.xx = obj.xx();
>>>>>             return newData;
>>>>>         })
>>>>>         .keyBy(obj -> obj.getkey());
>>>>> 
>>>>> 
>>>>> iteration.closeWith(newEnrich);
>>>>> ....
>>>> 
>> 
