Hi community,
I am implementing a streaming aggregation with the Table API [1] and
trying out the local-global aggregation plan to optimize the query.
I configured it like this:
Configuration configuration = tableEnv.getConfig().getConfiguration();
// set low-level key-value options
configuration.setInteger("table.exec.resource.default-parallelism", 4);
// local-global aggregation requires mini-batch to be enabled
configuration.setString("table.exec.mini-batch.enabled", "true");
configuration.setString("table.exec.mini-batch.allow-latency", "1 s");
configuration.setString("table.exec.mini-batch.size", "1000");
// enable two-phase, i.e. local-global aggregation
configuration.setString("table.optimizer.agg-phase-strategy", "TWO_PHASE");
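For context, the idea behind the TWO_PHASE strategy can be sketched without Flink. Below is a minimal, stdlib-only Java simulation under my own assumptions. The class LocalGlobalCountSketch and its methods are hypothetical and are not part of the Flink API. Each local instance pre-aggregates its mini-batch into partial counts per key. The global phase then merges those partials, so only one partial record per key and instance has to be shuffled.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Conceptual simulation of local-global (two-phase) aggregation.
// NOT Flink code: class and method names are hypothetical, for illustration only.
public class LocalGlobalCountSketch {

    // Local phase: one parallel instance pre-aggregates its own mini-batch
    // into partial counts, emitting a single partial record per key.
    static Map<String, Long> localAggregate(List<String> miniBatch) {
        Map<String, Long> partial = new HashMap<>();
        for (String key : miniBatch) {
            partial.merge(key, 1L, Long::sum);
        }
        return partial;
    }

    // Global phase: after the (simulated) shuffle by key, partial counts are merged.
    // A single map stands in for all global instances in this sketch.
    static Map<String, Long> globalAggregate(List<Map<String, Long>> partials) {
        Map<String, Long> result = new HashMap<>();
        for (Map<String, Long> partial : partials) {
            partial.forEach((key, count) -> result.merge(key, count, Long::sum));
        }
        return result;
    }

    // Runs both phases, with one mini-batch per (hypothetical) local instance.
    static Map<String, Long> twoPhaseCount(List<List<String>> batchesPerInstance) {
        List<Map<String, Long>> partials = new ArrayList<>();
        for (List<String> batch : batchesPerInstance) {
            partials.add(localAggregate(batch));
        }
        return globalAggregate(partials);
    }

    public static void main(String[] args) {
        // Two local instances, each holding a mini-batch of made-up grouping keys.
        List<List<String>> batches = List.of(
                List.of("a", "a", "b"),
                List.of("a", "c", "c", "c"));
        System.out.println(twoPhaseCount(batches));
    }
}
```

The final counts are the same as a single-phase count. The benefit is that the global phase receives fewer records, which helps when some keys are hot.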
When I looked at the query plan on the dashboard, I noticed that the
LocalGroupAggregate runs with parallelism 1 while the
GlobalGroupAggregate runs with parallelism 4. Why doesn't the
LocalGroupAggregate also run with parallelism 4, since I set it via the
property "table.exec.resource.default-parallelism"? Here is my code
[2].
Thanks,
Felipe
[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/tuning/streaming_aggregation_optimization.html
[2]
https://github.com/felipegutierrez/explore-flink/blob/master/docker/ops-playground-image/java/explore-flink/src/main/java/org/sense/flink/examples/table/TaxiRideCountTable.java
--
-- Felipe Gutierrez
-- skype: felipe.o.gutierrez
-- https://felipeogutierrez.blogspot.com