I see. Now it has different query plans. It was documented on another page, so I got confused. Thanks!
--
-- Felipe Gutierrez
-- skype: felipe.o.gutierrez
-- https://felipeogutierrez.blogspot.com
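A toy model of why the two plans differ, written in plain Java rather than against the Flink API. It assumes that each input partition is flushed as a single mini-batch, and the names (`AggPhaseSketch`, `shuffledOnePhase`, `shuffledTwoPhase`) are made up for illustration. In the one-phase model every record crosses the HASH exchange; in the two-phase model each partition pre-counts locally and ships one accumulator per key.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy comparison of one-phase and two-phase counting; not Flink code. */
public class AggPhaseSketch {

    /** One-phase model: every input record is shuffled to the aggregate. */
    public static int shuffledOnePhase(List<List<String>> partitions) {
        int shuffled = 0;
        for (List<String> partition : partitions) {
            shuffled += partition.size();
        }
        return shuffled;
    }

    /**
     * Two-phase model: each partition counts locally (assumed to be one
     * mini-batch), then ships one accumulator per key to the global merge.
     */
    public static int shuffledTwoPhase(List<List<String>> partitions,
                                       Map<String, Long> result) {
        int shuffled = 0;
        for (List<String> partition : partitions) {
            Map<String, Long> local = new HashMap<>();
            for (String key : partition) {
                local.merge(key, 1L, Long::sum); // local pre-aggregation
            }
            for (Map.Entry<String, Long> acc : local.entrySet()) {
                result.merge(acc.getKey(), acc.getValue(), Long::sum); // global merge
                shuffled++;
            }
        }
        return shuffled;
    }

    public static void main(String[] args) {
        List<List<String>> partitions = Arrays.asList(
                Arrays.asList("taxi-1", "taxi-1", "taxi-2", "taxi-1"),
                Arrays.asList("taxi-2", "taxi-2", "taxi-1", "taxi-2"));
        Map<String, Long> counts = new HashMap<>();
        System.out.println("one-phase shuffled: " + shuffledOnePhase(partitions));
        System.out.println("two-phase shuffled: " + shuffledTwoPhase(partitions, counts));
        System.out.println("counts: " + counts);
    }
}
```

For this input the one-phase model shuffles 8 records and the two-phase model shuffles 4 accumulators, while both arrive at the same counts per key. The saving grows with the number of records per key in a batch.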
On Thu, Nov 12, 2020 at 12:41 PM Jark Wu <imj...@gmail.com> wrote:
>
> Hi Felipe,
>
> The default value of `table.optimizer.agg-phase-strategy` is AUTO: if
> mini-batch is enabled, it will use TWO_PHASE; otherwise, ONE_PHASE.
>
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/config.html#table-optimizer-agg-phase-strategy
>
> On Thu, 12 Nov 2020 at 17:52, Felipe Gutierrez <felipe.o.gutier...@gmail.com>
> wrote:
>>
>> Hi Jark,
>>
>> I don't see the difference between "MiniBatch Aggregation" and
>> "Local-Global Aggregation". The web page [1] says that I have to
>> enable the TWO_PHASE parameter. So I compared the query plans with
>> and without the TWO_PHASE parameter, and they are the same. So I
>> conclude that mini-batch is already a TWO_PHASE strategy, since it
>> is already pre-aggregating locally. Is that correct?
>>
>> Here are both query plans:
>> Thanks, Felipe
>>
>> Table API: mini-batch.enable : true
>> Table API: distinct-agg.split.enabled : false
>> Table API: parallelism : 4
>> Table API: mini-batch.latency : 1 s
>> Table API: mini_batch.size : 1000
>> Table API: mini_batch.two_phase : false
>>
>> {
>>   "nodes" : [ {
>>     "id" : 1,
>>     "type" : "Source: source",
>>     "pact" : "Data Source",
>>     "contents" : "Source: source",
>>     "parallelism" : 4
>>   }, {
>>     "id" : 2,
>>     "type" : "tokenizer",
>>     "pact" : "Operator",
>>     "contents" : "tokenizer",
>>     "parallelism" : 4,
>>     "predecessors" : [ {
>>       "id" : 1,
>>       "ship_strategy" : "FORWARD",
>>       "side" : "second"
>>     } ]
>>   }, {
>>     "id" : 3,
>>     "type" : "SourceConversion(table=[Unregistered_DataStream_2], fields=[dayOfTheYear, driverId, endLat, endLon, endTime, isStart, passengerCnt, rideId, startLat, startLon, startTime, taxiId])",
>>     "pact" : "Operator",
>>     "contents" : "SourceConversion(table=[Unregistered_DataStream_2], fields=[dayOfTheYear, driverId, endLat, endLon, endTime, isStart, passengerCnt, rideId, startLat, startLon, startTime, taxiId])",
>>     "parallelism" : 4,
>>     "predecessors" : [ {
>>       "id" : 2,
>>       "ship_strategy" : "FORWARD",
>>       "side" : "second"
>>     } ]
>>   }, {
>>     "id" : 4,
>>     "type" : "MiniBatchAssigner(interval=[1000ms], mode=[ProcTime])",
>>     "pact" : "Operator",
>>     "contents" : "MiniBatchAssigner(interval=[1000ms], mode=[ProcTime])",
>>     "parallelism" : 4,
>>     "predecessors" : [ {
>>       "id" : 3,
>>       "ship_strategy" : "FORWARD",
>>       "side" : "second"
>>     } ]
>>   }, {
>>     "id" : 5,
>>     "type" : "LocalGroupAggregate(groupBy=[taxiId], select=[taxiId, COUNT(passengerCnt) AS count$0])",
>>     "pact" : "Operator",
>>     "contents" : "LocalGroupAggregate(groupBy=[taxiId], select=[taxiId, COUNT(passengerCnt) AS count$0])",
>>     "parallelism" : 4,
>>     "predecessors" : [ {
>>       "id" : 4,
>>       "ship_strategy" : "FORWARD",
>>       "side" : "second"
>>     } ]
>>   }, {
>>     "id" : 7,
>>     "type" : "GlobalGroupAggregate(groupBy=[taxiId], select=[taxiId, COUNT(count$0) AS EXPR$0])",
>>     "pact" : "Operator",
>>     "contents" : "GlobalGroupAggregate(groupBy=[taxiId], select=[taxiId, COUNT(count$0) AS EXPR$0])",
>>     "parallelism" : 4,
>>     "predecessors" : [ {
>>       "id" : 5,
>>       "ship_strategy" : "HASH",
>>       "side" : "second"
>>     } ]
>>   }, {
>>     "id" : 8,
>>     "type" : "SinkConversionToTuple2",
>>     "pact" : "Operator",
>>     "contents" : "SinkConversionToTuple2",
>>     "parallelism" : 4,
>>     "predecessors" : [ {
>>       "id" : 7,
>>       "ship_strategy" : "FORWARD",
>>       "side" : "second"
>>     } ]
>>   }, {
>>     "id" : 9,
>>     "type" : "flat-output",
>>     "pact" : "Operator",
>>     "contents" : "flat-output",
>>     "parallelism" : 4,
>>     "predecessors" : [ {
>>       "id" : 8,
>>       "ship_strategy" : "FORWARD",
>>       "side" : "second"
>>     } ]
>>   }, {
>>     "id" : 10,
>>     "type" : "Sink: sink",
>>     "pact" : "Data Sink",
>>     "contents" : "Sink: sink",
>>     "parallelism" : 4,
>>     "predecessors" : [ {
>>       "id" : 9,
>>       "ship_strategy" : "FORWARD",
>>       "side" : "second"
>>     } ]
>>   } ]
>> }
>>
>> Table API: mini-batch.enable : true
>> Table API: distinct-agg.split.enabled : false
>> Table API: parallelism : 4
>> Table API: mini-batch.latency : 1 s
>> Table API: mini_batch.size : 1000
>> Table API: mini_batch.two_phase : true
>>
>> {
>>   "nodes" : [ {
>>     "id" : 1,
>>     "type" : "Source: source",
>>     "pact" : "Data Source",
>>     "contents" : "Source: source",
>>     "parallelism" : 4
>>   }, {
>>     "id" : 2,
>>     "type" : "tokenizer",
>>     "pact" : "Operator",
>>     "contents" : "tokenizer",
>>     "parallelism" : 4,
>>     "predecessors" : [ {
>>       "id" : 1,
>>       "ship_strategy" : "FORWARD",
>>       "side" : "second"
>>     } ]
>>   }, {
>>     "id" : 3,
>>     "type" : "SourceConversion(table=[Unregistered_DataStream_2], fields=[dayOfTheYear, driverId, endLat, endLon, endTime, isStart, passengerCnt, rideId, startLat, startLon, startTime, taxiId])",
>>     "pact" : "Operator",
>>     "contents" : "SourceConversion(table=[Unregistered_DataStream_2], fields=[dayOfTheYear, driverId, endLat, endLon, endTime, isStart, passengerCnt, rideId, startLat, startLon, startTime, taxiId])",
>>     "parallelism" : 4,
>>     "predecessors" : [ {
>>       "id" : 2,
>>       "ship_strategy" : "FORWARD",
>>       "side" : "second"
>>     } ]
>>   }, {
>>     "id" : 4,
>>     "type" : "MiniBatchAssigner(interval=[1000ms], mode=[ProcTime])",
>>     "pact" : "Operator",
>>     "contents" : "MiniBatchAssigner(interval=[1000ms], mode=[ProcTime])",
>>     "parallelism" : 4,
>>     "predecessors" : [ {
>>       "id" : 3,
>>       "ship_strategy" : "FORWARD",
>>       "side" : "second"
>>     } ]
>>   }, {
>>     "id" : 5,
>>     "type" : "LocalGroupAggregate(groupBy=[taxiId], select=[taxiId, COUNT(passengerCnt) AS count$0])",
>>     "pact" : "Operator",
>>     "contents" : "LocalGroupAggregate(groupBy=[taxiId], select=[taxiId, COUNT(passengerCnt) AS count$0])",
>>     "parallelism" : 4,
>>     "predecessors" : [ {
>>       "id" : 4,
>>       "ship_strategy" : "FORWARD",
>>       "side" : "second"
>>     } ]
>>   }, {
>>     "id" : 7,
>>     "type" : "GlobalGroupAggregate(groupBy=[taxiId], select=[taxiId, COUNT(count$0) AS EXPR$0])",
>>     "pact" : "Operator",
>>     "contents" : "GlobalGroupAggregate(groupBy=[taxiId], select=[taxiId, COUNT(count$0) AS EXPR$0])",
>>     "parallelism" : 4,
>>     "predecessors" : [ {
>>       "id" : 5,
>>       "ship_strategy" : "HASH",
>>       "side" : "second"
>>     } ]
>>   }, {
>>     "id" : 8,
>>     "type" : "SinkConversionToTuple2",
>>     "pact" : "Operator",
>>     "contents" : "SinkConversionToTuple2",
>>     "parallelism" : 4,
>>     "predecessors" : [ {
>>       "id" : 7,
>>       "ship_strategy" : "FORWARD",
>>       "side" : "second"
>>     } ]
>>   }, {
>>     "id" : 9,
>>     "type" : "flat-output",
>>     "pact" : "Operator",
>>     "contents" : "flat-output",
>>     "parallelism" : 4,
>>     "predecessors" : [ {
>>       "id" : 8,
>>       "ship_strategy" : "FORWARD",
>>       "side" : "second"
>>     } ]
>>   }, {
>>     "id" : 10,
>>     "type" : "Sink: sink",
>>     "pact" : "Data Sink",
>>     "contents" : "Sink: sink",
>>     "parallelism" : 4,
>>     "predecessors" : [ {
>>       "id" : 9,
>>       "ship_strategy" : "FORWARD",
>>       "side" : "second"
>>     } ]
>>   } ]
>> }
>>
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/tuning/streaming_aggregation_optimization.html
>>
>>
>> --
>> -- Felipe Gutierrez
>> -- skype: felipe.o.gutierrez
>> -- https://felipeogutierrez.blogspot.com
>>
>> On Tue, Nov 10, 2020 at 6:25 PM Felipe Gutierrez
>> <felipe.o.gutier...@gmail.com> wrote:
>> >
>> > I see, thanks Timo
>> >
>> > --
>> > -- Felipe Gutierrez
>> > -- skype: felipe.o.gutierrez
>> > -- https://felipeogutierrez.blogspot.com
>> >
>> > On Tue, Nov 10, 2020 at 3:22 PM Timo Walther <twal...@apache.org> wrote:
>> > >
>> > > Hi Felipe,
>> > >
>> > > by non-deterministic, Jark meant that you never know whether the
>> > > mini-batch timer (every 3 s) or the mini-batch threshold (e.g. 3 rows)
>> > > triggers the execution. This depends on how fast records arrive at
>> > > the operator.
>> > >
>> > > In general, processing time can be considered non-deterministic, because
>> > > 100 ms need not be exactly 100 ms. This depends on the CPU load and
>> > > other tasks such as garbage collection.
>> > > Only event time (and thus event-time windows), which works on the
>> > > timestamps in the data instead of machine time, is deterministic.
>> > >
>> > > Regards,
>> > > Timo
>> > >
>> > >
>> > > On 10.11.20 12:02, Felipe Gutierrez wrote:
>> > > > Hi Jark,
>> > > >
>> > > > thanks for your reply. Indeed, I forgot to write DISTINCT in the query,
>> > > > and now the query plan is split into two hash partition phases.
>> > > >
>> > > > What do you mean by deterministic time? Why is only the window
>> > > > aggregate deterministic? If I implement the ProcessingTimeCallback [1]
>> > > > interface, is it deterministic?
>> > > >
>> > > > [1]
>> > > > https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeCallback.html
>> > > > Thanks
>> > > >
>> > > > --
>> > > > -- Felipe Gutierrez
>> > > > -- skype: felipe.o.gutierrez
>> > > > -- https://felipeogutierrez.blogspot.com
>> > > >
>> > > > On Tue, Nov 10, 2020 at 7:55 AM Jark Wu <imj...@gmail.com> wrote:
>> > > >>
>> > > >> Hi Felipe,
>> > > >>
>> > > >> The "Split Distinct Aggregation", i.e. the
>> > > >> "table.optimizer.distinct-agg.split.enabled" option,
>> > > >> only works for distinct aggregations (e.g. COUNT(DISTINCT ...)).
>> > > >>
>> > > >> However, the query in your example is using COUNT(driverId).
>> > > >> You can update it to COUNT(DISTINCT driverId), and it should show two
>> > > >> hash phases.
>> > > >>
>> > > >> Regarding "MiniBatch Aggregation", it is not the same as a
>> > > >> processing-time window aggregation.
>> > > >>
>> > > >> 1) MiniBatch is just an optimization for unbounded aggregation: it
>> > > >> buffers some input records in memory and processes them together
>> > > >> to reduce state access. A processing-time window, by contrast,
>> > > >> still accesses state for every record.
>> > > >> Besides, the local aggregation also applies mini-batch: it only
>> > > >> sends the accumulator of the current mini-batch to the downstream
>> > > >> global aggregation, which improves performance a lot.
>> > > >> 2) The size of a MiniBatch is not deterministic. It may be triggered by
>> > > >> the number of records or by a timeout.
>> > > >> But a window aggregate is triggered by a deterministic time.
>> > > >>
>> > > >>
>> > > >> Best,
>> > > >> Jark
>> > > >>
>> > > >>
>> > > >> On Mon, 9 Nov 2020 at 21:45, Felipe Gutierrez
>> > > >> <felipe.o.gutier...@gmail.com> wrote:
>> > > >>>
>> > > >>> I realized that I forgot the image. Now it is attached.
>> > > >>> --
>> > > >>> -- Felipe Gutierrez
>> > > >>> -- skype: felipe.o.gutierrez
>> > > >>> -- https://felipeogutierrez.blogspot.com
>> > > >>>
>> > > >>> On Mon, Nov 9, 2020 at 1:41 PM Felipe Gutierrez
>> > > >>> <felipe.o.gutier...@gmail.com> wrote:
>> > > >>>>
>> > > >>>> Hi community,
>> > > >>>>
>> > > >>>> I am testing the "Split Distinct Aggregation" [1] consuming the taxi
>> > > >>>> ride data set. My SQL query from the table environment is the one
>> > > >>>> below:
>> > > >>>>
>> > > >>>> Table tableCountDistinct = tableEnv.sqlQuery("SELECT startDate, COUNT(driverId) FROM TaxiRide GROUP BY startDate");
>> > > >>>>
>> > > >>>> and I am enabling:
>> > > >>>> configuration.setString("table.exec.mini-batch.enabled", "true");
>> > > >>>> configuration.setString("table.exec.mini-batch.allow-latency", "3 s");
>> > > >>>> configuration.setString("table.exec.mini-batch.size", "5000");
>> > > >>>> configuration.setString("table.optimizer.agg-phase-strategy", "TWO_PHASE");
>> > > >>>> and finally
>> > > >>>> configuration.setString("table.optimizer.distinct-agg.split.enabled", "true");
>> > > >>>>
>> > > >>>> I was expecting the query plan in the Web UI to show two hash
>> > > >>>> phases, as presented in the image at [1].
>> > > >>>> Instead, it shows the same plan with one hash phase, as if I were
>> > > >>>> deploying only one Local aggregate and one Global aggregate (of
>> > > >>>> course, taking the parallel instances into consideration). Please
>> > > >>>> see the attached query execution plan image.
>> > > >>>>
>> > > >>>> Is there something that I am missing when I configure the Table API?
>> > > >>>> By the way, I am a bit confused about the "MiniBatch Aggregation"
>> > > >>>> [2]. Does "MiniBatch Aggregation" aggregate like a processing-time
>> > > >>>> window on the operator after the hash phase? If so, isn't it the
>> > > >>>> same as a window aggregation rather than an unbounded window, as
>> > > >>>> the example presents?
>> > > >>>>
>> > > >>>> Thanks!
>> > > >>>> Felipe
>> > > >>>>
>> > > >>>> [1]
>> > > >>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/tuning/streaming_aggregation_optimization.html#split-distinct-aggregation
>> > > >>>> [2]
>> > > >>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/tuning/streaming_aggregation_optimization.html#minibatch-aggregation
>> > > >>>> --
>> > > >>>> -- Felipe Gutierrez
>> > > >>>> -- skype: felipe.o.gutierrez
>> > > >>>> -- https://felipeogutierrez.blogspot.com
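
The mini-batch behaviour discussed in this thread (records are buffered and flushed either by a size threshold or by a latency bound, whichever comes first) can be sketched in plain Java. This is a toy model, not Flink code: the class `MiniBatchSketch` and its methods are hypothetical, the clock is passed in by the caller so that runs are repeatable, and the latency bound is only checked when the next record arrives, whereas a real operator would use a timer.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model of a mini-batch count aggregation; not Flink code. */
public class MiniBatchSketch {
    private final int maxSize;       // plays the role of table.exec.mini-batch.size
    private final long maxLatencyMs; // plays the role of table.exec.mini-batch.allow-latency
    private final Map<String, Long> buffer = new HashMap<>(); // in-memory accumulators
    private final Map<String, Long> state = new HashMap<>();  // stands in for keyed state
    private int buffered = 0;
    private long batchStartMs = 0;
    private int stateAccesses = 0;
    private int flushes = 0;

    public MiniBatchSketch(int maxSize, long maxLatencyMs) {
        this.maxSize = maxSize;
        this.maxLatencyMs = maxLatencyMs;
    }

    /** Buffers one record; nowMs is supplied by the caller (assumption of this model). */
    public void onRecord(String key, long nowMs) {
        if (buffered > 0 && nowMs - batchStartMs >= maxLatencyMs) {
            flush(); // the latency bound expired before this record arrived
        }
        if (buffered == 0) {
            batchStartMs = nowMs; // first record of a new batch
        }
        buffer.merge(key, 1L, Long::sum);
        buffered++;
        if (buffered >= maxSize) {
            flush(); // the size threshold was reached
        }
    }

    /** Merges buffered accumulators into state: one access per key, not per record. */
    public void flush() {
        if (buffered == 0) {
            return;
        }
        for (Map.Entry<String, Long> acc : buffer.entrySet()) {
            state.merge(acc.getKey(), acc.getValue(), Long::sum);
            stateAccesses++;
        }
        buffer.clear();
        buffered = 0;
        flushes++;
    }

    public long count(String key) { return state.getOrDefault(key, 0L); }
    public int stateAccesses() { return stateAccesses; }
    public int flushes() { return flushes; }

    public static void main(String[] args) {
        MiniBatchSketch agg = new MiniBatchSketch(3, 1000);
        agg.onRecord("taxi-1", 0);    // three quick records: the size threshold flushes
        agg.onRecord("taxi-1", 10);
        agg.onRecord("taxi-2", 20);
        agg.onRecord("taxi-1", 100);  // slow records: the latency bound flushes
        agg.onRecord("taxi-1", 1500);
        agg.flush();                  // drain what is still buffered
        System.out.println("taxi-1=" + agg.count("taxi-1") + ", taxi-2=" + agg.count("taxi-2"));
        System.out.println("flushes=" + agg.flushes() + ", stateAccesses=" + agg.stateAccesses());
    }
}
```

With these five records the model flushes three times and touches state four times instead of five. Which trigger closes a batch depends only on how quickly records arrive, which is the sense in which the batch size is not deterministic.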