leonardBang commented on a change in pull request #9525: [FLINK-13363][docs] 
Add documentation for streaming aggregate performance tuning
URL: https://github.com/apache/flink/pull/9525#discussion_r318880409
 
 

 ##########
 File path: docs/dev/table/tuning/streaming_aggregation_optimization.md
 ##########
 @@ -0,0 +1,271 @@
+---
+title: "Streaming Aggregation"
+nav-parent_id: tableapi_performance_tuning
+nav-pos: 10
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+SQL is the most widely used language for data analytics. Flink's Table API and 
SQL enables users to define efficient stream analytics applications in less 
time and effort. Moreover, Flink Table API and SQL is effectively optimized, it 
integrates a lot of query optimizations and tuned operator implementations. But 
not all of the optimizations are enabled by default, so for some workloads, it 
is possible to improve performance by turning on some options.
+
+In this page, we will introduce some useful optimization options and the 
internals of streaming aggregation which will bring great improvement in some 
cases.
+
+<span class="label label-danger">Attention</span> Currently, the optimization 
options mentioned in this page are only supported for the Blink planner.
+
+<span class="label label-danger">Attention</span> Currently, the streaming 
aggregation optimization are only supported for [unbounded-aggregations]({{ 
site.baseurl }}/dev/table/sql.html#aggregations). Optimizations for [window 
aggregations]({{ site.baseurl }}/dev/table/sql.html#group-windows) will be 
supported in the future.
+
+* This will be replaced by the TOC
+{:toc}
+
+By default, the unbounded aggregation operator processes input records one by 
one, i.e., (1) read accumulator from state, (2) accumulate/retract record to 
accumulator, (3) write accumulator back to state, (4) the next record will do 
the process again from (1). This processing pattern may increase the overhead 
of StateBackend (especially for RocksDB StateBackend).
+Besides, data skew which is very common in production will worsen the problem 
and make it easy for the jobs to be under backpressure situations.
+
+## MiniBatch Aggregation
+
+The core idea of mini-batch aggregation is caching a bundle of inputs in a 
buffer inside of the aggregation operator. When the bundle of inputs is 
triggered to process, only one operation per key to access state is needed. 
This can significantly reduce the state overhead and get a better throughput. 
However, this may increase some latency because it buffers some records instead 
of processing them in an instant. This is a trade-off between throughput and 
latency.
+
+The following figure explains how the mini-batch aggregation reduces state 
operations.
+
+<div style="text-align: center">
+  <img src="{{ site.baseurl }}/fig/table-streaming/minibatch_agg.png" 
width="50%" height="50%" />
+</div>
+
+MiniBatch optimization is disabled by default. In order to enable this 
optimization, you should set options `table.exec.mini-batch.enabled`, 
`table.exec.mini-batch.allow-latency` and `table.exec.mini-batch.size`. Please 
see [configuration]({{ site.baseurl }}/dev/table/config.html#execution-options) 
page for more details.
+
+The following examples show how to enable the option.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+// instantiate table environment
+TableEnvironment tEnv = ...
+
+tEnv.getConfig()        // access high-level configuration
+  .getConfiguration()   // set low-level key-value options
+  .setString("table.exec.mini-batch.enabled", "true")  // enable mini-batch 
optimization
+  .setString("table.exec.mini-batch.allow-latency", "5 s") // use 5 seconds to 
buffer input records
+  .setString("table.exec.mini-batch.size", "5000"); // the maximum number of 
records can be buffered by each aggregate operator tasks
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+// instantiate table environment
+val tEnv: TableEnvironment = ...
+
+tEnv.getConfig         // access high-level configuration
+  .getConfiguration    // set low-level key-value options
+  .setString("table.exec.mini-batch.enabled", "true") // enable mini-batch 
optimization
+  .setString("table.exec.mini-batch.allow-latency", "5 s") // use 5 seconds to 
buffer input records
+  .setString("table.exec.mini-batch.size", "5000") // the maximum number of 
records can be buffered by each aggregate operator tasks
+{% endhighlight %}
+</div>
+
+<div data-lang="python" markdown="1">
+{% highlight python %}
+# instantiate table environment
+t_env = ...
+
+t_env.get_config()        # access high-level configuration
+  .get_configuration()    # set low-level key-value options
+  .set_string("table.exec.mini-batch.enabled", "true") # enable mini-batch 
optimization
+  .set_string("table.exec.mini-batch.allow-latency", "5 s") # use 5 seconds to 
buffer input records
+  .set_string("table.exec.mini-batch.size", "5000"); # the maximum number of 
records can be buffered by each aggregate operator tasks
+{% endhighlight %}
+</div>
+</div>
+
+## Local-Global Aggregation
+
+Local-Global is proposed to solve data skew problem by dividing an group 
aggregation into two stages, that is doing local aggregation in upstream 
firstly, and followed by global aggregation in downstream, which is similar to 
Combine + Reduce pattern in MapReduce. For example, considering the following 
SQL:
+
+{% highlight sql %}
+SELECT color, sum(id)
+FROM T
+GROUP BY color
+{% endhighlight %}
+
+It is possible that the records in the data stream are skewed, thus some 
instances of aggregation operator have to process much more records than 
others, which leads to hotspot.
+The local aggregation can help to accumulate a certain amount of inputs which 
have the same key into a single accumulator. The global aggregate will only 
receive the reduced accumulators instead of large number of raw inputs.
 
 Review comment:
   "The global **aggregate** will" -> "The global **aggregation** will".
   keep same as mentioned above.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to