I could, but the external REST call is made with an async operator. I want to 
reduce the number of objects sent to the async operator, and doing the caching 
there would require me to store the state in the async operator too. 

Best regards
Lasse Nedergaard


> On 3 May 2018, at 13:09, Aljoscha Krettek <aljos...@apache.org> wrote:
> 
> Couldn't you do that in one operator then? I mean doing the calls and caching 
> the results?
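A minimal, framework-free sketch of doing the calls and the caching in one place, assuming the external REST call can be modeled as a function that returns a CompletableFuture. Only cache misses reach the external service. CachingAsyncLookup and remoteCall are hypothetical names, not Flink API; in a Flink job this logic would roughly sit inside the async function's asyncInvoke, with the cache held as a plain in-memory field per parallel instance, so it is not part of checkpointed state.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

// Hypothetical helper, not a Flink API: a cache in front of an asynchronous lookup.
class CachingAsyncLookup<K, V> {
    // Completed lookups, kept in a plain in-memory map (not checkpointed state).
    private final Map<K, V> cache = new ConcurrentHashMap<>();
    // Stands in for the external REST call; assumed to return a non-null value.
    private final Function<K, CompletableFuture<V>> remoteCall;
    private final AtomicInteger remoteCalls = new AtomicInteger();

    CachingAsyncLookup(Function<K, CompletableFuture<V>> remoteCall) {
        this.remoteCall = remoteCall;
    }

    CompletableFuture<V> lookup(K key) {
        V cached = cache.get(key);
        if (cached != null) {
            // Cache hit: answer immediately without touching the external service.
            return CompletableFuture.completedFuture(cached);
        }
        // Cache miss: call the service and store the result once it arrives.
        // Two concurrent misses for the same key may both call the service.
        remoteCalls.incrementAndGet();
        return remoteCall.apply(key).thenApply(value -> {
            cache.put(key, value);
            return value;
        });
    }

    int remoteCallCount() {
        return remoteCalls.get();
    }
}
```

Caching completed values rather than in-flight futures keeps the sketch simple; the cost is an occasional duplicate call when the same key misses twice before the first answer arrives.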
> 
>> On 3. May 2018, at 12:28, Lasse Nedergaard <lassenederga...@gmail.com> wrote:
>> 
>> Hi. 
>> 
>> The idea is to cache the latest enrichment data locally in Flink and reuse 
>> it, thereby limiting the number of external enrichment calls, since many of 
>> our data objects are enriched with the same data. 
>> An alternative solution could be to store the enriched data in Kafka and 
>> then stream it into the Flink job that way, but if I could do it inside 
>> Flink it would be easier.
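One way to keep only the latest enrichment data is a bounded least-recently-used map. This sketch is an assumption, not something from the thread: it uses java.util.LinkedHashMap in access order with removeEldestEntry to evict the least recently used entry, and counts how often the hypothetical externalLookup function is called, which is the number the cache is meant to keep low.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical bounded cache that keeps the most recently used enrichment entries.
class LatestEnrichmentCache<K, V> {
    private final Map<K, V> entries;
    // Stands in for the external enrichment call (hypothetical).
    private final Function<K, V> externalLookup;
    private int externalCalls = 0;

    LatestEnrichmentCache(int maxEntries, Function<K, V> externalLookup) {
        this.externalLookup = externalLookup;
        // accessOrder = true makes iteration order "least recently used first".
        this.entries = new LinkedHashMap<K, V>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
                // Evict the least recently used entry once the bound is exceeded.
                return size() > maxEntries;
            }
        };
    }

    V enrich(K key) {
        V value = entries.get(key); // a hit also refreshes the entry's recency
        if (value == null) {
            externalCalls++;
            value = externalLookup.apply(key);
            entries.put(key, value);
        }
        return value;
    }

    int externalCalls() {
        return externalCalls;
    }

    int size() {
        return entries.size();
    }
}
```

The bound keeps memory per parallel instance predictable; entries that are still in demand survive because every hit moves them to the most recent end.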
>> 
>> Best regards
>> Lasse Nedergaard
>> 
>> 
>>> On 3 May 2018, at 12:09, Aljoscha Krettek <aljos...@apache.org> wrote:
>>> 
>>> Hi,
>>> 
>>> Why do you want to do the enrichment downstream and send the data back up? 
>>> The problem is that feedback edges (or iterations, they are the same in 
>>> Flink) have some issues with fault tolerance. Could you maybe outline in a 
>>> bit more depth what you're doing and what the flow of data and enrichment 
>>> is?
>>> 
>>> Best,
>>> Aljoscha
>>> 
>>>> On 2 May 2018, at 16:25, Lasse Nedergaard <lassenederga...@gmail.com> wrote:
>>>> 
>>>> Hi. 
>>>> 
>>>> Because the data that I want to cache comes from a downstream operator, and 
>>>> as far as I know, iterations are the only way to loop data back to a 
>>>> previous operator.
>>>> 
>>>> Best regards
>>>> Lasse Nedergaard
>>>> 
>>>> 
>>>>> On 2 May 2018, at 15:35, Piotr Nowojski <pi...@data-artisans.com> wrote:
>>>>> 
>>>>> Hi,
>>>>> 
>>>>> Why can't you use a simple CoProcessFunction and handle cache updates 
>>>>> within its processElement1 or processElement2 method?
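Piotr's suggestion can be modeled without an iteration: connect the input stream with a separately sourced enrichment stream (for example the Kafka topic mentioned above), key both by the same key, and let the two callbacks share per-key state. In Flink that wiring would look roughly like input.keyBy(...).connect(enrichStream.keyBy(...)).process(...). The sketch below is plain Java that only mirrors the processElement1/processElement2 names of CoProcessFunction; the HashMap stands in for Flink keyed state, and EnrichingCoOperator, the String payloads, and the output lists are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical plain-Java model of a two-input operator on streams keyed by the same key.
class EnrichingCoOperator {
    // One cached enrichment value per key (stands in for keyed ValueState).
    private final Map<String, String> enrichmentByKey = new HashMap<>();
    // Records that were enriched from the cache.
    final List<String> enriched = new ArrayList<>();
    // Records with no cached data yet; these would go to the external call.
    final List<String> needsLookup = new ArrayList<>();

    // Input 1: records to enrich.
    void processElement1(String key, String payload) {
        String data = enrichmentByKey.get(key);
        if (data != null) {
            enriched.add(payload + "+" + data);
        } else {
            needsLookup.add(payload);
        }
    }

    // Input 2: enrichment updates; they only refresh the cache for their key.
    void processElement2(String key, String enrichmentData) {
        enrichmentByKey.put(key, enrichmentData);
    }
}
```

Keeping the two object types on separate inputs means no union type is needed, which was one of the original reasons for choosing a CoProcessFunction.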
>>>>> 
>>>>> Piotrek
>>>>> 
>>>>>> On 1 May 2018, at 10:20, Lasse Nedergaard <lassenederga...@gmail.com> wrote:
>>>>>> 
>>>>>> Hi.
>>>>>> 
>>>>>> I have a case where I have an input stream that I want to enrich with 
>>>>>> external data. I want to cache some of the external lookup data to 
>>>>>> improve the overall performance.
>>>>>> To update my cache (a CoProcessFunction), I would use an iteration to 
>>>>>> send the externally enriched information back to the cache and update a 
>>>>>> MapState. I use a CoProcessFunction because the input stream and the 
>>>>>> enrichment stream contain two different object types, and I don't want 
>>>>>> to mix them. 
>>>>>> Because I use ConnectedIterativeStreams, I can't use state in my 
>>>>>> CoProcessFunction: ConnectedIterativeStreams creates the feedback 
>>>>>> DataStream from the feedback type signature, not from the stream I close 
>>>>>> the iteration with, and it is not possible to provide a KeySelector in 
>>>>>> withFeedbackType().
>>>>>> 
>>>>>> From the Flink source:
>>>>>> public ConnectedIterativeStreams(DataStream<I> input,
>>>>>>         TypeInformation<F> feedbackType, long waitTime) {
>>>>>>     super(input.getExecutionEnvironment(),
>>>>>>             input,
>>>>>>             new DataStream(input.getExecutionEnvironment(),
>>>>>>                     new CoFeedbackTransformation(input.getParallelism(),
>>>>>>                             feedbackType, waitTime)));
>>>>>> }
>>>>>> and both streams need to be keyed before keyed state can be used in the 
>>>>>> operator.
>>>>>> Any ideas how to work around this problem?
>>>>>> 
>>>>>> My pseudocode is below.
>>>>>> 
>>>>>> IterativeStream.ConnectedIterativeStreams<InputObject, EnrichData> iteration =
>>>>>>         inputStream
>>>>>>                 .keyBy(obj -> obj.getKey())
>>>>>>                 .iterate(maxWaitForIterations)
>>>>>>                 .withFeedbackType(TypeInformation.of(new TypeHint<EnrichData>() {}));
>>>>>>
>>>>>> // Input 1: records to enrich; input 2: cache updates from the feedback edge
>>>>>> DataStream<ReportMessageBase> enrichedStream = iteration
>>>>>>         .process(new EnrichFromState());
>>>>>>
>>>>>> // Records that could not be enriched from the cached state
>>>>>> DataStream<ReportMessageBase> notEnrichedOutput = enrichedStream
>>>>>>         .filter(obj -> !obj.enriched);
>>>>>>
>>>>>> EnrichService enrichService = new EnrichService();
>>>>>> DataStream<InputObject> enrichedFromApi = enrichService.parse(notEnrichedOutput);
>>>>>>
>>>>>> // Turn the API results into cache updates for the feedback edge
>>>>>> DataStream<EnrichData> newEnrich = enrichedFromApi
>>>>>>         .map(obj -> {
>>>>>>             EnrichData newData = new EnrichData();
>>>>>>             newData.xx = obj.xx();
>>>>>>             return newData;
>>>>>>         })
>>>>>>         .keyBy(obj -> obj.getKey());
>>>>>>
>>>>>> // Close the loop with the cache updates (originally named newAddresses)
>>>>>> iteration.closeWith(newEnrich);
>>>>>> ....
>>>>> 
>>> 
> 
