I see. Now it has different query plans. It was documented on another
page, so I got confused. Thanks!
--
-- Felipe Gutierrez
-- skype: felipe.o.gutierrez
-- https://felipeogutierrez.blogspot.com

On Thu, Nov 12, 2020 at 12:41 PM Jark Wu <imj...@gmail.com> wrote:
>
> Hi Felipe,
>
> The default value of `table.optimizer.agg-phase-strategy` is AUTO: if
> mini-batch is enabled, it will use TWO_PHASE, otherwise ONE_PHASE.
>
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/config.html#table-optimizer-agg-phase-strategy
>
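The rule described in this reply can be sketched as a small function. This is an illustrative model only, not Flink's planner code: the class `AggPhaseSketch`, the enum, and `resolve` are hypothetical names, and the real planner may still fall back to a single phase when an aggregate function cannot be split. It suggests why the two plans in the quoted message are identical: with mini-batch enabled, AUTO already resolves to TWO_PHASE.

```java
// Illustrative model of the AUTO rule described above; NOT Flink's planner code.
final class AggPhaseSketch {
    enum Strategy { AUTO, ONE_PHASE, TWO_PHASE }

    /** Resolves the effective strategy from the configured value and the mini-batch flag. */
    static Strategy resolve(Strategy configured, boolean miniBatchEnabled) {
        if (configured != Strategy.AUTO) {
            // Assumption of this sketch: an explicit setting is returned unchanged.
            return configured;
        }
        return miniBatchEnabled ? Strategy.TWO_PHASE : Strategy.ONE_PHASE;
    }

    public static void main(String[] args) {
        System.out.println(resolve(Strategy.AUTO, true));  // prints TWO_PHASE
        System.out.println(resolve(Strategy.AUTO, false)); // prints ONE_PHASE
    }
}
```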
> On Thu, 12 Nov 2020 at 17:52, Felipe Gutierrez <felipe.o.gutier...@gmail.com> 
> wrote:
>>
>> Hi Jark,
>>
>> I don't get the difference between the "MiniBatch Aggregation" and
>> the "Local-Global Aggregation". The web page [1] says that I have to
>> enable the TWO_PHASE parameter. So I compared the query plans with
>> and without the TWO_PHASE parameter, and they are the same. So I
>> conclude that mini-batch is already a TWO_PHASE strategy, since it
>> is already pre-aggregating locally. Is that correct?
>>
>> Here are both query plans:
>> Thanks, Felipe
>>
>> Table API: mini-batch.enable                            : true
>> Table API: distinct-agg.split.enabled                   : false
>> Table API: parallelism                                  : 4
>> Table API: mini-batch.latency                           : 1 s
>> Table API: mini_batch.size                              : 1000
>> Table API: mini_batch.two_phase                         : false
>>
>> {
>>   "nodes" : [ {
>>     "id" : 1,
>>     "type" : "Source: source",
>>     "pact" : "Data Source",
>>     "contents" : "Source: source",
>>     "parallelism" : 4
>>   }, {
>>     "id" : 2,
>>     "type" : "tokenizer",
>>     "pact" : "Operator",
>>     "contents" : "tokenizer",
>>     "parallelism" : 4,
>>     "predecessors" : [ {
>>       "id" : 1,
>>       "ship_strategy" : "FORWARD",
>>       "side" : "second"
>>     } ]
>>   }, {
>>     "id" : 3,
>>     "type" : "SourceConversion(table=[Unregistered_DataStream_2],
>> fields=[dayOfTheYear, driverId, endLat, endLon, endTime, isStart,
>> passengerCnt, rideId, startLat, startLon, startTime, taxiId])",
>>     "pact" : "Operator",
>>     "contents" : "SourceConversion(table=[Unregistered_DataStream_2],
>> fields=[dayOfTheYear, driverId, endLat, endLon, endTime, isStart,
>> passengerCnt, rideId, startLat, startLon, startTime, taxiId])",
>>     "parallelism" : 4,
>>     "predecessors" : [ {
>>       "id" : 2,
>>       "ship_strategy" : "FORWARD",
>>       "side" : "second"
>>     } ]
>>   }, {
>>     "id" : 4,
>>     "type" : "MiniBatchAssigner(interval=[1000ms], mode=[ProcTime])",
>>     "pact" : "Operator",
>>     "contents" : "MiniBatchAssigner(interval=[1000ms], mode=[ProcTime])",
>>     "parallelism" : 4,
>>     "predecessors" : [ {
>>       "id" : 3,
>>       "ship_strategy" : "FORWARD",
>>       "side" : "second"
>>     } ]
>>   }, {
>>     "id" : 5,
>>     "type" : "LocalGroupAggregate(groupBy=[taxiId], select=[taxiId,
>> COUNT(passengerCnt) AS count$0])",
>>     "pact" : "Operator",
>>     "contents" : "LocalGroupAggregate(groupBy=[taxiId],
>> select=[taxiId, COUNT(passengerCnt) AS count$0])",
>>     "parallelism" : 4,
>>     "predecessors" : [ {
>>       "id" : 4,
>>       "ship_strategy" : "FORWARD",
>>       "side" : "second"
>>     } ]
>>   }, {
>>     "id" : 7,
>>     "type" : "GlobalGroupAggregate(groupBy=[taxiId], select=[taxiId,
>> COUNT(count$0) AS EXPR$0])",
>>     "pact" : "Operator",
>>     "contents" : "GlobalGroupAggregate(groupBy=[taxiId],
>> select=[taxiId, COUNT(count$0) AS EXPR$0])",
>>     "parallelism" : 4,
>>     "predecessors" : [ {
>>       "id" : 5,
>>       "ship_strategy" : "HASH",
>>       "side" : "second"
>>     } ]
>>   }, {
>>     "id" : 8,
>>     "type" : "SinkConversionToTuple2",
>>     "pact" : "Operator",
>>     "contents" : "SinkConversionToTuple2",
>>     "parallelism" : 4,
>>     "predecessors" : [ {
>>       "id" : 7,
>>       "ship_strategy" : "FORWARD",
>>       "side" : "second"
>>     } ]
>>   }, {
>>     "id" : 9,
>>     "type" : "flat-output",
>>     "pact" : "Operator",
>>     "contents" : "flat-output",
>>     "parallelism" : 4,
>>     "predecessors" : [ {
>>       "id" : 8,
>>       "ship_strategy" : "FORWARD",
>>       "side" : "second"
>>     } ]
>>   }, {
>>     "id" : 10,
>>     "type" : "Sink: sink",
>>     "pact" : "Data Sink",
>>     "contents" : "Sink: sink",
>>     "parallelism" : 4,
>>     "predecessors" : [ {
>>       "id" : 9,
>>       "ship_strategy" : "FORWARD",
>>       "side" : "second"
>>     } ]
>>   } ]
>> }
>>
>> Table API: mini-batch.enable                            : true
>> Table API: distinct-agg.split.enabled                   : false
>> Table API: parallelism                                  : 4
>> Table API: mini-batch.latency                           : 1 s
>> Table API: mini_batch.size                              : 1000
>> Table API: mini_batch.two_phase                         : true
>>
>> {
>>   "nodes" : [ {
>>     "id" : 1,
>>     "type" : "Source: source",
>>     "pact" : "Data Source",
>>     "contents" : "Source: source",
>>     "parallelism" : 4
>>   }, {
>>     "id" : 2,
>>     "type" : "tokenizer",
>>     "pact" : "Operator",
>>     "contents" : "tokenizer",
>>     "parallelism" : 4,
>>     "predecessors" : [ {
>>       "id" : 1,
>>       "ship_strategy" : "FORWARD",
>>       "side" : "second"
>>     } ]
>>   }, {
>>     "id" : 3,
>>     "type" : "SourceConversion(table=[Unregistered_DataStream_2],
>> fields=[dayOfTheYear, driverId, endLat, endLon, endTime, isStart,
>> passengerCnt, rideId, startLat, startLon, startTime, taxiId])",
>>     "pact" : "Operator",
>>     "contents" : "SourceConversion(table=[Unregistered_DataStream_2],
>> fields=[dayOfTheYear, driverId, endLat, endLon, endTime, isStart,
>> passengerCnt, rideId, startLat, startLon, startTime, taxiId])",
>>     "parallelism" : 4,
>>     "predecessors" : [ {
>>       "id" : 2,
>>       "ship_strategy" : "FORWARD",
>>       "side" : "second"
>>     } ]
>>   }, {
>>     "id" : 4,
>>     "type" : "MiniBatchAssigner(interval=[1000ms], mode=[ProcTime])",
>>     "pact" : "Operator",
>>     "contents" : "MiniBatchAssigner(interval=[1000ms], mode=[ProcTime])",
>>     "parallelism" : 4,
>>     "predecessors" : [ {
>>       "id" : 3,
>>       "ship_strategy" : "FORWARD",
>>       "side" : "second"
>>     } ]
>>   }, {
>>     "id" : 5,
>>     "type" : "LocalGroupAggregate(groupBy=[taxiId], select=[taxiId,
>> COUNT(passengerCnt) AS count$0])",
>>     "pact" : "Operator",
>>     "contents" : "LocalGroupAggregate(groupBy=[taxiId],
>> select=[taxiId, COUNT(passengerCnt) AS count$0])",
>>     "parallelism" : 4,
>>     "predecessors" : [ {
>>       "id" : 4,
>>       "ship_strategy" : "FORWARD",
>>       "side" : "second"
>>     } ]
>>   }, {
>>     "id" : 7,
>>     "type" : "GlobalGroupAggregate(groupBy=[taxiId], select=[taxiId,
>> COUNT(count$0) AS EXPR$0])",
>>     "pact" : "Operator",
>>     "contents" : "GlobalGroupAggregate(groupBy=[taxiId],
>> select=[taxiId, COUNT(count$0) AS EXPR$0])",
>>     "parallelism" : 4,
>>     "predecessors" : [ {
>>       "id" : 5,
>>       "ship_strategy" : "HASH",
>>       "side" : "second"
>>     } ]
>>   }, {
>>     "id" : 8,
>>     "type" : "SinkConversionToTuple2",
>>     "pact" : "Operator",
>>     "contents" : "SinkConversionToTuple2",
>>     "parallelism" : 4,
>>     "predecessors" : [ {
>>       "id" : 7,
>>       "ship_strategy" : "FORWARD",
>>       "side" : "second"
>>     } ]
>>   }, {
>>     "id" : 9,
>>     "type" : "flat-output",
>>     "pact" : "Operator",
>>     "contents" : "flat-output",
>>     "parallelism" : 4,
>>     "predecessors" : [ {
>>       "id" : 8,
>>       "ship_strategy" : "FORWARD",
>>       "side" : "second"
>>     } ]
>>   }, {
>>     "id" : 10,
>>     "type" : "Sink: sink",
>>     "pact" : "Data Sink",
>>     "contents" : "Sink: sink",
>>     "parallelism" : 4,
>>     "predecessors" : [ {
>>       "id" : 9,
>>       "ship_strategy" : "FORWARD",
>>       "side" : "second"
>>     } ]
>>   } ]
>> }
>>
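Both plans above contain a LocalGroupAggregate that is connected to a GlobalGroupAggregate through a HASH shuffle. A minimal sketch of that local-global pattern follows, assuming a plain COUNT per key. The class and method names are hypothetical, and the global step is modelled here as summing the partial counts; this is not Flink's operator code.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative local-global COUNT; names are hypothetical, not Flink operators.
final class LocalGlobalSketch {
    /** Local phase: one partial count per key, computed inside a single parallel instance. */
    static Map<String, Long> localCount(List<String> keys) {
        Map<String, Long> partial = new HashMap<>();
        for (String key : keys) {
            partial.merge(key, 1L, Long::sum);
        }
        return partial;
    }

    /** Global phase: merge the partial counts that arrive after the hash shuffle. */
    static Map<String, Long> globalMerge(List<Map<String, Long>> partials) {
        Map<String, Long> total = new HashMap<>();
        for (Map<String, Long> partial : partials) {
            partial.forEach((key, count) -> total.merge(key, count, Long::sum));
        }
        return total;
    }
}
```

Only one partial count per key and instance crosses the network, which is where the saving over a single-phase aggregation would come from.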
>>
>> [1] 
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/tuning/streaming_aggregation_optimization.html
>>
>>
>>
>> On Tue, Nov 10, 2020 at 6:25 PM Felipe Gutierrez
>> <felipe.o.gutier...@gmail.com> wrote:
>> >
>> > I see, thanks Timo
>> >
>> >
>> > On Tue, Nov 10, 2020 at 3:22 PM Timo Walther <twal...@apache.org> wrote:
>> > >
>> > > Hi Felipe,
>> > >
>> > > By non-deterministic, Jark meant that you never know whether the mini-batch
>> > > timer (every 3 s) or the mini-batch threshold (e.g. 3 rows) triggers the
>> > > execution. This depends on how fast records arrive at the operator.
>> > >
>> > > In general, processing time can be considered non-deterministic, because
>> > > 100 ms is not necessarily 100 ms. This depends on the CPU load and other tasks
>> > > such as garbage collection. Only event time (and thus event-time
>> > > windows), which works on the timestamp in the data instead of machine time,
>> > > is deterministic.
>> > >
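A minimal sketch of why event-time windows are deterministic: the window a record falls into is a pure function of the timestamp carried in the record, so replaying the same data yields the same windows regardless of machine load. The class and method names are hypothetical, and the sketch assumes non-negative millisecond timestamps and tumbling windows without an offset.

```java
// Illustrative tumbling-window assignment by event time; names are hypothetical.
final class WindowSketch {
    /** Start of the tumbling window that contains the given event timestamp (non-negative, in ms). */
    static long windowStart(long eventTimeMs, long sizeMs) {
        return eventTimeMs - (eventTimeMs % sizeMs);
    }
}
```

A processing-time window would instead read the machine clock when the record arrives, so the same record could land in a different window on another run.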
>> > > Regards,
>> > > Timo
>> > >
>> > >
>> > > On 10.11.20 12:02, Felipe Gutierrez wrote:
>> > > > Hi Jark,
>> > > >
>> > > > Thanks for your reply. Indeed, I forgot to write DISTINCT in the query,
>> > > > and now the query plan is split into two hash partition phases.
>> > > >
>> > > > What do you mean by deterministic time? Why is only the window aggregate
>> > > > deterministic? If I implement the ProcessingTimeCallback [1]
>> > > > interface, is it deterministic?
>> > > >
>> > > > [1] 
>> > > > https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeCallback.html
>> > > > Thanks
>> > > >
>> > > >
>> > > > On Tue, Nov 10, 2020 at 7:55 AM Jark Wu <imj...@gmail.com> wrote:
>> > > >>
>> > > >> Hi Felipe,
>> > > >>
>> > > >> The "Split Distinct Aggregation", i.e. the 
>> > > >> "table.optimizer.distinct-agg.split.enabled" option,
>> > > >>   only works for distinct aggregations (e.g. COUNT(DISTINCT ...)).
>> > > >>
>> > > >> However, the query in your example is using COUNT(driverId).
>> > > >> You can update it to COUNT(DISTINCT driverId), and it should show two 
>> > > >> hash phases.
>> > > >>
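The two hash phases of a split distinct aggregation can be sketched as follows. The first level groups by the key plus a bucket derived from the hash of the distinct value, and the second level sums the per-bucket distinct counts. This is a simplified model under assumptions: the class and method names are hypothetical, rows are `{groupKey, distinctValue}` pairs, and the bucket count is a free parameter here.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Illustrative two-level COUNT(DISTINCT); names are hypothetical, not Flink operators.
final class SplitDistinctSketch {
    /** rows: {groupKey, distinctValue} pairs; buckets: number of hash buckets (> 0). */
    static Map<String, Long> countDistinctSplit(List<String[]> rows, int buckets) {
        // Level 1: group by (key, bucket) and keep the distinct values of each group.
        Map<String, Map<Integer, Set<String>>> level1 = new HashMap<>();
        for (String[] row : rows) {
            int bucket = Math.floorMod(row[1].hashCode(), buckets);
            level1.computeIfAbsent(row[0], k -> new HashMap<>())
                  .computeIfAbsent(bucket, b -> new HashSet<>())
                  .add(row[1]);
        }
        // Level 2: group by key only and sum the per-bucket distinct counts.
        Map<String, Long> result = new HashMap<>();
        level1.forEach((key, byBucket) -> {
            long sum = 0;
            for (Set<String> values : byBucket.values()) {
                sum += values.size();
            }
            result.put(key, sum);
        });
        return result;
    }
}
```

Summing is valid because equal values always hash to the same bucket, so no value is counted in two buckets. A plain COUNT has no distinct value to bucket on, which is consistent with the option having no effect on COUNT(driverId).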
>> > > >> Regarding "MiniBatch Aggregation", it is not the same as a 
>> > > >> processing-time window aggregation.
>> > > >>
>> > > >> 1) MiniBatch is just an optimization on unbounded aggregation. It
>> > > >>    buffers some input records in memory and processes them together
>> > > >>    to reduce state access, whereas a processing-time window still
>> > > >>    accesses state per record. Besides, the local aggregation also
>> > > >>    applies mini-batch: it only sends the accumulator of the current
>> > > >>    mini-batch to the downstream global aggregation, which improves
>> > > >>    performance a lot.
>> > > >> 2) The size of a MiniBatch is not deterministic. It may be triggered by
>> > > >>    the number of records or by a timeout, whereas a window aggregate
>> > > >>    is triggered at a deterministic time.
>> > > >>
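The two points above can be sketched with a small buffer that is flushed either by a size threshold or by a timer, whichever comes first. This is a simplified model, not Flink's operator: the class, its fields, and the `onTimer` hook are hypothetical, and a `HashMap` stands in for keyed state.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative mini-batch COUNT per key; names are hypothetical, not Flink code.
final class MiniBatchSketch {
    private final int maxSize;
    private final Map<String, Long> buffer = new HashMap<>(); // in-memory mini-batch
    private final Map<String, Long> state = new HashMap<>();  // stands in for keyed state
    private int buffered = 0;
    int stateAccesses = 0;

    MiniBatchSketch(int maxSize) {
        this.maxSize = maxSize;
    }

    /** Buffers one record and flushes when the size threshold is reached. */
    void onRecord(String key) {
        buffer.merge(key, 1L, Long::sum);
        buffered++;
        if (buffered >= maxSize) {
            flush();
        }
    }

    /** Called by a (hypothetical) latency timer; flushes whatever is buffered. */
    void onTimer() {
        flush();
    }

    /** One state access per distinct key in the batch instead of one per record. */
    private void flush() {
        for (Map.Entry<String, Long> entry : buffer.entrySet()) {
            state.merge(entry.getKey(), entry.getValue(), Long::sum);
            stateAccesses++;
        }
        buffer.clear();
        buffered = 0;
    }

    long count(String key) {
        return state.getOrDefault(key, 0L);
    }
}
```

Which of the two triggers fires depends on how fast records arrive, so the batch boundaries differ from run to run even though the final counts do not.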
>> > > >>
>> > > >> Best,
>> > > >> Jark
>> > > >>
>> > > >>
>> > > >> On Mon, 9 Nov 2020 at 21:45, Felipe Gutierrez 
>> > > >> <felipe.o.gutier...@gmail.com> wrote:
>> > > >>>
>> > > >>> I realized that I forgot the image. Now it is attached.
>> > > >>>
>> > > >>> On Mon, Nov 9, 2020 at 1:41 PM Felipe Gutierrez
>> > > >>> <felipe.o.gutier...@gmail.com> wrote:
>> > > >>>>
>> > > >>>> Hi community,
>> > > >>>>
>> > > >>>> I am testing the "Split Distinct Aggregation" [1] consuming the taxi
>> > > >>>> ride data set. My sql query from the table environment is the one
>> > > >>>> below:
>> > > >>>>
>> > > >>>> Table tableCountDistinct = tableEnv.sqlQuery(
>> > > >>>>     "SELECT startDate, COUNT(driverId) FROM TaxiRide GROUP BY startDate");
>> > > >>>>
>> > > >>>> and I am enabling:
>> > > >>>> configuration.setString("table.exec.mini-batch.enabled", "true");
>> > > >>>> configuration.setString("table.exec.mini-batch.allow-latency", "3 s");
>> > > >>>> configuration.setString("table.exec.mini-batch.size", "5000");
>> > > >>>> configuration.setString("table.optimizer.agg-phase-strategy", "TWO_PHASE");
>> > > >>>> and finally
>> > > >>>> configuration.setString("table.optimizer.distinct-agg.split.enabled", "true");
>> > > >>>>
>> > > >>>> I was expecting the query plan in the Web UI to show two hash
>> > > >>>> phases, as in the image in [1]. Instead, it shows the same plan
>> > > >>>> with one hash phase, as if I were deploying only one Local
>> > > >>>> aggregate and one Global aggregate (of course, taking the
>> > > >>>> parallel instances into consideration). Please see the attached
>> > > >>>> query execution plan image.
>> > > >>>>
>> > > >>>> Is there something I am missing when I configure the Table API?
>> > > >>>> By the way, I am a bit confused about the "MiniBatch Aggregation" [2].
>> > > >>>> Does the "MiniBatch Aggregation" aggregate like a processing-time window
>> > > >>>> on the operator after the hash phase? If so, isn't it the same as a
>> > > >>>> window aggregation, rather than the unbounded aggregation that the
>> > > >>>> example presents?
>> > > >>>>
>> > > >>>> Thanks!
>> > > >>>> Felipe
>> > > >>>>
>> > > >>>> [1] 
>> > > >>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/tuning/streaming_aggregation_optimization.html#split-distinct-aggregation
>> > > >>>> [2] 
>> > > >>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/tuning/streaming_aggregation_optimization.html#minibatch-aggregation
>> > > >
>> > >
