Hi Felipe,

The default value of `table.optimizer.agg-phase-strategy` is AUTO: if
mini-batch is enabled, it will use TWO_PHASE, otherwise ONE_PHASE.

https://ci.apache.org/projects/flink/flink-docs-master/dev/table/config.html#table-optimizer-agg-phase-strategy
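
As a rough illustration of that rule, here is a small Java sketch. It is not
Flink planner code: the class `AggPhaseSketch` and its `resolve` method are
hypothetical names, and it only models the behaviour described above (the real
planner may take further conditions into account). Only the two option keys
are taken from this thread.

```java
import java.util.Map;

// Simplified, hypothetical model of how the AUTO strategy is resolved.
// It mirrors the rule stated in this reply, not Flink's actual planner.
final class AggPhaseSketch {
    enum Strategy { AUTO, ONE_PHASE, TWO_PHASE }

    // Option keys as they appear in this thread.
    static final String MINI_BATCH_ENABLED = "table.exec.mini-batch.enabled";
    static final String AGG_PHASE_STRATEGY = "table.optimizer.agg-phase-strategy";

    // Returns the strategy that is effectively used for the given settings.
    static Strategy resolve(Map<String, String> conf) {
        Strategy configured =
                Strategy.valueOf(conf.getOrDefault(AGG_PHASE_STRATEGY, "AUTO"));
        if (configured != Strategy.AUTO) {
            return configured; // an explicit choice is kept as-is in this model
        }
        // Assumption: mini-batch counts as disabled when the key is absent.
        boolean miniBatch =
                Boolean.parseBoolean(conf.getOrDefault(MINI_BATCH_ENABLED, "false"));
        return miniBatch ? Strategy.TWO_PHASE : Strategy.ONE_PHASE;
    }

    public static void main(String[] args) {
        // AUTO with mini-batch enabled
        System.out.println(resolve(Map.of(MINI_BATCH_ENABLED, "true")));
        // AUTO with mini-batch left at its default
        System.out.println(resolve(Map.of()));
        // Explicit TWO_PHASE together with mini-batch
        System.out.println(resolve(Map.of(
                MINI_BATCH_ENABLED, "true",
                AGG_PHASE_STRATEGY, "TWO_PHASE")));
    }
}
```

In this model, with mini-batch enabled, leaving the option at AUTO and setting
it to TWO_PHASE resolve to the same strategy, which would be consistent with
the two identical plans quoted below.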

On Thu, 12 Nov 2020 at 17:52, Felipe Gutierrez <felipe.o.gutier...@gmail.com>
wrote:

> Hi Jack,
>
> I don't see the difference between "MiniBatch Aggregation" and
> "Local-Global Aggregation". The web page [1] says that I have to
> enable the TWO_PHASE parameter, so I compared the query plans with
> and without the TWO_PHASE parameter, and they are the same. So I
> conclude that mini-batch is already a TWO_PHASE strategy, since it
> already pre-aggregates locally. Is that correct?
>
> Here are both query plans:
> Thanks, Felipe
>
> Table API: mini-batch.enable                            : true
> Table API: distinct-agg.split.enabled                   : false
> Table API: parallelism                                  : 4
> Table API: mini-batch.latency                           : 1 s
> Table API: mini_batch.size                              : 1000
> Table API: mini_batch.two_phase                         : false
>
> {
>   "nodes" : [ {
>     "id" : 1,
>     "type" : "Source: source",
>     "pact" : "Data Source",
>     "contents" : "Source: source",
>     "parallelism" : 4
>   }, {
>     "id" : 2,
>     "type" : "tokenizer",
>     "pact" : "Operator",
>     "contents" : "tokenizer",
>     "parallelism" : 4,
>     "predecessors" : [ {
>       "id" : 1,
>       "ship_strategy" : "FORWARD",
>       "side" : "second"
>     } ]
>   }, {
>     "id" : 3,
>     "type" : "SourceConversion(table=[Unregistered_DataStream_2],
> fields=[dayOfTheYear, driverId, endLat, endLon, endTime, isStart,
> passengerCnt, rideId, startLat, startLon, startTime, taxiId])",
>     "pact" : "Operator",
>     "contents" : "SourceConversion(table=[Unregistered_DataStream_2],
> fields=[dayOfTheYear, driverId, endLat, endLon, endTime, isStart,
> passengerCnt, rideId, startLat, startLon, startTime, taxiId])",
>     "parallelism" : 4,
>     "predecessors" : [ {
>       "id" : 2,
>       "ship_strategy" : "FORWARD",
>       "side" : "second"
>     } ]
>   }, {
>     "id" : 4,
>     "type" : "MiniBatchAssigner(interval=[1000ms], mode=[ProcTime])",
>     "pact" : "Operator",
>     "contents" : "MiniBatchAssigner(interval=[1000ms], mode=[ProcTime])",
>     "parallelism" : 4,
>     "predecessors" : [ {
>       "id" : 3,
>       "ship_strategy" : "FORWARD",
>       "side" : "second"
>     } ]
>   }, {
>     "id" : 5,
>     "type" : "LocalGroupAggregate(groupBy=[taxiId], select=[taxiId,
> COUNT(passengerCnt) AS count$0])",
>     "pact" : "Operator",
>     "contents" : "LocalGroupAggregate(groupBy=[taxiId],
> select=[taxiId, COUNT(passengerCnt) AS count$0])",
>     "parallelism" : 4,
>     "predecessors" : [ {
>       "id" : 4,
>       "ship_strategy" : "FORWARD",
>       "side" : "second"
>     } ]
>   }, {
>     "id" : 7,
>     "type" : "GlobalGroupAggregate(groupBy=[taxiId], select=[taxiId,
> COUNT(count$0) AS EXPR$0])",
>     "pact" : "Operator",
>     "contents" : "GlobalGroupAggregate(groupBy=[taxiId],
> select=[taxiId, COUNT(count$0) AS EXPR$0])",
>     "parallelism" : 4,
>     "predecessors" : [ {
>       "id" : 5,
>       "ship_strategy" : "HASH",
>       "side" : "second"
>     } ]
>   }, {
>     "id" : 8,
>     "type" : "SinkConversionToTuple2",
>     "pact" : "Operator",
>     "contents" : "SinkConversionToTuple2",
>     "parallelism" : 4,
>     "predecessors" : [ {
>       "id" : 7,
>       "ship_strategy" : "FORWARD",
>       "side" : "second"
>     } ]
>   }, {
>     "id" : 9,
>     "type" : "flat-output",
>     "pact" : "Operator",
>     "contents" : "flat-output",
>     "parallelism" : 4,
>     "predecessors" : [ {
>       "id" : 8,
>       "ship_strategy" : "FORWARD",
>       "side" : "second"
>     } ]
>   }, {
>     "id" : 10,
>     "type" : "Sink: sink",
>     "pact" : "Data Sink",
>     "contents" : "Sink: sink",
>     "parallelism" : 4,
>     "predecessors" : [ {
>       "id" : 9,
>       "ship_strategy" : "FORWARD",
>       "side" : "second"
>     } ]
>   } ]
> }
>
> Table API: mini-batch.enable                            : true
> Table API: distinct-agg.split.enabled                   : false
> Table API: parallelism                                  : 4
> Table API: mini-batch.latency                           : 1 s
> Table API: mini_batch.size                              : 1000
> Table API: mini_batch.two_phase                         : true
>
> {
>   "nodes" : [ {
>     "id" : 1,
>     "type" : "Source: source",
>     "pact" : "Data Source",
>     "contents" : "Source: source",
>     "parallelism" : 4
>   }, {
>     "id" : 2,
>     "type" : "tokenizer",
>     "pact" : "Operator",
>     "contents" : "tokenizer",
>     "parallelism" : 4,
>     "predecessors" : [ {
>       "id" : 1,
>       "ship_strategy" : "FORWARD",
>       "side" : "second"
>     } ]
>   }, {
>     "id" : 3,
>     "type" : "SourceConversion(table=[Unregistered_DataStream_2],
> fields=[dayOfTheYear, driverId, endLat, endLon, endTime, isStart,
> passengerCnt, rideId, startLat, startLon, startTime, taxiId])",
>     "pact" : "Operator",
>     "contents" : "SourceConversion(table=[Unregistered_DataStream_2],
> fields=[dayOfTheYear, driverId, endLat, endLon, endTime, isStart,
> passengerCnt, rideId, startLat, startLon, startTime, taxiId])",
>     "parallelism" : 4,
>     "predecessors" : [ {
>       "id" : 2,
>       "ship_strategy" : "FORWARD",
>       "side" : "second"
>     } ]
>   }, {
>     "id" : 4,
>     "type" : "MiniBatchAssigner(interval=[1000ms], mode=[ProcTime])",
>     "pact" : "Operator",
>     "contents" : "MiniBatchAssigner(interval=[1000ms], mode=[ProcTime])",
>     "parallelism" : 4,
>     "predecessors" : [ {
>       "id" : 3,
>       "ship_strategy" : "FORWARD",
>       "side" : "second"
>     } ]
>   }, {
>     "id" : 5,
>     "type" : "LocalGroupAggregate(groupBy=[taxiId], select=[taxiId,
> COUNT(passengerCnt) AS count$0])",
>     "pact" : "Operator",
>     "contents" : "LocalGroupAggregate(groupBy=[taxiId],
> select=[taxiId, COUNT(passengerCnt) AS count$0])",
>     "parallelism" : 4,
>     "predecessors" : [ {
>       "id" : 4,
>       "ship_strategy" : "FORWARD",
>       "side" : "second"
>     } ]
>   }, {
>     "id" : 7,
>     "type" : "GlobalGroupAggregate(groupBy=[taxiId], select=[taxiId,
> COUNT(count$0) AS EXPR$0])",
>     "pact" : "Operator",
>     "contents" : "GlobalGroupAggregate(groupBy=[taxiId],
> select=[taxiId, COUNT(count$0) AS EXPR$0])",
>     "parallelism" : 4,
>     "predecessors" : [ {
>       "id" : 5,
>       "ship_strategy" : "HASH",
>       "side" : "second"
>     } ]
>   }, {
>     "id" : 8,
>     "type" : "SinkConversionToTuple2",
>     "pact" : "Operator",
>     "contents" : "SinkConversionToTuple2",
>     "parallelism" : 4,
>     "predecessors" : [ {
>       "id" : 7,
>       "ship_strategy" : "FORWARD",
>       "side" : "second"
>     } ]
>   }, {
>     "id" : 9,
>     "type" : "flat-output",
>     "pact" : "Operator",
>     "contents" : "flat-output",
>     "parallelism" : 4,
>     "predecessors" : [ {
>       "id" : 8,
>       "ship_strategy" : "FORWARD",
>       "side" : "second"
>     } ]
>   }, {
>     "id" : 10,
>     "type" : "Sink: sink",
>     "pact" : "Data Sink",
>     "contents" : "Sink: sink",
>     "parallelism" : 4,
>     "predecessors" : [ {
>       "id" : 9,
>       "ship_strategy" : "FORWARD",
>       "side" : "second"
>     } ]
>   } ]
> }
>
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/tuning/streaming_aggregation_optimization.html
>
>
> --
> -- Felipe Gutierrez
> -- skype: felipe.o.gutierrez
> -- https://felipeogutierrez.blogspot.com
>
> On Tue, Nov 10, 2020 at 6:25 PM Felipe Gutierrez
> <felipe.o.gutier...@gmail.com> wrote:
> >
> > I see, thanks Timo
> >
> > --
> > -- Felipe Gutierrez
> > -- skype: felipe.o.gutierrez
> > -- https://felipeogutierrez.blogspot.com
> >
> > On Tue, Nov 10, 2020 at 3:22 PM Timo Walther <twal...@apache.org> wrote:
> > >
> > > Hi Felipe,
> > >
> > > > by "non-deterministic", Jark meant that you never know whether the mini-batch
> > > > timer (every 3 s) or the mini-batch threshold (e.g. 3 rows) fires the
> > > > execution. This depends on how fast records arrive at the operator.
> > > >
> > > > In general, processing time can be considered non-deterministic, because
> > > > 100 ms is not necessarily 100 ms. This depends on the CPU load and other tasks
> > > > such as garbage collection. Only event time (and thus event-time
> > > > windows), which works on the timestamps in the data instead of machine time,
> > > > is deterministic.
> > >
> > > Regards,
> > > Timo
> > >
> > >
> > > On 10.11.20 12:02, Felipe Gutierrez wrote:
> > > > Hi Jark,
> > > >
> > > > > thanks for your reply. Indeed, I forgot to write DISTINCT in the query,
> > > > > and now the query plan is split into two hash partition phases.
> > > > >
> > > > > What do you mean by deterministic time? Why is only the window aggregate
> > > > > deterministic? If I implement the ProcessingTimeCallback [1]
> > > > > interface, is it deterministic?
> > > >
> > > > [1]
> https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeCallback.html
> > > > Thanks
> > > >
> > > > --
> > > > -- Felipe Gutierrez
> > > > -- skype: felipe.o.gutierrez
> > > > -- https://felipeogutierrez.blogspot.com
> > > >
> > > > On Tue, Nov 10, 2020 at 7:55 AM Jark Wu <imj...@gmail.com> wrote:
> > > >>
> > > >> Hi Felipe,
> > > >>
> > > > >> The "Split Distinct Aggregation", i.e. the "table.optimizer.distinct-agg.split.enabled" option,
> > > > >>   only works for distinct aggregations (e.g. COUNT(DISTINCT ...)).
> > > > >>
> > > > >> However, the query in your example is using COUNT(driverId).
> > > > >> You can update it to COUNT(DISTINCT driverId), and it should show two hash phases.
> > > >>
> > > > >> Regarding "MiniBatch Aggregation", it is not the same as a processing-time window aggregation.
> > > >>
> > > > >> 1) MiniBatch is just an optimization on unbounded aggregation: it buffers some input records in memory
> > > > >>   and processes them together to reduce state access. A processing-time window, by contrast, still
> > > > >>   accesses state per record. Besides, the local aggregation also applies mini-batch: it only sends the
> > > > >>   accumulator of the current mini-batch to the downstream global aggregation, which improves performance a lot.
> > > > >> 2) The size of a MiniBatch is not deterministic. It may be triggered by the number of records or by a timeout.
> > > > >>    But a window aggregate is triggered by a deterministic time.
> > > >>
> > > >>
> > > >> Best,
> > > >> Jark
> > > >>
> > > >>
> > > > >> On Mon, 9 Nov 2020 at 21:45, Felipe Gutierrez <felipe.o.gutier...@gmail.com> wrote:
> > > >>>
> > > >>> I realized that I forgot the image. Now it is attached.
> > > >>> --
> > > >>> -- Felipe Gutierrez
> > > >>> -- skype: felipe.o.gutierrez
> > > >>> -- https://felipeogutierrez.blogspot.com
> > > >>>
> > > >>> On Mon, Nov 9, 2020 at 1:41 PM Felipe Gutierrez
> > > >>> <felipe.o.gutier...@gmail.com> wrote:
> > > >>>>
> > > >>>> Hi community,
> > > >>>>
> > > > >>>> I am testing the "Split Distinct Aggregation" [1] consuming the taxi
> > > > >>>> ride data set. My SQL query from the table environment is the one
> > > > >>>> below:
> > > >>>>
> > > > >>>> Table tableCountDistinct = tableEnv.sqlQuery("SELECT startDate, COUNT(driverId) FROM TaxiRide GROUP BY startDate");
> > > >>>>
> > > > >>>> and I am enabling:
> > > > >>>> configuration.setString("table.exec.mini-batch.enabled", "true");
> > > > >>>> configuration.setString("table.exec.mini-batch.allow-latency", "3 s");
> > > > >>>> configuration.setString("table.exec.mini-batch.size", "5000");
> > > > >>>> configuration.setString("table.optimizer.agg-phase-strategy", "TWO_PHASE");
> > > > >>>> and finally
> > > > >>>> configuration.setString("table.optimizer.distinct-agg.split.enabled", "true");
> > > >>>>
> > > > >>>> I was expecting the query plan in the Web UI to show two hash
> > > > >>>> phases, as in the image at [1]. Instead, it shows the same plan
> > > > >>>> with one hash phase, as if I were deploying only one Local
> > > > >>>> aggregate and one Global aggregate (taking the parallel instances
> > > > >>>> into consideration, of course). Please see the attached query
> > > > >>>> execution plan image.
> > > >>>>
> > > > >>>> Is there something that I am missing when I configure the Table API?
> > > > >>>> By the way, I am a bit confused by the "MiniBatch Aggregation" [2].
> > > > >>>> Is the "MiniBatch Aggregation" aggregating as a processing-time
> > > > >>>> window on the operator after the hash phase? If it is, isn't it the
> > > > >>>> same as a window aggregation instead of an unbounded window as the
> > > > >>>> example presents?
> > > >>>>
> > > >>>> Thanks!
> > > >>>> Felipe
> > > >>>>
> > > >>>> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/tuning/streaming_aggregation_optimization.html#split-distinct-aggregation
> > > >>>> [2]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/tuning/streaming_aggregation_optimization.html#minibatch-aggregation
> > > >>>> --
> > > >>>> -- Felipe Gutierrez
> > > >>>> -- skype: felipe.o.gutierrez
> > > >>>> -- https://felipeogutierrez.blogspot.com
> > > >
> > >
>
