[ https://issues.apache.org/jira/browse/FLINK-8854?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16392913#comment-16392913 ]
ASF GitHub Bot commented on FLINK-8854:
---------------------------------------

Github user xccui commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5662#discussion_r173455747

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/SchemaValidator.scala ---
    @@ -198,14 +205,20 @@ object SchemaValidator {
           val isProctime = properties
             .getOptionalBoolean(s"$SCHEMA.$i.$SCHEMA_PROCTIME")
             .orElse(false)
    -      val isRowtime = properties
    -        .containsKey(s"$SCHEMA.$i.$ROWTIME_TIMESTAMPS_TYPE")
    +      val tsType = s"$SCHEMA.$i.$ROWTIME_TIMESTAMPS_TYPE"
    +      val isRowtime = properties.containsKey(tsType)
           if (!isProctime && !isRowtime) {
             // check for a aliasing
             val fieldName = properties.getOptionalString(s"$SCHEMA.$i.$SCHEMA_FROM")
               .orElse(n)
             builder.field(fieldName, t)
           }
    +      // only use the rowtime attribute if it references a field
    +      else if (isRowtime &&
    +          properties.getString(tsType) == ROWTIME_TIMESTAMPS_TYPE_VALUE_FROM_FIELD) {
    --- End diff --

    What if the user uses the custom extractor to define his/her own `ExistingField` extractor that references a field?

> Mapping of SchemaValidator.deriveFieldMapping() is incorrect.
> -------------------------------------------------------------
>
>                 Key: FLINK-8854
>                 URL: https://issues.apache.org/jira/browse/FLINK-8854
>             Project: Flink
>          Issue Type: Bug
>          Components: Table API & SQL
>    Affects Versions: 1.5.0
>            Reporter: Fabian Hueske
>            Assignee: Timo Walther
>            Priority: Blocker
>             Fix For: 1.5.0, 1.6.0
>
>
> The field mapping returned by {{SchemaValidator.deriveFieldMapping()}} is not
> correct.
> It should not only include all fields of the table schema, but also all
> fields of the format schema (mapped to themselves). Otherwise, it is not
> possible to use a timestamp extractor on a field that is not in table schema.
> For example this configuration would fail:
> {code}
> sources:
>   - name: TaxiRides
>     schema:
>       - name: rideId
>         type: LONG
>       - name: rowTime
>         type: TIMESTAMP
>         rowtime:
>           timestamps:
>             type: "from-field"
>             from: "rideTime"
>           watermarks:
>             type: "periodic-bounded"
>             delay: "60000"
>     connector:
>       ....
>     format:
>       property-version: 1
>       type: json
>       schema: "ROW(rideId LONG, rideTime TIMESTAMP)"
> {code}
> because {{rideTime}} is not in the table schema.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
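
The mapping the issue asks for can be illustrated with a small, self-contained sketch. It is not the code from the pull request or Flink's actual `SchemaValidator`; `TableField`, `FieldMappingSketch`, and their members are hypothetical names, and the model assumes each table field is either a physical field (optionally aliased), a proctime attribute, or a rowtime attribute.

```scala
// Hypothetical, simplified model of one table schema field; not a Flink class.
final case class TableField(
    name: String,
    from: Option[String] = None, // optional alias to a format field
    isProctime: Boolean = false,
    isRowtime: Boolean = false)

object FieldMappingSketch {

  // Returns a map of table field name -> format field name.
  def deriveFieldMapping(
      tableFields: Seq[TableField],
      formatFields: Seq[String]): Map[String, String] = {
    // 1. Every format field maps to itself, so a timestamp extractor can
    //    reference a field that is absent from the table schema.
    val formatMapping = formatFields.map(f => f -> f).toMap

    // 2. Physical table fields map to their alias, or to themselves.
    //    Time attributes are skipped (assumption of this sketch).
    val tableMapping = tableFields
      .filter(f => !f.isProctime && !f.isRowtime)
      .map(f => f.name -> f.from.getOrElse(f.name))
      .toMap

    // Explicit table mappings take precedence over the identity mappings.
    formatMapping ++ tableMapping
  }

  // The TaxiRides configuration from the issue, expressed in this model.
  val taxiRides: Map[String, String] = deriveFieldMapping(
    tableFields = Seq(
      TableField("rideId"),
      TableField("rowTime", isRowtime = true)),
    formatFields = Seq("rideId", "rideTime"))
}
```

Under these assumptions `taxiRides` contains an entry for `rideTime` even though that field appears only in the format schema, which is what a "from-field" timestamp extractor needs.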