Hi community,

I was implementing a streaming aggregation with the Table API [1] and
trying out the local-global aggregation plan to optimize the query.
I configured it like this:

Configuration configuration = tableEnv.getConfig().getConfiguration();
// set low-level key-value options
configuration.setInteger("table.exec.resource.default-parallelism", 4);
// local-global aggregation requires mini-batch to be enabled
configuration.setString("table.exec.mini-batch.enabled", "true");
configuration.setString("table.exec.mini-batch.allow-latency", "1 s");
configuration.setString("table.exec.mini-batch.size", "1000");
// enable two-phase, i.e. local-global aggregation
configuration.setString("table.optimizer.agg-phase-strategy", "TWO_PHASE");

When I looked at the query plan on the dashboard, I noticed that the
LocalGroupAggregate runs with parallelism 1 while the
GlobalGroupAggregate runs with parallelism 4. Why doesn't the
LocalGroupAggregate also run with parallelism 4, since I set it with the
property "table.exec.resource.default-parallelism"? Here is my code
[2].
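For context, this is roughly what the TWO_PHASE strategy is meant to do:
each local task pre-aggregates one mini-batch into partial results, and
the global task merges those partials per key after the shuffle. Below
is a conceptual sketch in plain Java (no Flink dependencies), assuming a
simple count per key. It is not Flink's actual LocalGroupAggregate or
GlobalGroupAggregate code; the class name, method names and sample keys
are hypothetical.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration of local-global (two-phase) counting, not Flink code.
public class LocalGlobalCountSketch {

    // Phase 1 (local): pre-aggregate one mini-batch of keys on a single upstream task.
    static Map<String, Long> localAggregate(List<String> miniBatch) {
        Map<String, Long> partial = new HashMap<>();
        for (String key : miniBatch) {
            partial.merge(key, 1L, Long::sum);
        }
        return partial;
    }

    // Phase 2 (global): merge the partial counts coming from all local tasks.
    static Map<String, Long> globalAggregate(List<Map<String, Long>> partials) {
        Map<String, Long> result = new TreeMap<>(); // sorted only for readable output
        for (Map<String, Long> partial : partials) {
            partial.forEach((key, count) -> result.merge(key, count, Long::sum));
        }
        return result;
    }

    public static void main(String[] args) {
        // Hypothetical mini-batches seen by two local tasks.
        List<String> batchA = List.of("nyc", "nyc", "sf", "nyc");
        List<String> batchB = List.of("sf", "sf", "nyc");
        Map<String, Long> counts =
                globalAggregate(List.of(localAggregate(batchA), localAggregate(batchB)));
        System.out.println(counts); // prints {nyc=4, sf=3}
    }
}
```

The point of the local phase is that the global phase receives one
partial count per key per mini-batch instead of one record per event,
which reduces shuffle volume and state access for hot keys.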

Thanks,
Felipe

[1] 
https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/tuning/streaming_aggregation_optimization.html
[2] 
https://github.com/felipegutierrez/explore-flink/blob/master/docker/ops-playground-image/java/explore-flink/src/main/java/org/sense/flink/examples/table/TaxiRideCountTable.java

--
-- Felipe Gutierrez
-- skype: felipe.o.gutierrez
-- https://felipeogutierrez.blogspot.com
