Hi community, I am testing the "Split Distinct Aggregation" [1] optimization while consuming the taxi ride data set. My SQL query from the table environment is the one below:

    Table tableCountDistinct = tableEnv.sqlQuery("SELECT startDate, COUNT(driverId) FROM TaxiRide GROUP BY startDate");

and I am enabling:

    configuration.setString("table.exec.mini-batch.enabled", "true");
    configuration.setString("table.exec.mini-batch.allow-latency", "3 s");
    configuration.setString("table.exec.mini-batch.size", "5000");
    configuration.setString("table.optimizer.agg-phase-strategy", "TWO_PHASE");

and finally:

    configuration.setString("table.optimizer.distinct-agg.split.enabled", "true");

I was expecting the query plan in the web UI to show two hash phases, as in the image at [1]. Instead, it shows the same plan with a single hash phase, as if I were deploying only one local aggregate and one global aggregate (taking the parallel instances into consideration, of course). Please see the attached image of the query execution plan. Is there something I am missing when I configure the Table API?

By the way, I am a bit confused about the "MiniBatch Aggregation" [2]. Does the "MiniBatch Aggregation" aggregate like a processing-time window on the operator after the hash phase? If so, isn't that the same as a window aggregation, rather than the unbounded window that the example presents?

Thanks!
Felipe

[1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/tuning/streaming_aggregation_optimization.html#split-distinct-aggregation
[2] https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/tuning/streaming_aggregation_optimization.html#minibatch-aggregation

--
-- Felipe Gutierrez
-- skype: felipe.o.gutierrez
-- https://felipeogutierrez.blogspot.com
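
PS: For context on the "two hash phases" mentioned above, here is a minimal plain-Java sketch of the two-level rewrite that [1] describes for a distinct aggregate such as COUNT(DISTINCT ...). It does not use Flink at all and is not the planner's implementation. The class SplitDistinctSketch, the Ride type, and the sample data are hypothetical, and the bucket count of 1024 simply mirrors the example in [1]. The first level counts distinct driverId values per (startDate, bucket), where the bucket is derived from a hash of driverId; the second level sums those partial counts per startDate.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

/** Plain-Java illustration (no Flink dependency) of a two-level distinct count. */
final class SplitDistinctSketch {

    /** Hypothetical stand-in for a TaxiRide row; only the two fields the query uses. */
    static final class Ride {
        final String startDate;
        final long driverId;

        Ride(String startDate, long driverId) {
            this.startDate = startDate;
            this.driverId = driverId;
        }
    }

    // Assumption for this sketch: 1024 buckets, as in the example shown in [1].
    static final int BUCKETS = 1024;

    /** Single level: one set of driver ids per startDate. */
    static Map<String, Long> singleLevel(List<Ride> rides) {
        Map<String, Set<Long>> seen = new TreeMap<>();
        for (Ride r : rides) {
            seen.computeIfAbsent(r.startDate, k -> new HashSet<>()).add(r.driverId);
        }
        Map<String, Long> out = new TreeMap<>();
        seen.forEach((day, ids) -> out.put(day, (long) ids.size()));
        return out;
    }

    /** Two levels: distinct count per (startDate, bucket), then a sum per startDate. */
    static Map<String, Long> twoLevel(List<Ride> rides) {
        // First level: the key is (startDate, hash(driverId) mod BUCKETS).
        Map<String, Map<Integer, Set<Long>>> inner = new TreeMap<>();
        for (Ride r : rides) {
            int bucket = Math.floorMod(Long.hashCode(r.driverId), BUCKETS);
            inner.computeIfAbsent(r.startDate, k -> new TreeMap<>())
                 .computeIfAbsent(bucket, k -> new HashSet<>())
                 .add(r.driverId);
        }
        // Second level: a given driverId always lands in the same bucket,
        // so the per-bucket distinct counts can simply be added up.
        Map<String, Long> out = new TreeMap<>();
        inner.forEach((day, buckets) -> {
            long sum = 0;
            for (Set<Long> ids : buckets.values()) {
                sum += ids.size();
            }
            out.put(day, sum);
        });
        return out;
    }

    public static void main(String[] args) {
        List<Ride> rides = Arrays.asList(
                new Ride("2020-01-01", 1L),
                new Ride("2020-01-01", 2L),
                new Ride("2020-01-01", 1L),
                new Ride("2020-01-01", 1025L),
                new Ride("2020-01-02", 7L),
                new Ride("2020-01-02", 7L));
        System.out.println("single level: " + singleLevel(rides));
        System.out.println("two level:    " + twoLevel(rides));
    }
}
```

Both methods return the same counts per startDate; the only difference is that the two-level version spreads the work for one startDate across many (startDate, bucket) keys before combining the partial results.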