[ https://issues.apache.org/jira/browse/FLINK-20909?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Jark Wu closed FLINK-20909.
---------------------------
    Resolution: Fixed

Fixed in master: d1d78e72220e2938bf5af77420af16c477c96e29

MiniBatch interval derivation does not work well when miniBatch optimization is enabled in a job that contains deduplication on row time and an unbounded aggregate
--------------------------------------------------------------------------------------------------------------------------------------------------------------------

                Key: FLINK-20909
                URL: https://issues.apache.org/jira/browse/FLINK-20909
            Project: Flink
         Issue Type: Improvement
         Components: Table SQL / Planner
   Affects Versions: 1.12.0
           Reporter: Andy
           Assignee: Andy
           Priority: Major
             Labels: pull-request-available
            Fix For: 1.13.0

MiniBatch interval derivation does not work well when miniBatch optimization is enabled in a job that contains deduplication on row time and an unbounded aggregate.

{code:java}
@Test
def testLastRowOnRowtime1(): Unit = {
  val t = env.fromCollection(rowtimeTestData)
    .assignTimestampsAndWatermarks(new RowtimeExtractor)
    .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime())
  tEnv.registerTable("T", t)

  tEnv.executeSql(
    s"""
       |CREATE TABLE rowtime_sink (
       |  cnt BIGINT
       |) WITH (
       |  'connector' = 'values',
       |  'sink-insert-only' = 'false',
       |  'changelog-mode' = 'I,UA,D'
       |)
       |""".stripMargin)

  val sql =
    """
      |INSERT INTO rowtime_sink
      |SELECT COUNT(b) FROM (
      |  SELECT a, b, c, rowtime
      |  FROM (
      |    SELECT *,
      |      ROW_NUMBER() OVER (PARTITION BY b ORDER BY rowtime DESC) as rowNum
      |    FROM T
      |  )
      |  WHERE rowNum = 1
      |)
    """.stripMargin

  tEnv.executeSql(sql).await()
  val rawResult = TestValuesTableFactory.getRawResults("rowtime_sink")
}
{code}

For the SQL above, when miniBatch optimization is enabled, the optimized plan is the following.

{code:java}
Sink(table=[default_catalog.default_database.rowtime_sink], fields=[EXPR$0])
+- GlobalGroupAggregate(select=[COUNT_RETRACT(count$0) AS EXPR$0])
   +- Exchange(distribution=[single])
      +- LocalGroupAggregate(select=[COUNT_RETRACT(b) AS count$0, COUNT_RETRACT(*) AS count1$1])
         +- Calc(select=[b])
            +- Deduplicate(keep=[LastRow], key=[b], order=[ROWTIME])
               +- Exchange(distribution=[hash[b]])
                  +- Calc(select=[b, rowtime])
                     +- MiniBatchAssigner(interval=[1000ms], mode=[ProcTime])
                        +- DataStreamScan(table=[[default_catalog, default_database, T]], fields=[a, b, c, rowtime])
{code}

A `StreamExecMiniBatchAssigner` is inserted into the plan. This behavior is problematic because `Deduplicate` depends on row time, while `ProcTimeMiniBatchAssignerOperator` emits a watermark at the configured interval based on processing time. For `Deduplicate`, such an incoming watermark has no relation to the row time of the incoming records, so it cannot indicate that the row time of all following input records is greater than or equal to the current watermark.
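For completeness, the snippet below is a minimal sketch (not part of the reporter's test, which is not shown in full here) of how mini-batch optimization is typically switched on through the standard table configuration options; the 1 s allow-latency corresponds to the interval=[1000ms] on the MiniBatchAssigner node above. It assumes the same streaming `tEnv` as in the test case.

{code:java}
// Hypothetical setup, assuming the streaming TableEnvironment `tEnv` from the test above.
val conf = tEnv.getConfig.getConfiguration

// Enable mini-batch buffering for unbounded group aggregations.
conf.setString("table.exec.mini-batch.enabled", "true")

// Buffer input for at most 1 second before flushing; this corresponds to the
// 1000 ms interval on the MiniBatchAssigner in the plan above.
conf.setString("table.exec.mini-batch.allow-latency", "1 s")

// Also flush once 5000 records have been buffered (an illustrative value).
conf.setString("table.exec.mini-batch.size", "5000")
{code}

With mini-batch left disabled (the default), no MiniBatchAssigner should appear in the plan, and the row-time `Deduplicate` consumes the ordinary row-time watermarks produced by the RowtimeExtractor.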