Github user xccui commented on a diff in the pull request:

https://github.com/apache/flink/pull/5564#discussion_r171354820

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/SchemaValidator.scala ---

@@ -67,14 +85,129 @@ class SchemaValidator(isStreamEnvironment: Boolean = true) extends DescriptorVal
 object SchemaValidator {

   val SCHEMA = "schema"
-  val SCHEMA_VERSION = "schema.version"
+  val SCHEMA_NAME = "name"
+  val SCHEMA_TYPE = "type"
+  val SCHEMA_PROCTIME = "proctime"
+  val SCHEMA_FROM = "from"
+
+  // utilities
+
+  /**
+    * Finds the proctime attribute if defined.
+    */
+  def deriveProctimeAttribute(properties: DescriptorProperties): Optional[String] = {
+    val names = properties.getIndexedProperty(SCHEMA, SCHEMA_NAME)
+
+    for (i <- 0 until names.size) {
+      val isProctime = toScala(
+        properties.getOptionalBoolean(s"$SCHEMA.$i.$SCHEMA_PROCTIME"))
+      isProctime.foreach { isSet =>
+        if (isSet) {
+          return toJava(names.asScala.get(s"$SCHEMA.$i.$SCHEMA_NAME"))
+        }
+      }
+    }
+    toJava(None)
+  }
+
+  /**
+    * Finds the rowtime attributes if defined.
+    */
+  def deriveRowtimeAttributes(properties: DescriptorProperties)
+    : util.List[RowtimeAttributeDescriptor] = {
+
+    val names = properties.getIndexedProperty(SCHEMA, SCHEMA_NAME)
+
+    var attributes = new mutable.ArrayBuffer[RowtimeAttributeDescriptor]()
+
+    // check for rowtime in every field
+    for (i <- 0 until names.size) {
+      RowtimeValidator
+        .getRowtimeComponents(properties, s"$SCHEMA.$i.")
+        .foreach { case (extractor, strategy) =>
+          // create descriptor
+          attributes += new RowtimeAttributeDescriptor(
+            properties.getString(s"$SCHEMA.$i.$SCHEMA_NAME"),
+            extractor,
+            strategy)
+        }
+    }
+
+    attributes.asJava
+  }
+
+  /**
+    * Finds a table source field mapping.
+    */
+  def deriveFieldMapping(
+      properties: DescriptorProperties,
+      sourceSchema: Optional[TableSchema])
+    : util.Map[String, String] = {
+
+    val mapping = mutable.Map[String, String]()
+
+    val schema = properties.getTableSchema(SCHEMA)
+
+    // add all schema fields first for implicit mappings
+    schema.getColumnNames.foreach { name =>
+      mapping.put(name, name)
+    }
+
+    val names = properties.getIndexedProperty(SCHEMA, SCHEMA_NAME)
+
+    for (i <- 0 until names.size) {
+      val name = properties.getString(s"$SCHEMA.$i.$SCHEMA_NAME")
+      toScala(properties.getOptionalString(s"$SCHEMA.$i.$SCHEMA_FROM")) match {
-  // per column properties
+        // add explicit mapping
+        case Some(source) =>
+          mapping.put(name, source)
-  val NAME = "name"
-  val TYPE = "type"
-  val PROCTIME = "proctime"
-  val PROCTIME_VALUE_TRUE = "true"
-  val FROM = "from"
+        // implicit mapping or time
+        case None =>
+          val isProctime = properties
+            .getOptionalBoolean(s"$SCHEMA.$i.$SCHEMA_PROCTIME")
+            .orElse(false)
+          val isRowtime = properties
+            .containsKey(s"$SCHEMA.$i.$ROWTIME_TIMESTAMPS_TYPE")
+          // remove proctime/rowtime from mapping
+          if (isProctime || isRowtime) {
+            mapping.remove(name)
+          }
+          // check for invalid fields
+          else if (toScala(sourceSchema).forall(s => !s.getColumnNames.contains(name))) {
+            throw new ValidationException(s"Could not map the schema field '$name' to a field " +
+              s"from source. Please specify the source field from which it can be derived.")
+          }
+      }
+    }
+    mapping.toMap.asJava
+  }
+
+  /**
+    * Finds the fields that can be used for a format schema (without time attributes).
+    */
+  def deriveFormatFields(properties: DescriptorProperties): TableSchema = {

--- End diff --

Thanks for your explanation @twalthr.
I totally agree that we should avoid making users define their schemas multiple times. Since the names and definitions are still a bit confusing to me, I'd like to share my understanding to check whether it's correct. Let's take the `KafkaJsonTableSource` as an example. Briefly, the schema mapping can be illustrated as:

```
json-format-schema (physical, optional)  <-- mapping -->  result-schema (physical)
result-schema (physical) + timestamp fields (logical)  =  table-schema (logical, required)
```

The JSON-format-schema can be defined either with a JSON-schema string (`FORMAT_JSON_SCHEMA`) or with a `TypeInformation` (`FORMAT_SCHEMA`). When the JSON-format-schema is not provided, we use the `deriveFormatFields()` method to generate it from the result-schema and add something like a "self-mapping". IMO, if we don't pass the `jsonSchema` to the builder, there's no need to define the mapping, right?

The timestamp mechanism may be even more complicated for Kafka, since it supports `withKafkaTimestampAsRowtimeAttribute()` (though, according to FLINK-8500, this method does not seem to work at the moment). I suppose we need a new `RowtimeDescriptor` for it?
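To make the mapping part concrete, here is a rough, untested sketch of how I read `deriveFieldMapping()` with the property keys from this PR. The field names (`user`, `clicks`, `click_count`, `proc`) are made up for illustration, and the `putString`/`putBoolean` setters on `DescriptorProperties` as well as the exact type strings are my assumptions, not something taken from this change:

```scala
import java.util.Optional

import org.apache.flink.api.common.typeinfo.{TypeInformation, Types}
import org.apache.flink.table.api.TableSchema
import org.apache.flink.table.descriptors.{DescriptorProperties, SchemaValidator}

object FieldMappingSketch {
  def main(args: Array[String]): Unit = {
    val props = new DescriptorProperties()

    // physical field, implicitly mapped to a format field of the same name
    props.putString("schema.0.name", "user")
    props.putString("schema.0.type", "VARCHAR")

    // physical field renamed via an explicit 'from' mapping
    props.putString("schema.1.name", "clicks")
    props.putString("schema.1.type", "BIGINT")
    props.putString("schema.1.from", "click_count")

    // logical processing-time attribute, not part of the format schema
    props.putString("schema.2.name", "proc")
    props.putString("schema.2.type", "TIMESTAMP")
    props.putBoolean("schema.2.proctime", true)

    // physical schema coming from the (hypothetical) JSON format
    val sourceSchema = new TableSchema(
      Array("user", "click_count"),
      Array[TypeInformation[_]](Types.STRING, Types.LONG))

    // expected result, if I read the new method correctly:
    //   user   -> user          (implicit self-mapping)
    //   clicks -> click_count   (explicit 'from' mapping)
    //   proc   -> removed       (time attribute, not a physical field)
    val mapping = SchemaValidator.deriveFieldMapping(props, Optional.of(sourceSchema))
    println(mapping)
  }
}
```

If that reading is right, the self-mapped fields make the explicit mapping redundant in the derived case, which is why I asked whether it can be skipped when no `jsonSchema` is passed to the builder.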
---