[ https://issues.apache.org/jira/browse/FLINK-15421?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Jark Wu resolved FLINK-15421. ----------------------------- Resolution: Fixed 1.11.0: ba4433540561ef942062c70eb6bce64c02d8a54a 1.10.0: f58a2ecf2c6a60c0c81f9ece13d58797407232fa 1.9.2: ecd4e42d4980928655ec3ba2f1517d12c29a1d94 > GroupAggsHandler throws java.time.LocalDateTime cannot be cast to > java.sql.Timestamp > ------------------------------------------------------------------------------------ > > Key: FLINK-15421 > URL: https://issues.apache.org/jira/browse/FLINK-15421 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner > Affects Versions: 1.9.1, 1.10.0 > Reporter: Benchao Li > Assignee: Zhenghua Gao > Priority: Critical > Labels: pull-request-available > Fix For: 1.9.2, 1.10.0 > > Time Spent: 40m > Remaining Estimate: 0h > > `TimestmapType` has two types of physical representation: `Timestamp` and > `LocalDateTime`. When we use following SQL, it will conflict each other: > {code:java} > SELECT > SUM(cnt) as s, > MAX(ts) > FROM > SELECT > `string`, > `int`, > COUNT(*) AS cnt, > MAX(rowtime) as ts > FROM T1 > GROUP BY `string`, `int`, TUMBLE(rowtime, INTERVAL '10' SECOND) > GROUP BY `string` > {code} > with 'table.exec.emit.early-fire.enabled' = true. > The exceptions is below: > {quote}Caused by: java.lang.ClassCastException: java.time.LocalDateTime > cannot be cast to java.sql.Timestamp > at GroupAggsHandler$83.getValue(GroupAggsHandler$83.java:529) > at > org.apache.flink.table.runtime.operators.aggregate.GroupAggFunction.processElement(GroupAggFunction.java:164) > at > org.apache.flink.table.runtime.operators.aggregate.GroupAggFunction.processElement(GroupAggFunction.java:43) > at > org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:85) > at > org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:173) > at > org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:151) > at > org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:128) > at > org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:69) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:311) > at > org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:187) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:488) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:470) > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:702) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:527) > at java.lang.Thread.run(Thread.java:748) > {quote} > I also create a UT to quickly reproduce this bug in `WindowAggregateITCase`: > {code:java} > @Test > def testEarlyFireWithTumblingWindow(): Unit = { > val stream = failingDataSource(data) > .assignTimestampsAndWatermarks( > new TimestampAndWatermarkWithOffset > [(Long, Int, Double, Float, BigDecimal, String, String)](10L)) > val table = stream.toTable(tEnv, > 'rowtime.rowtime, 'int, 'double, 'float, 'bigdec, 'string, 'name) > tEnv.registerTable("T1", table) > > tEnv.getConfig.getConfiguration.setBoolean("table.exec.emit.early-fire.enabled", > true) > > tEnv.getConfig.getConfiguration.setString("table.exec.emit.early-fire.delay", > "1000 ms") > val sql = > """ > |SELECT > | SUM(cnt) as s, > | MAX(ts) > |FROM > | (SELECT > | `string`, > | `int`, > | COUNT(*) AS cnt, > | MAX(rowtime) as ts > | FROM T1 > | GROUP BY `string`, `int`, TUMBLE(rowtime, INTERVAL '10' SECOND)) > |GROUP BY `string` > |""".stripMargin > tEnv.sqlQuery(sql).toRetractStream[Row].print() > env.execute() > } > {code} > > -- This message was sent by Atlassian Jira (v8.3.4#803005)