Hi community, I was implementing a streaming aggregation with the Table API [1] and trying out the local-global aggregation plan to optimize the query. Basically, I had to configure it like this:
    import org.apache.flink.configuration.Configuration;

    Configuration configuration = tableEnv.getConfig().getConfiguration();
    // set low-level key-value options
    configuration.setInteger("table.exec.resource.default-parallelism", 4);
    // local-global aggregation requires mini-batch to be enabled
    configuration.setString("table.exec.mini-batch.enabled", "true");
    configuration.setString("table.exec.mini-batch.allow-latency", "1 s");
    configuration.setString("table.exec.mini-batch.size", "1000");
    // enable two-phase, i.e. local-global, aggregation
    configuration.setString("table.optimizer.agg-phase-strategy", "TWO_PHASE");

When I looked at the query plan on the dashboard, I noticed that the LocalGroupAggregate runs with parallelism 1 while the GlobalGroupAggregate runs with parallelism 4. Why doesn't the LocalGroupAggregate also run with parallelism 4, given that I set it through the property "table.exec.resource.default-parallelism"? Here is my code [2].

Thanks,
Felipe

[1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/tuning/streaming_aggregation_optimization.html
[2] https://github.com/felipegutierrez/explore-flink/blob/master/docker/ops-playground-image/java/explore-flink/src/main/java/org/sense/flink/examples/table/TaxiRideCountTable.java

--
Felipe Gutierrez
skype: felipe.o.gutierrez
https://felipeogutierrez.blogspot.com
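For context on what the TWO_PHASE strategy is meant to achieve, here is a minimal plain-Java sketch of the local-global idea. It is not Flink code and not how Flink implements the operators. Each of `parallelism` local instances pre-aggregates counts over mini-batches of its own input split, and only the small partial maps are sent to the global phase, which merges them by key. The class name `TwoPhaseAggSketch`, the `driver-A`/`driver-B` keys, the round-robin input split, and the size-only flush rule are all hypothetical. In particular, Flink also flushes on `table.exec.mini-batch.allow-latency`, which this sketch ignores.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Conceptual sketch of local-global (two-phase) aggregation in plain Java.
// NOT Flink code: class/method names, the round-robin split and the size-only flush are hypothetical.
public class TwoPhaseAggSketch {

    // Local phase: instance s gets input indices s, s + parallelism, ... and pre-aggregates
    // counts per key, flushing one partial map every `miniBatchSize` records.
    static List<Map<String, Long>> localPhase(List<String> input, int parallelism, int miniBatchSize) {
        List<Map<String, Long>> partials = new ArrayList<>();
        for (int instance = 0; instance < parallelism; instance++) {
            Map<String, Long> buffer = new HashMap<>();
            int buffered = 0;
            for (int i = instance; i < input.size(); i += parallelism) {
                buffer.merge(input.get(i), 1L, Long::sum);
                if (++buffered == miniBatchSize) {
                    partials.add(buffer); // emit one partial count per distinct key in this batch
                    buffer = new HashMap<>();
                    buffered = 0;
                }
            }
            if (!buffer.isEmpty()) {
                partials.add(buffer); // flush the last, partially filled batch
            }
        }
        return partials;
    }

    // Global phase: after the partials are shuffled by key, sum them into final counts.
    static Map<String, Long> globalAggregate(List<Map<String, Long>> partials) {
        Map<String, Long> result = new HashMap<>();
        for (Map<String, Long> partial : partials) {
            partial.forEach((key, count) -> result.merge(key, count, Long::sum));
        }
        return result;
    }

    public static void main(String[] args) {
        // Hypothetical input: one third "driver-A", two thirds "driver-B".
        List<String> rides = new ArrayList<>();
        for (int i = 0; i < 10_000; i++) {
            rides.add(i % 3 == 0 ? "driver-A" : "driver-B");
        }
        List<Map<String, Long>> partials = localPhase(rides, 4, 1000);
        // Records sent to the global phase = total entries across all partial maps.
        int shuffled = partials.stream().mapToInt(Map::size).sum();
        Map<String, Long> counts = globalAggregate(partials);
        System.out.println("records shuffled to global phase: " + shuffled);
        System.out.println("driver-A=" + counts.get("driver-A") + ", driver-B=" + counts.get("driver-B"));
    }
}
```

With 4 local instances and a mini-batch size of 1000, each instance emits 3 partial maps of 2 keys, so the sketch prints 24 shuffled records instead of 10,000, while the final counts (driver-A=3334, driver-B=6666) match a single-phase count.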