[ https://issues.apache.org/jira/browse/FLINK-10809?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Stefan Richter closed FLINK-10809. ---------------------------------- Resolution: Fixed Fix Version/s: 1.7.0 1.6.3 Merged in: master: bf760f9312 release-1.7: 45ad36fd75 release-1.6: 64c22cf245 > Using DataStreamUtils.reinterpretAsKeyedStream produces corrupted keyed state > after restore > ------------------------------------------------------------------------------------------- > > Key: FLINK-10809 > URL: https://issues.apache.org/jira/browse/FLINK-10809 > Project: Flink > Issue Type: Bug > Components: DataStream API, State Backends, Checkpointing > Affects Versions: 1.7.0 > Reporter: Dawid Wysakowicz > Assignee: Stefan Richter > Priority: Major > Labels: pull-request-available > Fix For: 1.6.3, 1.7.0 > > > I've tried using {{DataStreamUtils.reinterpretAsKeyedStream}} for results of > windowed aggregation: > {code} > DataStream<Tuple2<Integer, List<Event>>> eventStream4 = > eventStream2.keyBy(Event::getKey) > > .window(SlidingEventTimeWindows.of(Time.milliseconds(150 * 3), > Time.milliseconds(150))) > .apply(new WindowFunction<Event, Tuple2<Integer, > List<Event>>, Integer, TimeWindow>() { > private static final long serialVersionUID = > 3166250579972849440L; > @Override > public void apply( > Integer key, TimeWindow window, > Iterable<Event> input, > Collector<Tuple2<Integer, List<Event>>> > out) throws Exception { > out.collect(Tuple2.of(key, > StreamSupport.stream(input.spliterator(), > false).collect(Collectors.toList()))); > } > }); > DataStreamUtils.reinterpretAsKeyedStream(eventStream4, events-> > events.f0) > .flatMap(createSlidingWindowCheckMapper(pt)) > .addSink(new PrintSinkFunction<>()); > {code} > and then in the createSlidingWindowCheckMapper I verify that each event > belongs to 3 consecutive windows, for which I keep contents of last window in > ValueState. In a non-failure setup this check runs fine, but it misses few > windows after restore at the beginning. > {code} > public class SlidingWindowCheckMapper extends > RichFlatMapFunction<Tuple2<Integer, List<Event>>, String> { > private static final long serialVersionUID = -744070793650644485L; > /** This value state tracks previously seen events with the number of > windows they appeared in. */ > private transient ValueState<List<Tuple2<Event, Integer>>> > previousWindow; > private final int slideFactor; > SlidingWindowCheckMapper(int slideFactor) { > this.slideFactor = slideFactor; > } > @Override > public void open(Configuration parameters) throws Exception { > ValueStateDescriptor<List<Tuple2<Event, Integer>>> > previousWindowDescriptor = > new ValueStateDescriptor<>("previousWindow", > new ListTypeInfo<>(new > TupleTypeInfo<>(TypeInformation.of(Event.class), > BasicTypeInfo.INT_TYPE_INFO))); > previousWindow = > getRuntimeContext().getState(previousWindowDescriptor); > } > @Override > public void flatMap(Tuple2<Integer, List<Event>> value, > Collector<String> out) throws Exception { > List<Tuple2<Event, Integer>> previousWindowValues = > Optional.ofNullable(previousWindow.value()).orElseGet( > Collections::emptyList); > List<Event> newValues = value.f1; > newValues.stream().reduce(new BinaryOperator<Event>() { > @Override > public Event apply(Event event, Event event2) { > if (event2.getSequenceNumber() - 1 != > event.getSequenceNumber()) { > out.collect("Alert: events in window > out ouf order!"); > } > return event2; > } > }); > List<Tuple2<Event, Integer>> newWindow = new ArrayList<>(); > for (Tuple2<Event, Integer> windowValue : previousWindowValues) > { > if (!newValues.contains(windowValue.f0)) { > out.collect(String.format("Alert: event %s did > not belong to %d consecutive windows. Event seen so far %d times.Current > window: %s", > windowValue.f0, > slideFactor, > windowValue.f1, > value.f1)); > } else { > newValues.remove(windowValue.f0); > if (windowValue.f1 + 1 != slideFactor) { > newWindow.add(Tuple2.of(windowValue.f0, > windowValue.f1 + 1)); > } > } > } > newValues.forEach(e -> newWindow.add(Tuple2.of(e, 1))); > previousWindow.update(newWindow); > } > } > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)