Hi,

As a self training exercise I've defined a class extending WindowedStream
for implementing a proof of concept for a parallel version of
AllWindowStream

/**
 * Tries to create a parallel version of a AllWindowStream for a DataStream
 * by creating a KeyedStream by using as key the hash of the elements module
 * a parallelism level
 *
 * This only makes sense for window assigners that ensure the subwindows will be
 * in sync, like time based window assigners, and it is more stable
with ingestion
 * and event time because the window alignment is more reliable.
 * This doesn't work for counting or sessions window assigners.
 *
 * Also note elements from different partitions might get out of order due
 * to parallelism
 * */
public static class ParAllWindowedStream<T,W extends Window> extends
WindowedStream<T, Integer, W> {
    private final transient WindowAssigner<Object,W> windowAssigner;

    public ParAllWindowedStream(DataStream<T> stream, final int parallelism,
                                WindowAssigner<Object,W> windowAssigner) {
        super(stream.keyBy(new KeySelector<T, Integer>() {
                           @Override
                            public Integer getKey(T value) throws Exception {
                                return value.hashCode() % parallelism;
                            }
                        }),
              windowAssigner);
        this.windowAssigner = windowAssigner;
    }

    @Override
    public SingleOutputStreamOperator<T> reduce(ReduceFunction<T> reduceFun) {
        return super.reduce(reduceFun)      // reduce each subwindow
                .windowAll(windowAssigner)  // synchronize
                .reduce(reduceFun);         // sequential aggregate of
    }

    // Cannot override because we need an additional reduce function of type R
    // to recombine the result for each window
    // @Override
    public <R> SingleOutputStreamOperator<R>
applyPar(ReduceFunction<T> reduceFunction,
                                                    WindowFunction<T,
R, Integer, W> function,
                                                    ReduceFunction<R>
reduceWindowsFunction) {
        return super.apply(reduceFunction, function)
                    .windowAll(windowAssigner)
                     .reduce(reduceWindowsFunction);
    }
}

Maybe someone might find this interesting. I have a toy example program in
https://github.com/juanrh/flink-state-eviction/blob/05676ca0eebf83e936b5cc04ecf85e8110ccacf4/src/main/java/com/github/juanrh/streaming/windowAllPoCs/WindowAllTimeKeyedPoC.java
for the curious.

Greetings,

Juan

Reply via email to