GitHub user xccui commented on a diff in the pull request: https://github.com/apache/flink/pull/5662#discussion_r173455747 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/SchemaValidator.scala --- @@ -198,14 +205,20 @@ object SchemaValidator { val isProctime = properties .getOptionalBoolean(s"$SCHEMA.$i.$SCHEMA_PROCTIME") .orElse(false) - val isRowtime = properties - .containsKey(s"$SCHEMA.$i.$ROWTIME_TIMESTAMPS_TYPE") + val tsType = s"$SCHEMA.$i.$ROWTIME_TIMESTAMPS_TYPE" + val isRowtime = properties.containsKey(tsType) if (!isProctime && !isRowtime) { // check for a aliasing val fieldName = properties.getOptionalString(s"$SCHEMA.$i.$SCHEMA_FROM") .orElse(n) builder.field(fieldName, t) } + // only use the rowtime attribute if it references a field + else if (isRowtime && + properties.getString(tsType) == ROWTIME_TIMESTAMPS_TYPE_VALUE_FROM_FIELD) { --- End diff -- What if the user uses a custom extractor to define their own `ExistingField`-like extractor that also references a field?
---