Hi. 

Because the data that I want to cache comes from a downstream operator, and as far as
I know, iterations are the only way to send data back to a previous operator.
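To make that constraint concrete, here is a small plain-Java model of the loop. It involves no Flink, and all class, field and method names are hypothetical. The cache lives upstream, but the values it caches are only produced downstream by the external lookup, so they have to travel back along a feedback path, modelled here as a queue.

```java
import java.util.AbstractMap;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.function.Function;

// Plain-Java model (not Flink code) of an upstream cache that is filled by
// results produced downstream and sent back along a feedback edge.
class FeedbackLoopModel {
    // State of the (upstream) cache operator.
    private final Map<String, String> cache = new HashMap<>();
    // Stands in for the iteration's feedback edge from downstream to upstream.
    private final Queue<Map.Entry<String, String>> feedback = new ArrayDeque<>();
    // Hypothetical external enrichment service called by the downstream operator.
    private final Function<String, String> externalLookup;
    // Number of times the external service was called.
    int externalCalls = 0;

    FeedbackLoopModel(Function<String, String> externalLookup) {
        this.externalLookup = externalLookup;
    }

    /** Processes keys in order and returns the enrichment for each one. */
    List<String> run(List<String> keys) {
        List<String> output = new ArrayList<>();
        for (String key : keys) {
            drainFeedback(); // apply cache updates that came back from downstream
            String cached = cache.get(key);
            if (cached != null) {
                output.add(cached); // cache hit: enrich directly upstream
            } else {
                // Cache miss: the downstream operator calls the external service...
                String value = externalLookup.apply(key);
                externalCalls++;
                output.add(value);
                // ...and sends the result back upstream to update the cache.
                feedback.add(new AbstractMap.SimpleEntry<>(key, value));
            }
        }
        return output;
    }

    private void drainFeedback() {
        Map.Entry<String, String> update;
        while ((update = feedback.poll()) != null) {
            cache.put(update.getKey(), update.getValue());
        }
    }
}
```

The model drains the feedback queue before every record, which is idealized: in a running streaming job the update arrives asynchronously, so records with the same key that arrive close together can still miss the cache.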

Best regards
Lasse Nedergaard


> On 2 May 2018, at 15:35, Piotr Nowojski <pi...@data-artisans.com> wrote:
> 
> Hi,
> 
> Why can't you use a simple CoProcessFunction and handle cache updates within
> its processElement1 or processElement2 method?
> 
> Piotrek
> 
>> On 1 May 2018, at 10:20, Lasse Nedergaard <lassenederga...@gmail.com> wrote:
>> 
>> Hi.
>> 
>> I have a case where I have an input stream that I want to enrich with
>> external data. I want to cache some of the external lookup data to improve
>> the overall performance.
>> To update my cache (a CoProcessFunction), I would use an iteration to send the
>> externally enriched information back to the cache and update a MapState. I use
>> a CoProcessFunction because the input stream and the enrich stream contain two
>> different object types and I don't want to mix them.
>> Because I use ConnectedIterativeStreams, I can't use keyed state in my
>> CoProcessFunction: ConnectedIterativeStreams creates the feedback DataStream
>> from the feedback type signature, not from the stream I close the iteration
>> with, and it is not possible to provide a KeySelector in withFeedbackType.
>> 
>> From the Flink source:
>> public ConnectedIterativeStreams(DataStream<I> input, TypeInformation<F> feedbackType, long waitTime) {
>>     super(input.getExecutionEnvironment(), input,
>>             new DataStream(input.getExecutionEnvironment(),
>>                     new CoFeedbackTransformation(input.getParallelism(), feedbackType, waitTime)));
>> }
>> Both streams need to be keyed before keyed state can be used in the operator.
>> Any ideas how to work around this problem?
>> 
>> My pseudocode is below:
>> 
>> IterativeStream.ConnectedIterativeStreams<InputObject, EnrichData> iteration = inputStream
>>         .keyBy(obj -> obj.getKey())
>>         .iterate(maxWaitForIterations)
>>         .withFeedbackType(TypeInformation.of(new TypeHint<EnrichData>() {}));
>> 
>> // Input 1: records to enrich; input 2: cache updates fed back by the iteration
>> DataStream<ReportMessageBase> enrichedStream = iteration
>>         .process(new EnrichFromState());
>> 
>> // Records the cache could not enrich go on to the external service
>> DataStream<ReportMessageBase> notEnrichedOutput = enrichedStream
>>         .filter(obj -> !obj.enriched);
>> 
>> EnrichService enrichService = new EnrichService();
>> DataStream<InputObject> enrichedFromApi =
>>         enrichService.parse(notEnrichedOutput);
>> 
>> // Turn the external results into cache updates
>> DataStream<EnrichData> newEnrich = enrichedFromApi
>>         .map(obj -> {
>>             EnrichData newData = new EnrichData();
>>             newData.xx = obj.xx();
>>             return newData;
>>         })
>>         .keyBy(obj -> obj.getKey());
>> 
>> // Feed the cache updates back to the head of the iteration
>> iteration.closeWith(newEnrich);
>> ....
> 
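Piotr's suggestion amounts to a single two-input function that enriches on one input and updates its cache on the other. Below is a minimal plain-Java sketch of that shape. The method names mirror CoProcessFunction's processElement1/processElement2, but this is not Flink code, and InputRecord, EnrichUpdate and the string payloads are hypothetical stand-ins for InputObject and EnrichData.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model (not Flink code) of a two-input cache function:
// input 1 carries records to enrich, input 2 carries cache updates.
class CacheCoFunctionModel {

    // Hypothetical stand-in for InputObject.
    static final class InputRecord {
        final String key;
        final String payload;
        InputRecord(String key, String payload) { this.key = key; this.payload = payload; }
    }

    // Hypothetical stand-in for EnrichData.
    static final class EnrichUpdate {
        final String key;
        final String enrichment;
        EnrichUpdate(String key, String enrichment) { this.key = key; this.enrichment = enrichment; }
    }

    // Stands in for keyed state: one cached value per key.
    private final Map<String, String> stateByKey = new HashMap<>();

    final List<String> enriched = new ArrayList<>();     // records enriched from the cache
    final List<InputRecord> misses = new ArrayList<>();  // records that still need an external lookup

    /** Input 1: enrich from the cache, or hand the record on for an external lookup. */
    void processElement1(InputRecord in) {
        String cached = stateByKey.get(in.key);
        if (cached != null) {
            enriched.add(in.payload + "+" + cached);
        } else {
            misses.add(in);
        }
    }

    /** Input 2: overwrite the cache entry for the update's key. */
    void processElement2(EnrichUpdate update) {
        stateByKey.put(update.key, update.enrichment);
    }
}
```

Where the updates for the second input come from is exactly the open question in this thread; in the sketch they are simply passed in.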

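The requirement that both streams be keyed can be illustrated with another simplified plain-Java model (hypothetical names, not Flink code). Each parallel instance keeps its own local cache; lookups are always routed by key, while updates are routed either by the same key or round-robin. Math.floorMod(key.hashCode(), parallelism) is only a stand-in for Flink's own key-group assignment.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified model (not Flink code) of why cache updates and lookups must be
// partitioned by the same key: every parallel instance has its own local cache.
class RoutingModel {
    private final List<Map<String, String>> caches = new ArrayList<>();
    private final boolean keyedUpdates;
    private long updateCount = 0;

    RoutingModel(int parallelism, boolean keyedUpdates) {
        for (int i = 0; i < parallelism; i++) {
            caches.add(new HashMap<>());
        }
        this.keyedUpdates = keyedUpdates;
    }

    // Stand-in for key-based partitioning; not Flink's actual hash.
    private int byKey(String key) {
        return Math.floorMod(key.hashCode(), caches.size());
    }

    /** Routes a cache update by key, or round-robin (ignoring the key). */
    void update(String key, String value) {
        int target = keyedUpdates ? byKey(key) : (int) (updateCount++ % caches.size());
        caches.get(target).put(key, value);
    }

    /** Lookups are always routed by key, as on a keyed input stream. */
    boolean lookupHits(String key) {
        return caches.get(byKey(key)).containsKey(key);
    }
}
```

With round-robin routing the cache entries exist, but they can end up on instances that never see lookups for those keys.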