Github user ktzoumas commented on a diff in the pull request: https://github.com/apache/flink/pull/1215#discussion_r41125422 --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java --- @@ -24,49 +24,169 @@ import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.typeutils.TypeExtractor; import org.apache.flink.streaming.api.functions.aggregation.AggregationFunction; -import org.apache.flink.streaming.api.functions.aggregation.AggregationFunction.AggregationType; import org.apache.flink.streaming.api.functions.aggregation.ComparableAggregator; import org.apache.flink.streaming.api.functions.aggregation.SumAggregator; +import org.apache.flink.streaming.api.functions.sink.SinkFunction; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.api.operators.StreamGroupedFold; import org.apache.flink.streaming.api.operators.StreamGroupedReduce; +import org.apache.flink.streaming.api.transformations.OneInputTransformation; +import org.apache.flink.streaming.api.transformations.PartitionTransformation; +import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows; +import org.apache.flink.streaming.api.windowing.assigners.SlidingTimeWindows; +import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows; +import org.apache.flink.streaming.api.windowing.assigners.TumblingTimeWindows; +import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner; +import org.apache.flink.streaming.api.windowing.time.AbstractTime; +import org.apache.flink.streaming.api.windowing.time.EventTime; +import org.apache.flink.streaming.api.windowing.windows.TimeWindow; +import org.apache.flink.streaming.api.windowing.windows.Window; +import org.apache.flink.streaming.runtime.partitioner.HashPartitioner; +import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner; /** 
- * A GroupedDataStream represents a {@link DataStream} which has been - * partitioned by the given {@link KeySelector}. Operators like {@link #reduce}, - * {@link #fold} etc. can be applied on the {@link GroupedDataStream} to - * get additional functionality by the grouping. + * A {@code KeyedStream} represents a {@link DataStream} on which operator state is + * partitioned by key using a provided {@link KeySelector}. Typical operations supported by a + * {@code DataStream} are also possible on a {@code KeyedStream}, with the exception of + * partitioning methods such as shuffle, forward and keyBy. * - * @param <T> The type of the elements in the Grouped Stream. + * <p> + * Reduce-style operations, such as {@link #reduce}, {@link #sum} and {@link #fold} work on elements + * that have the same key. + * + * @param <T> The type of the elements in the Keyed Stream. * @param <KEY> The type of the key in the Keyed Stream. */ -public class GroupedDataStream<T, KEY> extends KeyedDataStream<T, KEY> { +public class KeyedStream<T, KEY> extends DataStream<T> { + + protected final KeySelector<T, KEY> keySelector; + + /** + * Creates a new {@link KeyedStream} using the given {@link KeySelector} + * to partition operator state by key. 
+ * + * @param dataStream + * Base stream of data + * @param keySelector + * Function for determining state partitions + */ + public KeyedStream(DataStream<T> dataStream, KeySelector<T, KEY> keySelector) { + super(dataStream.getExecutionEnvironment(), new PartitionTransformation<>(dataStream.getTransformation(), new HashPartitioner<>(keySelector))); + this.keySelector = keySelector; + } + + + public KeySelector<T, KEY> getKeySelector() { + return this.keySelector; + } + + + @Override + protected DataStream<T> setConnectionType(StreamPartitioner<T> partitioner) { + throw new UnsupportedOperationException("Cannot override partitioning for KeyedStream."); + } + + + @Override + public <R> SingleOutputStreamOperator<R, ?> transform(String operatorName, + TypeInformation<R> outTypeInfo, OneInputStreamOperator<T, R> operator) { + + SingleOutputStreamOperator<R, ?> returnStream = super.transform(operatorName, outTypeInfo,operator); + + ((OneInputTransformation<T, R>) returnStream.getTransformation()).setStateKeySelector(keySelector); + return returnStream; + } + + + + @Override + public DataStreamSink<T> addSink(SinkFunction<T> sinkFunction) { + DataStreamSink<T> result = super.addSink(sinkFunction); + result.getTransformation().setStateKeySelector(keySelector); + return result; + } + + // ------------------------------------------------------------------------ + // Windowing + // ------------------------------------------------------------------------ /** - * Creates a new {@link GroupedDataStream}, group inclusion is determined using - * a {@link KeySelector} on the elements of the {@link DataStream}. + * Windows this {@code KeyedStream} into tumbling time windows. 
* - * @param dataStream Base stream of data - * @param keySelector Function for determining group inclusion + * <p> + * This is a shortcut for either {@code .window(TumblingTimeWindows.of(size))} or + * {@code .window(TumblingProcessingTimeWindows.of(size))} depending on the time characteristic + * set using + * {@link org.apache.flink.streaming.api.environment.StreamExecutionEnvironment#setStreamTimeCharacteristic(org.apache.flink.streaming.api.TimeCharacteristic)} + * + * @param size The size of the window. + */ + public WindowedStream<T, KEY, TimeWindow> timeWindow(AbstractTime size) { + AbstractTime actualSize = size.makeSpecificBasedOnTimeCharacteristic(environment.getStreamTimeCharacteristic()); + + if (actualSize instanceof EventTime) { + return window(TumblingTimeWindows.of(actualSize)); + } else { + return window(TumblingProcessingTimeWindows.of(actualSize)); + } + } + + /** + * Windows this {@code KeyedStream} into sliding time windows. + * + * <p> + * This is a shortcut for either {@code .window(SlidingTimeWindows.of(size, slide))} or + * {@code .window(SlidingProcessingTimeWindows.of(size, slide))} depending on the time characteristic + * set using + * {@link org.apache.flink.streaming.api.environment.StreamExecutionEnvironment#setStreamTimeCharacteristic(org.apache.flink.streaming.api.TimeCharacteristic)} + * + * @param size The size of the window. 
*/ - public GroupedDataStream(DataStream<T> dataStream, KeySelector<T, KEY> keySelector) { - super(dataStream, keySelector); + public WindowedStream<T, KEY, TimeWindow> timeWindow(AbstractTime size, AbstractTime slide) { + AbstractTime actualSize = size.makeSpecificBasedOnTimeCharacteristic(environment.getStreamTimeCharacteristic()); + AbstractTime actualSlide = slide.makeSpecificBasedOnTimeCharacteristic(environment.getStreamTimeCharacteristic()); + + if (actualSize instanceof EventTime) { + return window(SlidingTimeWindows.of(actualSize, actualSlide)); + } else { + return window(SlidingProcessingTimeWindows.of(actualSize, actualSlide)); + } + } + + /** + * Windows this data stream to a {@code WindowedStream}, which evaluates windows + * over a key grouped stream. Elements are put into windows by a {@link WindowAssigner}. The + * grouping of elements is done both by key and by window. + * + * <p> + * A {@link org.apache.flink.streaming.api.windowing.triggers.Trigger} can be defined to specify + * when windows are evaluated. However, {@code WindowAssigners} have a default {@code Trigger} + * that is used if a {@code Trigger} is not specified. + * + * @param assigner The {@code WindowAssigner} that assigns elements to windows. + * @return The trigger windows data stream. + */ + public <W extends Window> WindowedStream<T, KEY, W> window(WindowAssigner<? super T, W> assigner) { + return new WindowedStream<>(this, assigner); } + // ------------------------------------------------------------------------ + // Non-Windowed aggregation operations + // ------------------------------------------------------------------------ /** * Applies a reduce transformation on the grouped data stream grouped on by * the given key position. The {@link ReduceFunction} will receive input * values based on the key value. Only input values with the same key will * go to the same reducer. 
- * + * * @param reducer * The {@link ReduceFunction} that will be called for every * element of the input values with the same key. * @return The transformed DataStream. */ public SingleOutputStreamOperator<T, ?> reduce(ReduceFunction<T> reducer) { - return transform("Grouped Reduce", getType(), new StreamGroupedReduce<T>( - clean(reducer), keySelector)); + return transform("Grouped Reduce", getType(), new StreamGroupedReduce<>(clean(reducer), keySelector)); --- End diff -- "Grouped Reduce" or simply "Reduce"?
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes to enable it, or if the feature is enabled but not working, please contact infrastructure at infrastructure@apache.org or file a JIRA ticket with INFRA. ---