Hi,

I have a case where I want to enrich an input stream with external data, and I want to cache some of the external lookup data to improve overall performance.

To update my cache (a CoProcessFunction), I would use an iteration to send the externally enriched information back to the cache and update a MapState. I use a CoProcessFunction because the input stream and the enrich stream contain two different object types and I don't want to mix them.

Because I use ConnectedIterativeStreams, I can't use state in my CoProcessFunction: ConnectedIterativeStreams creates a DataStream based on the feedback type signature, not on the stream I close the iteration with, and it is not possible to provide a KeySelector in withFeedbackType.
From the Flink source:

    public ConnectedIterativeStreams(DataStream<I> input, TypeInformation<F> feedbackType, long waitTime) {
        super(input.getExecutionEnvironment(),
              input,
              new DataStream(input.getExecutionEnvironment(),
                      new CoFeedbackTransformation(input.getParallelism(), feedbackType, waitTime)));
    }

Both streams need to be keyed before state can be assigned to the operator. Any ideas how to work around this problem?

My pseudocode is below.

    IterativeStream.ConnectedIterativeStreams<InputObject, EnrichData> iteration = inputStream
        .keyBy(obj -> obj.getkey())
        .iterate(maxWaitForIterations)
        .withFeedbackType(TypeInformation.of(new TypeHint<EnrichData>() {}));

    // Try to enrich each record from the cached state.
    DataStream<ReportMessageBase> enrichedStream = iteration
        .process(new EnrichFromState());

    // Keep only the records that could not be enriched from the cache.
    DataStream<ReportMessageBase> notEnrichedOutput = enrichedStream
        .filter(obj -> !obj.enriched);

    // Look up the missing data in the external service.
    EnrichService enrichService = new EnrichService();
    DataStream<InputObject> enrichedFromApi = enrichService.parse(notEnrichedOutput);

    DataStream<EnrichData> newEnrich = enrichedFromApi
        .map(obj -> {
            EnrichData newData = new EnrichData();
            newData.xx = obj.xx();
            return newData;
        })
        .keyBy(obj -> obj.getkey());

    // Feed the new lookup results back into the cache.
    iteration.closeWith(newEnrich);
    ....
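
To make the intent clearer, here is a minimal sketch of the cache behaviour I want from EnrichFromState, written in plain Java without any Flink classes. The class and method names (EnrichCacheSketch, onInput, onFeedback) are made up for illustration, and the HashMap only stands in for the keyed MapState; it is not how the real operator would hold its state.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Plain-Java stand-in for the two-input cache operator; no Flink classes are used.
// All names here (EnrichCacheSketch, onInput, onFeedback) are hypothetical.
class EnrichCacheSketch {
    // Plays the role of the keyed MapState: key -> cached enrichment value.
    private final Map<String, String> cache = new HashMap<>();

    // Input side (the first input of the CoProcessFunction): try to enrich from the cache.
    // An empty result means the record has to go to the external service.
    Optional<String> onInput(String key) {
        return Optional.ofNullable(cache.get(key));
    }

    // Feedback side (the second input): store what the external service returned.
    void onFeedback(String key, String enrichment) {
        cache.put(key, enrichment);
    }

    public static void main(String[] args) {
        EnrichCacheSketch op = new EnrichCacheSketch();
        System.out.println(op.onInput("k1").isPresent()); // first lookup: cache miss
        op.onFeedback("k1", "data-for-k1");               // feedback edge updates the cache
        System.out.println(op.onInput("k1").get());       // second lookup: served from the cache
    }
}
```

In the real job the map would have to be keyed state, which is exactly why both the input side and the feedback side need to be keyed.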