leonardBang commented on a change in pull request #9525: [FLINK-13363][docs] Add documentation for streaming aggregate performance tuning
URL: https://github.com/apache/flink/pull/9525#discussion_r318866670
##########
File path: docs/dev/table/tuning/streaming_aggregation_optimization.md
##########
@@ -0,0 +1,271 @@
+---
+title: "Streaming Aggregation"
+nav-parent_id: tableapi_performance_tuning
+nav-pos: 10
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+SQL is the most widely used language for data analytics. Flink's Table API and SQL enables users to define efficient stream analytics applications in less time and effort. Moreover, Flink Table API and SQL is effectively optimized, it integrates a lot of query optimizations and tuned operator implementations. But not all of the optimizations are enabled by default, so for some workloads, it is possible to improve performance by turning on some options.
+
+In this page, we will introduce some useful optimization options and the internals of streaming aggregation which will bring great improvement in some cases.
+
+<span class="label label-danger">Attention</span> Currently, the optimization options mentioned in this page are only supported for the Blink planner.
+
+<span class="label label-danger">Attention</span> Currently, the streaming aggregation optimization are only supported for [unbounded-aggregations]({{ site.baseurl }}/dev/table/sql.html#aggregations). Optimizations for [window aggregations]({{ site.baseurl }}/dev/table/sql.html#group-windows) will be supported in the future.
+
+* This will be replaced by the TOC
+{:toc}
+
+By default, the unbounded aggregation operator processes input records one by one, i.e., (1) read accumulator from state, (2) accumulate/retract record to accumulator, (3) write accumulator back to state, (4) the next record will do the process again from (1). This processing pattern may increase the overhead of StateBackend (especially for RocksDB StateBackend).
+Besides, data skew which is very common in production will worsen the problem and make it easy for the jobs to be under backpressure situations.
+
+## MiniBatch Aggregation
+
+The core idea of mini-batch aggregation is caching a bundle of inputs in a buffer inside of the aggregation operator. When the bundle of inputs is triggered to process, only one operation per key to access state is needed. This can significantly reduce the state overhead and get a better throughput. However, this may increase some latency because it buffers some records instead of processing them in an instant. This is a trade-off between throughput and latency.
+
+The following figure explains how the mini-batch aggregation reduces state operations.
+
+<div style="text-align: center">
+ <img src="{{ site.baseurl }}/fig/table-streaming/minibatch_agg.png" width="50%" height="50%" />
+</div>
+
+MiniBatch optimization is disabled by default. In order to enable this optimization, you should set options `table.exec.mini-batch.enabled`, `table.exec.mini-batch.allow-latency` and `table.exec.mini-batch.size`. Please see [configuration]({{ site.baseurl }}/dev/table/config.html#execution-options) page for more details.
+
+The following examples show how to enable the option.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+// instantiate table environment
+TableEnvironment tEnv = ...
+
+tEnv.getConfig() // access high-level configuration
+  .getConfiguration() // set low-level key-value options
+  .setString("table.exec.mini-batch.enabled", "true") // enable mini-batch optimization
+  .setString("table.exec.mini-batch.allow-latency", "5 s") // use 5 seconds to buffer input records
+  .setString("table.exec.mini-batch.size", "5000"); // the maximum number of records can be buffered by each aggregate operator tasks

Review comment:
"each aggregate operator **task**" or "each **of the** aggregate operator tasks" would read better.
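
For context on the MiniBatch section quoted above, here is a minimal plain-Java sketch of the bundling idea. It is not Flink code and does not reflect how the Blink planner implements the operator: `MiniBatchSketch`, `CountingState`, `perRecord`, and `miniBatch` are hypothetical names, the aggregate is assumed to be a simple per-key COUNT, and bundles are flushed by size only (no latency timer). It only illustrates why buffering reduces state accesses to one read and one write per distinct key per bundle.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class MiniBatchSketch {

    /** Hypothetical stand-in for keyed state that counts every read and write. */
    static final class CountingState {
        private final Map<String, Long> store = new HashMap<>();
        int accesses = 0;

        long read(String key) {
            accesses++;
            return store.getOrDefault(key, 0L);
        }

        void write(String key, long value) {
            accesses++;
            store.put(key, value);
        }
    }

    /** Record-at-a-time COUNT: one state read and one state write per input record. */
    static CountingState perRecord(List<String> keys) {
        CountingState state = new CountingState();
        for (String key : keys) {
            state.write(key, state.read(key) + 1);
        }
        return state;
    }

    /** Mini-batch COUNT: pre-aggregate in a buffer, then touch state once per distinct key. */
    static CountingState miniBatch(List<String> keys, int bundleSize) {
        CountingState state = new CountingState();
        Map<String, Long> buffer = new HashMap<>();
        int buffered = 0;
        for (String key : keys) {
            buffer.merge(key, 1L, Long::sum); // accumulate in memory only
            if (++buffered >= bundleSize) {
                flush(buffer, state);
                buffered = 0;
            }
        }
        flush(buffer, state); // flush the final, possibly partial, bundle
        return state;
    }

    /** Applies the buffered partial counts to state and empties the buffer. */
    private static void flush(Map<String, Long> buffer, CountingState state) {
        for (Map.Entry<String, Long> e : buffer.entrySet()) {
            state.write(e.getKey(), state.read(e.getKey()) + e.getValue());
        }
        buffer.clear();
    }

    public static void main(String[] args) {
        List<String> keys = Arrays.asList("a", "b", "a", "a", "b", "c");
        System.out.println("per-record accesses: " + perRecord(keys).accesses);
        System.out.println("mini-batch accesses: " + miniBatch(keys, 6).accesses);
    }
}
```

With the six sample records and a bundle size of 6, the per-record path performs 12 state accesses while the mini-batch path performs 6 (three distinct keys, one read and one write each). The cost is that no result for a key is updated until its bundle is flushed, which is the throughput versus latency trade-off the quoted doc describes.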