Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5564#discussion_r171292456

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/SchemaValidator.scala ---
@@ -67,14 +85,129 @@ class SchemaValidator(isStreamEnvironment: Boolean = true) extends DescriptorVal
 object SchemaValidator {

   val SCHEMA = "schema"
-  val SCHEMA_VERSION = "schema.version"
+  val SCHEMA_NAME = "name"
+  val SCHEMA_TYPE = "type"
+  val SCHEMA_PROCTIME = "proctime"
+  val SCHEMA_FROM = "from"
+
+  // utilities
+
+  /**
+    * Finds the proctime attribute if defined.
+    */
+  def deriveProctimeAttribute(properties: DescriptorProperties): Optional[String] = {
+    val names = properties.getIndexedProperty(SCHEMA, SCHEMA_NAME)
+
+    for (i <- 0 until names.size) {
+      val isProctime = toScala(
+        properties.getOptionalBoolean(s"$SCHEMA.$i.$SCHEMA_PROCTIME"))
+      isProctime.foreach { isSet =>
+        if (isSet) {
+          return toJava(names.asScala.get(s"$SCHEMA.$i.$SCHEMA_NAME"))
+        }
+      }
+    }
+    toJava(None)
+  }
+
+  /**
+    * Finds the rowtime attributes if defined.
+    */
+  def deriveRowtimeAttributes(properties: DescriptorProperties)
+    : util.List[RowtimeAttributeDescriptor] = {
+
+    val names = properties.getIndexedProperty(SCHEMA, SCHEMA_NAME)
+
+    var attributes = new mutable.ArrayBuffer[RowtimeAttributeDescriptor]()
+
+    // check for rowtime in every field
+    for (i <- 0 until names.size) {
+      RowtimeValidator
+        .getRowtimeComponents(properties, s"$SCHEMA.$i.")
+        .foreach { case (extractor, strategy) =>
+          // create descriptor
+          attributes += new RowtimeAttributeDescriptor(
+            properties.getString(s"$SCHEMA.$i.$SCHEMA_NAME"),
+            extractor,
+            strategy)
+        }
+    }
+
+    attributes.asJava
+  }
+
+  /**
+    * Finds a table source field mapping.
+    */
+  def deriveFieldMapping(
+      properties: DescriptorProperties,
+      sourceSchema: Optional[TableSchema])
+    : util.Map[String, String] = {
+
+    val mapping = mutable.Map[String, String]()
+
+    val schema = properties.getTableSchema(SCHEMA)
+
+    // add all schema fields first for implicit mappings
+    schema.getColumnNames.foreach { name =>
+      mapping.put(name, name)
+    }
+
+    val names = properties.getIndexedProperty(SCHEMA, SCHEMA_NAME)
+
+    for (i <- 0 until names.size) {
+      val name = properties.getString(s"$SCHEMA.$i.$SCHEMA_NAME")
+      toScala(properties.getOptionalString(s"$SCHEMA.$i.$SCHEMA_FROM")) match {

-  // per column properties
+        // add explicit mapping
+        case Some(source) =>
+          mapping.put(name, source)

-  val NAME = "name"
-  val TYPE = "type"
-  val PROCTIME = "proctime"
-  val PROCTIME_VALUE_TRUE = "true"
-  val FROM = "from"
+        // implicit mapping or time
+        case None =>
+          val isProctime = properties
+            .getOptionalBoolean(s"$SCHEMA.$i.$SCHEMA_PROCTIME")
+            .orElse(false)
+          val isRowtime = properties
+            .containsKey(s"$SCHEMA.$i.$ROWTIME_TIMESTAMPS_TYPE")
+          // remove proctime/rowtime from mapping
+          if (isProctime || isRowtime) {
+            mapping.remove(name)
+          }
+          // check for invalid fields
+          else if (toScala(sourceSchema).forall(s => !s.getColumnNames.contains(name))) {
+            throw new ValidationException(s"Could not map the schema field '$name' to a field " +
+              s"from source. Please specify the source field from which it can be derived.")
+          }
+      }
+    }
+    mapping.toMap.asJava
+  }
+
+  /**
+    * Finds the fields that can be used for a format schema (without time attributes).
+    */
+  def deriveFormatFields(properties: DescriptorProperties): TableSchema = {
--- End diff --

No problem @xccui. My goal was to allow users to specify all fields only once.
Users often have tables with 30+ columns. When I opened the PR, I added the possibility of deriving the `schema` from the `format` schema. But in a SQL DDL statement `CREATE TABLE (..) [FORMAT] ...` the `schema` must always be complete while the `format` schema might be derived, so I changed my initial implementation. For simplicity, `deriveFormatFields` removes the time attributes and takes the resulting schema as the format's schema, because `rowtime` must not be an existing field. If rowtime should be an existing field, the full format schema is mandatory (because the `schema` and the `format` schema might differ). I agree that we need good documentation for all of that.
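To illustrate the derivation rule described above, here is a minimal, self-contained sketch (not the actual `deriveFormatFields` implementation, which works on `DescriptorProperties` and `TableSchema`; the `Field` case class and the field names are made up): the derived format schema is simply the table schema with the time attributes dropped.

```scala
// Standalone sketch of the derivation rule; names and types are hypothetical.
object FormatSchemaDerivationSketch extends App {

  case class Field(name: String, dataType: String)

  /** Drops proctime and rowtime attributes; what remains is used as the format schema. */
  def deriveFormatFieldsSketch(
      tableSchema: Seq[Field],
      proctimeFields: Set[String],
      rowtimeFields: Set[String]): Seq[Field] = {
    tableSchema.filterNot(f => proctimeFields.contains(f.name) || rowtimeFields.contains(f.name))
  }

  // Table schema with two physical fields plus a proctime and a rowtime attribute.
  val tableSchema = Seq(
    Field("user", "STRING"),
    Field("amount", "DECIMAL"),
    Field("proc", "TIMESTAMP"),
    Field("rt", "TIMESTAMP"))

  // Prints only the physical fields "user" and "amount".
  println(deriveFormatFieldsSketch(tableSchema, Set("proc"), Set("rt")))
}
```

Of course, if the rowtime attribute did refer to an existing source field, dropping it like this would be wrong, which is exactly why the full format schema must be specified explicitly in that case.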
---