[ https://issues.apache.org/jira/browse/FLINK-15943?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17064882#comment-17064882 ]
gkgkgk commented on FLINK-15943: -------------------------------- This might be related to [|https://issues.apache.org/jira/browse/FLINK-15801] > Rowtime field name cannot be the same as the json field > -------------------------------------------------------- > > Key: FLINK-15943 > URL: https://issues.apache.org/jira/browse/FLINK-15943 > Project: Flink > Issue Type: Bug > Components: Table SQL / API, Table SQL / Planner > Affects Versions: 1.9.0 > Reporter: gkgkgk > Priority: Major > > Run the following sql: > -- sql start > --source > CREATE TABLE dwd_user_log ( > id VARCHAR, > ctime TIMESTAMP, > sessionId VARCHAR, > pageId VARCHAR, > eventId VARCHAR, > deviceId Decimal > ) WITH ( > 'connector.type' = 'kafka', > 'connector.version' = 'universal', > 'connector.topic' = 'dev_dwd_user_log_02', > 'connector.startup-mode' = 'latest-offset', > 'connector.properties.0.key' = 'zookeeper.connect', > 'connector.properties.0.value' = 'node01:2181', > 'connector.properties.1.key' = 'bootstrap.servers', > 'connector.properties.1.value' = 'node01:9092', > 'connector.properties.2.key' = 'group.id', > 'connector.properties.2.value' = 'dev-group', > 'update-mode' = 'append', > 'format.type' = 'json', > -- 'format.derive-schema' = 'true', > 'format.json-schema' = '{ > "type": "object", > "properties": { > "id": { > "type": "string" > }, > "ctime": { > "type": "string", > "format": "date-time" > }, > "pageId": { > "type": "string" > }, > "eventId": { > "type": "string" > }, > "sessionId": { > "type": "string" > }, > "deviceId": { > "type": "number" > } > } > }', > 'schema.1.rowtime.timestamps.type' = 'from-field', > 'schema.1.rowtime.timestamps.from' = 'ctime', > 'schema.1.rowtime.watermarks.type' = 'periodic-bounded', > 'schema.1.rowtime.watermarks.delay' = '10000' > ); > -- sink > -- sink for pv > CREATE TABLE dws_pv ( > windowStart TIMESTAMP, > windowEnd TIMESTAMP, > pageId VARCHAR, > viewCount BIGINT > ) WITH ( > 'connector.type' = 'kafka', > 'connector.version' = 'universal', > 'connector.topic' = 'dev_dws_pvuv_02', > 'connector.startup-mode' = 'latest-offset', > 'connector.properties.0.key' = 'zookeeper.connect', > 'connector.properties.0.value' = 'node01:2181', > 'connector.properties.1.key' = 'bootstrap.servers', > 'connector.properties.1.value' = 'node01:9092', > 'connector.properties.2.key' = 'group.id', > 'connector.properties.2.value' = 'dev-group', > 'update-mode' = 'append', > 'format.type' = 'json', > 'format.derive-schema' = 'true' > ); > -- pv > INSERT INTO dws_pv > SELECT > TUMBLE_START(ctime, INTERVAL '20' SECOND) AS windowStart, > TUMBLE_END(ctime, INTERVAL '20' SECOND) AS windowEnd, > pageId, > COUNT(deviceId) AS viewCount > FROM dwd_user_log > GROUP BY TUMBLE(ctime, INTERVAL '20' SECOND),pageId; > -- sql end > And hit the following error: > {code:java} > //Exception in thread "main" org.apache.flink.table.api.ValidationException: > Field 'ctime' could not be resolved by the field mapping.Exception in thread > "main" org.apache.flink.table.api.ValidationException: Field 'ctime' could > not be resolved by the field mapping. at > org.apache.flink.table.planner.sources.TableSourceUtil$.org$apache$flink$table$planner$sources$TableSourceUtil$$resolveInputField(TableSourceUtil.scala:357) > at > org.apache.flink.table.planner.sources.TableSourceUtil$$anonfun$org$apache$flink$table$planner$sources$TableSourceUtil$$resolveInputFields$1.apply(TableSourceUtil.scala:388) > at > org.apache.flink.table.planner.sources.TableSourceUtil$$anonfun$org$apache$flink$table$planner$sources$TableSourceUtil$$resolveInputFields$1.apply(TableSourceUtil.scala:388) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at > scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) > at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186) at > scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at > scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186) at > org.apache.flink.table.planner.sources.TableSourceUtil$.org$apache$flink$table$planner$sources$TableSourceUtil$$resolveInputFields(TableSourceUtil.scala:388) > at > org.apache.flink.table.planner.sources.TableSourceUtil$$anonfun$getRowtimeExtractionExpression$1.apply(TableSourceUtil.scala:275) > at > org.apache.flink.table.planner.sources.TableSourceUtil$$anonfun$getRowtimeExtractionExpression$1.apply(TableSourceUtil.scala:270) > at scala.Option.map(Option.scala:146) at > org.apache.flink.table.planner.sources.TableSourceUtil$.getRowtimeExtractionExpression(TableSourceUtil.scala:270) > at > org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:117) > at > org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:55) > at > org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:54 > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)