[ https://issues.apache.org/jira/browse/FLINK-4207?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Kostas Kloudas reassigned FLINK-4207:
-------------------------------------

    Assignee: Kostas Kloudas

> WindowOperator becomes very slow with allowed lateness
> ------------------------------------------------------
>
>                 Key: FLINK-4207
>                 URL: https://issues.apache.org/jira/browse/FLINK-4207
>             Project: Flink
>          Issue Type: Bug
>          Components: Streaming
>    Affects Versions: 1.1.0
>            Reporter: Aljoscha Krettek
>            Assignee: Kostas Kloudas
>            Priority: Blocker
>
> In this simple example the throughput (as measured by the count the window
> emits) becomes very low when an allowed lateness is set:
> {code}
> public class WindowWordCount {
>     public static void main(String[] args) throws Exception {
>         final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>
>         env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
>         env.setParallelism(1);
>         env.addSource(new InfiniteTupleSource(100_000))
>                 .keyBy(0)
>                 .timeWindow(Time.seconds(3))
>                 .allowedLateness(Time.seconds(1))
>                 .reduce(new ReduceFunction<Tuple2<String, Integer>>() {
>                     @Override
>                     public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1,
>                             Tuple2<String, Integer> value2) throws Exception {
>                         return Tuple2.of(value1.f0, value1.f1 + value2.f1);
>                     }
>                 })
>                 .filter(new FilterFunction<Tuple2<String, Integer>>() {
>                     private static final long serialVersionUID = 1L;
>                     @Override
>                     public boolean filter(Tuple2<String, Integer> value) throws Exception {
>                         return value.f0.startsWith("Tuple 0");
>                     }
>                 })
>                 .print();
>         // execute program
>         env.execute("WindowWordCount");
>     }
>     public static class InfiniteTupleSource implements ParallelSourceFunction<Tuple2<String, Integer>> {
>         private static final long serialVersionUID = 1L;
>         private int numGroups;
>         public InfiniteTupleSource(int numGroups) {
>             this.numGroups = numGroups;
>         }
>         @Override
>         public void run(SourceContext<Tuple2<String, Integer>> out) throws Exception {
>             long index = 0;
>             while (true) {
>                 Tuple2<String, Integer> tuple = new Tuple2<>("Tuple " + (index % numGroups), 1);
>                 out.collect(tuple);
>                 index++;
>             }
>         }
>         @Override
>         public void cancel() {
>         }
>     }
> }
> {code}

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)