Brice Bingman created FLINK-6990:
------------------------------------

             Summary: Poor performance with Sliding Time Windows
                 Key: FLINK-6990
                 URL: https://issues.apache.org/jira/browse/FLINK-6990
             Project: Flink
          Issue Type: Improvement
    Affects Versions: 1.3.0
         Environment: OSX 10.11.4
                      2.8 GHz Intel Core i7
                      16 GB 1600 MHz DDR3
            Reporter: Brice Bingman
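I'm experiencing poor performance when using sliding time windows. Here is a simple example that performs poorly for me:

{code:java}
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public class FlinkPerfTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();

        // Streaming 10,000 events per second (10 keys x 1,000 events per key)
        see.addSource(new SourceFunction<TestObject>() {
            transient ScheduledExecutorService executor;

            @Override
            public synchronized void run(final SourceContext<TestObject> ctx) throws Exception {
                executor = Executors.newSingleThreadScheduledExecutor();
                // Emit one burst of 10 x 1,000 objects every second
                executor.scheduleAtFixedRate(new Runnable() {
                    @Override
                    public void run() {
                        for (int k = 0; k < 10; k++) {
                            for (int i = 0; i < 1000; i++) {
                                TestObject obj = new TestObject();
                                obj.setKey(k);
                                ctx.collect(obj);
                            }
                        }
                    }
                }, 0, 1, TimeUnit.SECONDS);
                // Block the source thread until cancel() is called
                this.wait();
            }

            @Override
            public synchronized void cancel() {
                executor.shutdown();
                this.notify();
            }
        }).keyBy("key")
          .window(SlidingProcessingTimeWindows.of(Time.minutes(10), Time.seconds(1)))
          .apply(new WindowFunction<TestObject, String, Tuple, TimeWindow>() {
              @Override
              public void apply(Tuple key, TimeWindow window, Iterable<TestObject> input, Collector<String> out) throws Exception {
                  // Count the elements in the window that just fired
                  int count = 0;
                  for (Object obj : input) {
                      count++;
                  }
                  out.collect(key.getField(0) + ": " + count);
              }
          })
          .print();

        see.execute();
    }

    public static class TestObject {
        private Integer key;

        public Integer getKey() { return key; }

        public void setKey(Integer key) { this.key = key; }
    }
}
{code}

When running this, Flink periodically pauses for long periods of time. I would expect a steady stream of output at 1-second intervals.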
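One way to see why this job is heavier than it looks is to count how many windows each element is assigned to. The sketch below is plain Java with no Flink dependency; as far as we understand it, it mirrors the rounding in Flink's `TimeWindow.getWindowStartWithOffset` and the loop in `SlidingProcessingTimeWindows.assignWindows`, assuming a window offset of 0. The class and method names are hypothetical, chosen for illustration only.

```java
import java.util.ArrayList;
import java.util.List;

public class SlidingAssignmentSketch {

    /** Start of the most recent window containing the timestamp (window offset assumed to be 0). */
    static long latestWindowStart(long timestamp, long slide) {
        // Rounds the timestamp down to a multiple of the slide
        return timestamp - (timestamp + slide) % slide;
    }

    /** Returns a [start, end) pair for every sliding window that contains the timestamp. */
    static List<long[]> assignWindows(long timestamp, long size, long slide) {
        List<long[]> windows = new ArrayList<>();
        long lastStart = latestWindowStart(timestamp, slide);
        // Walk backwards one slide at a time while the window still covers the timestamp
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            windows.add(new long[] {start, start + size});
        }
        return windows;
    }

    public static void main(String[] args) {
        long size = 10 * 60 * 1000L;    // Time.minutes(10) in milliseconds
        long slide = 1000L;             // Time.seconds(1) in milliseconds
        long now = 1_498_000_000_123L;  // arbitrary processing-time timestamp
        List<long[]> windows = assignWindows(now, size, slide);
        System.out.println("windows per element: " + windows.size());
    }
}
```

With a 10-minute size and a 1-second slide, the program prints `windows per element: 600`, i.e. size / slide overlapping windows for every incoming element.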
For comparison, you can switch to a count window of similar size, which performs just fine (600,000 elements is 10 minutes of events for one key at 1,000 events per second, and sliding by 1,000 elements means sliding once per second):

{code:java}
// Replaces the .window(...).apply(...) call above; also requires
// import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
.countWindow(600000, 1000)
.apply(new WindowFunction<FlinkPerfTest.TestObject, String, Tuple, GlobalWindow>() {
    @Override
    public void apply(Tuple key, GlobalWindow window, Iterable<TestObject> input, Collector<String> out) throws Exception {
        int count = 0;
        for (Object obj : input) {
            count++;
        }
        out.collect(key.getField(0) + ": " + count);
    }
})
{code}

I would expect the sliding time window to perform similarly to the count window. The sliding time window also uses significantly more CPU and memory than the count window; I would expect resource consumption to be similar as well.

A possible cause is that SystemProcessingTimeService.TriggerTask synchronizes on the checkpointLock, which acts as a global lock. There should be a lock per key or, preferably, a lock-less solution.
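The CPU and memory gap between the two variants can be put into rough numbers. This is a back-of-the-envelope sketch, not a measurement. It assumes the steady-state rates from the example (10 keys, 1,000 events per key per second), and it assumes that the non-evicting window operator appends each element to the state of every window it is assigned to, while `countWindow(600000, 1000)` keeps a single evicting buffer per key. The class and method names are hypothetical.

```java
public class WindowStateEstimate {

    // Rates taken from the example job; everything below is an estimate under these assumptions
    static final long KEYS = 10;
    static final long EVENTS_PER_KEY_PER_SECOND = 1_000;
    static final long WINDOW_SIZE_SECONDS = 600; // Time.minutes(10)
    static final long SLIDE_SECONDS = 1;         // Time.seconds(1)

    /** Overlapping sliding windows that each element belongs to (size / slide). */
    static long windowsPerElement() {
        return WINDOW_SIZE_SECONDS / SLIDE_SECONDS;
    }

    /** countWindow(600000, 1000): assumed one buffer per key holding the last 600,000 elements. */
    static long countWindowStoredElements() {
        return KEYS * EVENTS_PER_KEY_PER_SECOND * WINDOW_SIZE_SECONDS;
    }

    /** Sliding time window: assumed one stored reference per element per assigned window. */
    static long slidingWindowStoredElements() {
        return countWindowStoredElements() * windowsPerElement();
    }

    /** Count window: one buffer append per incoming element. */
    static long countWindowInsertsPerSecond() {
        return KEYS * EVENTS_PER_KEY_PER_SECOND;
    }

    /** Sliding time window: one state append per incoming element per assigned window. */
    static long slidingWindowInsertsPerSecond() {
        return countWindowInsertsPerSecond() * windowsPerElement();
    }

    public static void main(String[] args) {
        System.out.println("count window stored elements:   " + countWindowStoredElements());
        System.out.println("sliding window stored elements: " + slidingWindowStoredElements());
        System.out.println("count window inserts/second:    " + countWindowInsertsPerSecond());
        System.out.println("sliding window inserts/second:  " + slidingWindowInsertsPerSecond());
    }
}
```

Under these assumptions the sliding time window holds about 600 times as many element references (3.6 billion vs. 6 million) and performs 6 million state insertions per second vs. 10,000, while both variants fire once per key per second over roughly 600,000 elements.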