Hi,

Why can not you use simple CoProcessFunction and handle cache updates within 
it’s processElement1 or processElement2 method?

Piotrek

> On 1 May 2018, at 10:20, Lasse Nedergaard <lassenederga...@gmail.com> wrote:
> 
> Hi.
> 
> I have a case where I have a input stream that I want to enrich with external 
> data. I want to cache some of the external lookup data to improve the overall 
> performances.
> To update my cache (a CoProcessFunction) I would use iteration to send the 
> external enriched information back to the cache and update a mapstate. I use 
> CoProcesFunction as the input stream and the enrich stream contains 2 
> diff.object types and I don't want to mix them. 
> Because I use a ConnectedIterativeStream I can't use state in my 
> CoProcessFunction because the ConnectedIterativeStream create a DataStream 
> based on the Feedback signature and not the stream I close the iteration with 
> and it is not possible to provide a keySelector in the withFeedbackType
> 
> Form Flink source
> public ConnectedIterativeStreams(DataStream<I> input, TypeInformation<F> 
> feedbackType, long waitTime) {
>     super(input.getExecutionEnvironment(), input, new 
> DataStream(input.getExecutionEnvironment(), new 
> CoFeedbackTransformation(input.getParallelism(), feedbackType, waitTime)));
> }
> and both streams need to be keyed before state are assigned to the operator.
> Any ideas how to workaround this problem?
> 
> My sudo code is as below.
> 
> IterativeStream.ConnectedIterativeStreams<InputObject, EnrichData> iteration 
> = inputStream
>         .keyBy(obj -> obj.getkey))
>         
> .iterate(maxWaitForIterations).withFeedbackType(TypeInformation.of(new 
> TypeHint<EnrichData>() {}));
> 
> DataStream<ReportMessageBase> enrichedStream = iteration
>         .process(new EnrichFromState());
> 
> DataStream<ReportMessageBase> notEnrichedOutput = enrichedStream
>         .filter(obj -> obj.enriched);
>         
> EnrichService EnrichService = new EnrichService();
> DataStream<InputObject> enrichedFromApi = 
> EnrichService.parse(notEnrichedOutput);
> 
> DataStream<EnrichData> newEnrich = enrichedFromApi
>         .map(obj -> {
> 
>             EnrichData newData =  new EnrichData();
>             newData.xx = obj.xx();
>             
>             return newData;
>         })
>         .keyBy(obj -> obj.getkey);
> 
> 
> iteration.closeWith(newAddresses);
> ....

Reply via email to