[ https://issues.apache.org/jira/browse/FLINK-10809?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16683358#comment-16683358 ]
ASF GitHub Bot commented on FLINK-10809:
----------------------------------------

tzulitai commented on a change in pull request #7048: [FLINK-10809][state] Include keyed state that is not from head operat…
URL: https://github.com/apache/flink/pull/7048#discussion_r232551351

##########
File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/datastream/ReinterpretDataStreamAsKeyedStreamITCase.java
##########
@@ -156,15 +165,22 @@ public void invoke(Tuple2<Integer, Integer> value, Context context) throws Excep
 		}
 	}
 
-	private static class FromPartitionFileSource extends RichParallelSourceFunction<Tuple2<Integer, Integer>> {
+	private static class FromPartitionFileSource extends RichParallelSourceFunction<Tuple2<Integer, Integer>>
+	implements CheckpointedFunction, CheckpointListener {

Review comment:
Needs proper indentation
> Using DataStreamUtils.reinterpretAsKeyedStream produces corrupted keyed state after restore
> -------------------------------------------------------------------------------------------
>
>                 Key: FLINK-10809
>                 URL: https://issues.apache.org/jira/browse/FLINK-10809
>             Project: Flink
>          Issue Type: Bug
>          Components: DataStream API, State Backends, Checkpointing
>    Affects Versions: 1.7.0
>            Reporter: Dawid Wysakowicz
>            Assignee: Stefan Richter
>            Priority: Major
>              Labels: pull-request-available
>
> I've tried using {{DataStreamUtils.reinterpretAsKeyedStream}} on the results of a windowed aggregation:
> {code}
> DataStream<Tuple2<Integer, List<Event>>> eventStream4 = eventStream2.keyBy(Event::getKey)
>     .window(SlidingEventTimeWindows.of(Time.milliseconds(150 * 3), Time.milliseconds(150)))
>     .apply(new WindowFunction<Event, Tuple2<Integer, List<Event>>, Integer, TimeWindow>() {
>         private static final long serialVersionUID = 3166250579972849440L;
>
>         @Override
>         public void apply(
>                 Integer key,
>                 TimeWindow window,
>                 Iterable<Event> input,
>                 Collector<Tuple2<Integer, List<Event>>> out) throws Exception {
>             out.collect(Tuple2.of(key,
>                 StreamSupport.stream(input.spliterator(), false).collect(Collectors.toList())));
>         }
>     });
>
> DataStreamUtils.reinterpretAsKeyedStream(eventStream4, events -> events.f0)
>     .flatMap(createSlidingWindowCheckMapper(pt))
>     .addSink(new PrintSinkFunction<>());
> {code}
> In {{createSlidingWindowCheckMapper}} I then verify that each event belongs to 3 consecutive windows, keeping the contents of the last window in a {{ValueState}}. Without failures this check passes, but right after a restore it misses a few windows.
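With `SlidingEventTimeWindows.of(Time.milliseconds(150 * 3), Time.milliseconds(150))`, every timestamp falls into 450 / 150 = 3 windows, which is where the "3 consecutive windows" of the check comes from. The plain-Java sketch below reproduces that window arithmetic so it can be checked without a Flink cluster. It is an approximation under stated assumptions: offset 0, and window starts aligned to multiples of the slide, which is how I understand Flink's `SlidingEventTimeWindows` to behave. The class and method names are made up for illustration and are not Flink API.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Simplified model of sliding event-time window assignment (assumption: offset 0 and window
 * starts aligned to multiples of the slide, as in Flink's SlidingEventTimeWindows; not Flink code).
 */
class SlidingWindowAssignment {

    /** Start of the latest window containing the timestamp, aligned to a multiple of the slide. */
    static long lastWindowStart(long timestamp, long slide) {
        // Math.floorMod keeps the alignment correct for negative timestamps too.
        return timestamp - Math.floorMod(timestamp, slide);
    }

    /** Returns the [start, end) bounds of every window of the given size and slide that contains timestamp. */
    static List<long[]> assignWindows(long timestamp, long size, long slide) {
        List<long[]> windows = new ArrayList<>();
        // Walk backwards one slide at a time until the window no longer covers the timestamp.
        for (long start = lastWindowStart(timestamp, slide); start > timestamp - size; start -= slide) {
            windows.add(new long[] {start, start + size});
        }
        return windows;
    }

    public static void main(String[] args) {
        long size = 150 * 3; // window size from the report: 450 ms
        long slide = 150;    // slide from the report: 150 ms
        for (long[] window : assignWindows(1000, size, slide)) {
            System.out.println("[" + window[0] + ", " + window[1] + ")");
        }
    }
}
```

For timestamp 1000 this prints the windows [900, 1350), [750, 1200) and [600, 1050). Whenever the size is a multiple of the slide, every timestamp gets exactly size / slide windows, which is the `slideFactor` of 3 that the mapper expects.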
> {code}
> import java.util.ArrayList;
> import java.util.Collections;
> import java.util.List;
> import java.util.Optional;
> import java.util.function.BinaryOperator;
>
> import org.apache.flink.api.common.functions.RichFlatMapFunction;
> import org.apache.flink.api.common.state.ValueState;
> import org.apache.flink.api.common.state.ValueStateDescriptor;
> import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
> import org.apache.flink.api.common.typeinfo.TypeInformation;
> import org.apache.flink.api.java.tuple.Tuple2;
> import org.apache.flink.api.java.typeutils.ListTypeInfo;
> import org.apache.flink.api.java.typeutils.TupleTypeInfo;
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.util.Collector;
>
> public class SlidingWindowCheckMapper extends RichFlatMapFunction<Tuple2<Integer, List<Event>>, String> {
>
>     private static final long serialVersionUID = -744070793650644485L;
>
>     /** This value state tracks previously seen events with the number of windows they appeared in. */
>     private transient ValueState<List<Tuple2<Event, Integer>>> previousWindow;
>
>     private final int slideFactor;
>
>     SlidingWindowCheckMapper(int slideFactor) {
>         this.slideFactor = slideFactor;
>     }
>
>     @Override
>     public void open(Configuration parameters) throws Exception {
>         ValueStateDescriptor<List<Tuple2<Event, Integer>>> previousWindowDescriptor =
>             new ValueStateDescriptor<>("previousWindow",
>                 new ListTypeInfo<>(new TupleTypeInfo<>(TypeInformation.of(Event.class), BasicTypeInfo.INT_TYPE_INFO)));
>
>         previousWindow = getRuntimeContext().getState(previousWindowDescriptor);
>     }
>
>     @Override
>     public void flatMap(Tuple2<Integer, List<Event>> value, Collector<String> out) throws Exception {
>         List<Tuple2<Event, Integer>> previousWindowValues =
>             Optional.ofNullable(previousWindow.value()).orElseGet(Collections::emptyList);
>
>         // Copy the input so that the removals below do not mutate value.f1, which is printed in the alert.
>         List<Event> newValues = new ArrayList<>(value.f1);
>
>         // Events within a single window must have consecutive sequence numbers.
>         newValues.stream().reduce(new BinaryOperator<Event>() {
>             @Override
>             public Event apply(Event event, Event event2) {
>                 if (event2.getSequenceNumber() - 1 != event.getSequenceNumber()) {
>                     out.collect("Alert: events in window out of order!");
>                 }
>                 return event2;
>             }
>         });
>
>         List<Tuple2<Event, Integer>> newWindow = new ArrayList<>();
>         for (Tuple2<Event, Integer> windowValue : previousWindowValues) {
>             if (!newValues.contains(windowValue.f0)) {
>                 out.collect(String.format("Alert: event %s did not belong to %d consecutive windows. " +
>                         "Event seen so far %d times. Current window: %s",
>                     windowValue.f0,
>                     slideFactor,
>                     windowValue.f1,
>                     value.f1));
>             } else {
>                 newValues.remove(windowValue.f0);
>                 if (windowValue.f1 + 1 != slideFactor) {
>                     newWindow.add(Tuple2.of(windowValue.f0, windowValue.f1 + 1));
>                 }
>             }
>         }
>
>         newValues.forEach(e -> newWindow.add(Tuple2.of(e, 1)));
>
>         previousWindow.update(newWindow);
>     }
> }
> {code}
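The symptom can be reproduced in miniature without Flink. The sketch below is a hypothetical plain-Java model of the mapper's bookkeeping for a single key: events are reduced to their sequence numbers, and a `LinkedHashMap` field stands in for the `ValueState`, so there is no real checkpointing. Dropping that state partway through, as a stand-in for keyed state that is not correctly restored, is enough to make the check report events that appeared in fewer than three consecutive windows, which resembles the behaviour described above. All names (`SlidingWindowCheck`, `dropState`, `slidingWindows`, `run`) are illustrative only.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical plain-Java model of SlidingWindowCheckMapper's bookkeeping for one key.
 * Events are reduced to sequence numbers; the map field stands in for the ValueState.
 */
class SlidingWindowCheck {

    private final int slideFactor;
    // Stand-in for the keyed ValueState: event -> number of windows it has appeared in so far.
    private Map<Integer, Integer> previousWindow = new LinkedHashMap<>();

    SlidingWindowCheck(int slideFactor) {
        this.slideFactor = slideFactor;
    }

    /** Simulates keyed state that is lost instead of restored. */
    void dropState() {
        previousWindow = new LinkedHashMap<>();
    }

    /** Same bookkeeping as the mapper's flatMap: returns the alerts emitted for one window. */
    List<String> onWindow(List<Integer> window) {
        List<String> alerts = new ArrayList<>();
        List<Integer> remaining = new ArrayList<>(window);
        Map<Integer, Integer> next = new LinkedHashMap<>();
        for (Map.Entry<Integer, Integer> seen : previousWindow.entrySet()) {
            Integer event = seen.getKey();
            int count = seen.getValue();
            if (!remaining.contains(event)) {
                alerts.add("event " + event + " seen in only " + count + " window(s)");
            } else {
                remaining.remove(event); // remove(Object), since event is an Integer
                if (count + 1 != slideFactor) {
                    next.put(event, count + 1);
                }
            }
        }
        for (Integer event : remaining) {
            next.put(event, 1); // first appearance of this event
        }
        previousWindow = next;
        return alerts;
    }

    /** Windows over events 0..n-1 in which every event appears in exactly slideFactor consecutive windows. */
    static List<List<Integer>> slidingWindows(int n, int slideFactor) {
        List<List<Integer>> windows = new ArrayList<>();
        for (int k = 0; k < n + slideFactor - 1; k++) {
            List<Integer> window = new ArrayList<>();
            for (int e = Math.max(0, k - slideFactor + 1); e <= Math.min(k, n - 1); e++) {
                window.add(e);
            }
            windows.add(window);
        }
        return windows;
    }

    /** Runs the check over all windows, dropping the state before window dropAt (-1 means never). */
    static List<String> run(int n, int slideFactor, int dropAt) {
        SlidingWindowCheck check = new SlidingWindowCheck(slideFactor);
        List<String> alerts = new ArrayList<>();
        List<List<Integer>> windows = slidingWindows(n, slideFactor);
        for (int k = 0; k < windows.size(); k++) {
            if (k == dropAt) {
                check.dropState();
            }
            alerts.addAll(check.onWindow(windows.get(k)));
        }
        return alerts;
    }

    public static void main(String[] args) {
        System.out.println("no failure: " + run(6, 3, -1));
        System.out.println("state lost before window 3: " + run(6, 3, 3));
    }
}
```

With the windows generated by `slidingWindows(6, 3)`, the run without a state drop reports nothing, while dropping the state before window 3 reports events 1 and 2, the two events whose earlier window counts were lost.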