[ https://issues.apache.org/jira/browse/FLINK-6990?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16060530#comment-16060530 ]
Fabian Hueske commented on FLINK-6990:
--------------------------------------

The poor performance has two causes:

1) The implementation of time-based sliding windows in Flink. Flink treats each window individually and replicates every record into each window it belongs to. For a window of 10 minutes that slides by 1 second, each record is replicated 600 times (10 minutes / 1 second).

2) Your choice of a WindowFunction instead of a ReduceFunction or AggregateFunction. A WindowFunction requires collecting all elements and applies the function at the end of the window. If you implement this with a ReduceFunction (or AggregateFunction), the aggregation is applied incrementally whenever a new record is assigned to a window. Consequently, the window holds only a single aggregated record instead of a list of all records.

Count-based sliding windows are implemented differently and avoid the replication of records. However, they cannot leverage the eager aggregation of a ReduceFunction; like a WindowFunction, they apply the function at the end of the window.

> Poor performance with Sliding Time Windows
> ------------------------------------------
>
>                 Key: FLINK-6990
>                 URL: https://issues.apache.org/jira/browse/FLINK-6990
>             Project: Flink
>          Issue Type: Improvement
>    Affects Versions: 1.3.0
>         Environment: OSX 10.11.4
> 2.8 GHz Intel Core i7
> 16 GB 1600 MHz DDR3
>            Reporter: Brice Bingman
>
> I'm experiencing poor performance when using sliding time windows.
> Here is a simple example that performs poorly for me:
> {code:java}
> import org.apache.flink.api.java.tuple.Tuple;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.streaming.api.functions.source.SourceFunction;
> import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
> import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
> import org.apache.flink.streaming.api.windowing.time.Time;
> import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
> import org.apache.flink.util.Collector;
>
> import java.util.concurrent.Executors;
> import java.util.concurrent.ScheduledExecutorService;
> import java.util.concurrent.TimeUnit;
>
> public class FlinkPerfTest {
>
>     public static void main(String[] args) throws Exception {
>         StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
>
>         // Streaming 10,000 events per second
>         see.addSource(new SourceFunction<TestObject>() {
>
>             transient ScheduledExecutorService executor;
>
>             @Override
>             public synchronized void run(final SourceContext<TestObject> ctx) throws Exception {
>                 executor = Executors.newSingleThreadScheduledExecutor();
>                 executor.scheduleAtFixedRate(new Runnable() {
>                     @Override
>                     public void run() {
>                         for (int k = 0; k < 10; k++) {
>                             for (int i = 0; i < 1000; i++) {
>                                 TestObject obj = new TestObject();
>                                 obj.setKey(k);
>                                 ctx.collect(obj);
>                             }
>                         }
>                     }
>                 }, 0, 1, TimeUnit.SECONDS);
>                 this.wait();
>             }
>
>             @Override
>             public synchronized void cancel() {
>                 executor.shutdown();
>                 this.notify();
>             }
>         }).keyBy("key")
>                 .window(SlidingProcessingTimeWindows.of(Time.minutes(10), Time.seconds(1)))
>                 .apply(new WindowFunction<TestObject, String, Tuple, TimeWindow>() {
>                     @Override
>                     public void apply(Tuple key, TimeWindow window, Iterable<TestObject> input,
>                             Collector<String> out) throws Exception {
>                         int count = 0;
>                         for (Object obj : input) {
>                             count++;
>                         }
>                         out.collect(key.getField(0) + ": " + count);
>                     }
>                 })
>                 .print();
>
>         see.execute();
>     }
>
>     public static class TestObject {
>         private Integer key;
>
>         public Integer getKey() {
>             return key;
>         }
>
>         public void setKey(Integer key) {
>             this.key = key;
>         }
>     }
> }
> {code}
>
> When running this, Flink periodically pauses for long periods of time. I would expect a steady stream of output at 1-second intervals.
>
> For comparison, you can switch to a count window of similar size, which performs just fine:
> {code:java}
> // Needs: import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
> .countWindow(600000, 1000)
>         .apply(new WindowFunction<FlinkPerfTest.TestObject, String, Tuple, GlobalWindow>() {
>             @Override
>             public void apply(Tuple key, GlobalWindow window, Iterable<TestObject> input,
>                     Collector<String> out) throws Exception {
>                 int count = 0;
>                 for (Object obj : input) {
>                     count++;
>                 }
>                 out.collect(key.getField(0) + ": " + count);
>             }
>         })
> {code}
>
> I would expect the sliding time window to perform similarly to a count window. The sliding time window also uses significantly more CPU and memory than the count window; I would expect resource consumption to be similar as well.
>
> A possible cause could be that the SystemProcessingTimeService.TriggerTask locks on the checkpointLock, which acts like a global lock. There should be a lock per key or, preferably, a lock-less solution.

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
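The following sketch makes the 600-fold replication of time-based sliding windows concrete. It is a simplified Java model of how a sliding time-window assigner can enumerate the windows that contain one timestamp. It finds the latest window start at or before the timestamp, then steps back by one slide until a window no longer covers it. This is an illustration under stated assumptions, not Flink's `SlidingProcessingTimeWindows` code. The class and method names (`SlidingWindowReplication`, `assignWindows`) are hypothetical, and a window offset of 0 is assumed.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of sliding time-window assignment
// (not Flink's implementation; assumes a window offset of 0).
public class SlidingWindowReplication {

    /** A half-open time window [start, end) in milliseconds. */
    static final class Window {
        final long start;
        final long end;

        Window(long start, long end) {
            this.start = start;
            this.end = end;
        }

        boolean contains(long timestamp) {
            return start <= timestamp && timestamp < end;
        }
    }

    /** Returns every sliding window of the given size and slide that contains the timestamp. */
    static List<Window> assignWindows(long timestamp, long size, long slide) {
        List<Window> windows = new ArrayList<>();
        // Latest window start that is <= timestamp.
        long lastStart = timestamp - Math.floorMod(timestamp, slide);
        // Step back one slide at a time while the window still covers the timestamp.
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            windows.add(new Window(start, start + size));
        }
        return windows;
    }

    public static void main(String[] args) {
        long size = 10 * 60 * 1000L; // 10 minutes
        long slide = 1000L;          // 1 second
        int copies = assignWindows(123_456L, size, slide).size();
        System.out.println("windows per record: " + copies);
        // The reporter's source emits 10,000 records per second.
        System.out.println("window insertions per second: " + 10_000L * copies);
    }
}
```

With the reporter's parameters, this prints 600 windows per record. At 10,000 input records per second, that is 6,000,000 window insertions per second.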
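The difference between a WindowFunction and a ReduceFunction (or AggregateFunction) comes down to what each window has to keep in state. The following simplified Java model contrasts the two approaches:

- **Buffering window:** stores every element and computes only when the window fires, like the reporter's counting WindowFunction.
- **Incremental window:** folds each element into a single aggregate as it arrives.

This is a hypothetical sketch, not Flink's window operator. The names `WindowStateSketch`, `BufferingWindow`, and `IncrementalWindow` are illustrative. In Flink itself, the incremental variant corresponds to calling `WindowedStream.reduce(...)` or `WindowedStream.aggregate(...)` instead of `apply(...)`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BinaryOperator;

// Hypothetical, simplified model of window state (not Flink's implementation).
public class WindowStateSketch {

    /** WindowFunction-style: keeps every element and only computes at firing time. */
    static final class BufferingWindow<T> {
        private final List<T> elements = new ArrayList<>();

        void add(T element) {
            elements.add(element);
        }

        int storedElements() {
            return elements.size();
        }

        /** Counts the buffered elements, like the reporter's WindowFunction. */
        long fireCount() {
            long count = 0;
            for (T ignored : elements) {
                count++;
            }
            return count;
        }
    }

    /** ReduceFunction-style: folds each element into one aggregate on arrival. */
    static final class IncrementalWindow<T> {
        private final BinaryOperator<T> reducer;
        private T aggregate; // null until the first element arrives

        IncrementalWindow(BinaryOperator<T> reducer) {
            this.reducer = reducer;
        }

        void add(T element) {
            aggregate = (aggregate == null) ? element : reducer.apply(aggregate, element);
        }

        int storedElements() {
            return aggregate == null ? 0 : 1;
        }

        T fire() {
            return aggregate;
        }
    }

    public static void main(String[] args) {
        // One key's 10-minute window at 1,000 records per second.
        int records = 600 * 1000;
        BufferingWindow<Long> buffered = new BufferingWindow<>();
        IncrementalWindow<Long> incremental = new IncrementalWindow<>(Long::sum);
        for (int i = 0; i < records; i++) {
            buffered.add(1L);    // the buffered window keeps every element
            incremental.add(1L); // each record contributes a count of 1 to the running sum
        }
        System.out.println("results: " + buffered.fireCount() + " vs " + incremental.fire());
        System.out.println("stored elements: " + buffered.storedElements() + " vs " + incremental.storedElements());
    }
}
```

Both windows report the same count (600000). However, the buffering window holds 600000 elements, while the incremental one holds a single value. Under time-based sliding windows, that per-window difference is multiplied by the 600 windows each record is assigned to.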
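The following sketch illustrates why a count-based sliding window such as `countWindow(600000, 1000)` avoids replication yet gets no eager aggregation. It models the window as one buffer per key:

- Each record is stored exactly once.
- When the window fires, the oldest records beyond the window size are evicted.
- The function then walks the whole buffer every `slide` records.

Flink builds `countWindow(size, slide)` from a `GlobalWindows` assigner with a `CountEvictor` and a `CountTrigger`. The class below is only a hypothetical, simplified model of that behavior, not Flink's evicting window operator. The name `CountSlidingWindowSketch` is illustrative.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical, simplified model of a count-based sliding window for one key
// (not Flink's implementation).
public class CountSlidingWindowSketch<T> {

    private final int size;  // number of most recent elements the window covers
    private final int slide; // fire after this many new elements
    private final Deque<T> buffer = new ArrayDeque<>();
    private int sinceLastFire = 0;

    CountSlidingWindowSketch(int size, int slide) {
        this.size = size;
        this.slide = slide;
    }

    /**
     * Adds one element. Returns the element count seen by the window function
     * when the window fires, or -1 when it does not fire.
     */
    long add(T element) {
        buffer.addLast(element); // each element is stored exactly once
        sinceLastFire++;
        if (sinceLastFire < slide) {
            return -1;
        }
        sinceLastFire = 0;
        // Evict the oldest elements so that at most `size` remain.
        while (buffer.size() > size) {
            buffer.removeFirst();
        }
        // No eager aggregation: the function walks the whole buffer on every firing.
        long count = 0;
        for (T ignored : buffer) {
            count++;
        }
        return count;
    }

    int storedElements() {
        return buffer.size();
    }

    public static void main(String[] args) {
        CountSlidingWindowSketch<Integer> window = new CountSlidingWindowSketch<>(600_000, 1_000);
        long lastResult = -1;
        for (int i = 0; i < 700_000; i++) {
            long result = window.add(i);
            if (result >= 0) {
                lastResult = result;
            }
        }
        System.out.println("last result: " + lastResult + ", stored: " + window.storedElements());
    }
}
```

Memory here is bounded by one copy of the most recent records (plus at most one slide's worth between firings). The per-firing cost, however, is still a full pass over the buffer, matching the behavior described for count-based windows.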