leonardBang commented on a change in pull request #9525: [FLINK-13363][docs] 
Add documentation for streaming aggregate performance tuning
URL: https://github.com/apache/flink/pull/9525#discussion_r318866670
 
 

 ##########
 File path: docs/dev/table/tuning/streaming_aggregation_optimization.md
 ##########
 @@ -0,0 +1,271 @@
+---
+title: "Streaming Aggregation"
+nav-parent_id: tableapi_performance_tuning
+nav-pos: 10
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+SQL is the most widely used language for data analytics. Flink's Table API and 
SQL enable users to define efficient stream analytics applications with less 
time and effort. Moreover, the Table API and SQL are effectively optimized: 
they integrate many query optimizations and tuned operator implementations. 
However, not all of the optimizations are enabled by default, so for some 
workloads it is possible to improve performance by turning on some options.
+
+This page introduces some useful optimization options and the internals of 
streaming aggregation, which can bring significant improvement in some 
cases.
+
+<span class="label label-danger">Attention</span> Currently, the optimization 
options mentioned in this page are only supported for the Blink planner.
+
+<span class="label label-danger">Attention</span> Currently, the streaming 
aggregation optimizations are only supported for [unbounded aggregations]({{ 
site.baseurl }}/dev/table/sql.html#aggregations). Optimizations for [window 
aggregations]({{ site.baseurl }}/dev/table/sql.html#group-windows) will be 
supported in the future.
+
+* This will be replaced by the TOC
+{:toc}
+
+By default, the unbounded aggregation operator processes input records one by 
one, i.e., (1) read the accumulator from state, (2) accumulate/retract the 
record to the accumulator, (3) write the accumulator back to state, (4) repeat 
the process from (1) for the next record. This processing pattern may increase 
the overhead of the StateBackend (especially for the RocksDB StateBackend).
+Besides, data skew, which is very common in production, will worsen the 
problem and make it easy for jobs to come under backpressure.
+
+## MiniBatch Aggregation
+
+The core idea of mini-batch aggregation is caching a bundle of inputs in a 
buffer inside the aggregation operator. When processing of the bundle is 
triggered, only one state access per key is needed. 
This can significantly reduce the state overhead and yield better throughput. 
However, it may add some latency because records are buffered instead 
of being processed immediately. This is a trade-off between throughput and 
latency.
+
+The following figure explains how the mini-batch aggregation reduces state 
operations.
+
+<div style="text-align: center">
+  <img src="{{ site.baseurl }}/fig/table-streaming/minibatch_agg.png" 
width="50%" height="50%" />
+</div>
+
+MiniBatch optimization is disabled by default. To enable this 
optimization, set the options `table.exec.mini-batch.enabled`, 
`table.exec.mini-batch.allow-latency` and `table.exec.mini-batch.size`. Please 
see the [configuration]({{ site.baseurl }}/dev/table/config.html#execution-options) 
page for more details.
+
+The following examples show how to enable these options.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+// instantiate table environment
+TableEnvironment tEnv = ...
+
+tEnv.getConfig()        // access high-level configuration
+  .getConfiguration()   // set low-level key-value options
+  .setString("table.exec.mini-batch.enabled", "true")  // enable mini-batch 
optimization
+  .setString("table.exec.mini-batch.allow-latency", "5 s") // use 5 seconds to 
buffer input records
+  .setString("table.exec.mini-batch.size", "5000"); // the maximum number of 
records can be buffered by each aggregate operator tasks
 
 Review comment:
   "each aggregate operator **task**" or "each **of** aggregate operator tasks" 
 would be better
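
As a side note on the mini-batch paragraph in this diff, the claim that only one state access per key is needed for a bundle can be illustrated with a small standalone sketch. This is not Flink's operator code: `MiniBatchSketch` and `CountingState` are hypothetical names, a `HashMap` stands in for the state backend, and a single bundle is processed rather than one triggered by latency or size.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class MiniBatchSketch {

    /** Hypothetical stand-in for keyed state; counts reads and writes. */
    static final class CountingState {
        private final Map<String, Long> store = new HashMap<>();
        int reads = 0;
        int writes = 0;

        long read(String key) {
            reads++;
            return store.getOrDefault(key, 0L);
        }

        void write(String key, long value) {
            writes++;
            store.put(key, value);
        }
    }

    /** Per-record COUNT: one state read and one state write for every input record. */
    static CountingState perRecord(List<String> keys) {
        CountingState state = new CountingState();
        for (String key : keys) {
            long accumulator = state.read(key);
            state.write(key, accumulator + 1);
        }
        return state;
    }

    /** Mini-batch COUNT: pre-aggregate the bundle in memory, then touch state once per distinct key. */
    static CountingState miniBatch(List<String> keys) {
        CountingState state = new CountingState();
        Map<String, Long> buffer = new HashMap<>();
        for (String key : keys) {
            buffer.merge(key, 1L, Long::sum);
        }
        for (Map.Entry<String, Long> entry : buffer.entrySet()) {
            long accumulator = state.read(entry.getKey());
            state.write(entry.getKey(), accumulator + entry.getValue());
        }
        return state;
    }

    public static void main(String[] args) {
        List<String> keys = Arrays.asList("a", "b", "a", "a", "b", "c");
        CountingState one = perRecord(keys);
        CountingState batch = miniBatch(keys);
        System.out.println("per-record reads/writes: " + one.reads + "/" + one.writes);
        System.out.println("mini-batch reads/writes: " + batch.reads + "/" + batch.writes);
    }
}
```

With six records over three distinct keys, the per-record path touches state once per record, while the buffered path touches it once per distinct key. The gap grows with the number of records per key in a bundle, which is also why skewed keys benefit most.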

