Github user xccui commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5564#discussion_r171354820
  
    --- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/SchemaValidator.scala
 ---
    @@ -67,14 +85,129 @@ class SchemaValidator(isStreamEnvironment: Boolean = 
true) extends DescriptorVal
     object SchemaValidator {
     
       val SCHEMA = "schema"
    -  val SCHEMA_VERSION = "schema.version"
    +  val SCHEMA_NAME = "name"
    +  val SCHEMA_TYPE = "type"
    +  val SCHEMA_PROCTIME = "proctime"
    +  val SCHEMA_FROM = "from"
    +
    +  // utilities
    +
    +  /**
    +    * Finds the proctime attribute if defined.
    +    */
    +  def deriveProctimeAttribute(properties: DescriptorProperties): 
Optional[String] = {
    +    val names = properties.getIndexedProperty(SCHEMA, SCHEMA_NAME)
    +
    +    for (i <- 0 until names.size) {
    +      val isProctime = toScala(
    +        properties.getOptionalBoolean(s"$SCHEMA.$i.$SCHEMA_PROCTIME"))
    +      isProctime.foreach { isSet =>
    +        if (isSet) {
    +          return toJava(names.asScala.get(s"$SCHEMA.$i.$SCHEMA_NAME"))
    +        }
    +      }
    +    }
    +    toJava(None)
    +  }
    +
    +  /**
    +    * Finds the rowtime attributes if defined.
    +    */
    +  def deriveRowtimeAttributes(properties: DescriptorProperties)
    +    : util.List[RowtimeAttributeDescriptor] = {
    +
    +    val names = properties.getIndexedProperty(SCHEMA, SCHEMA_NAME)
    +
    +    var attributes = new mutable.ArrayBuffer[RowtimeAttributeDescriptor]()
    +
    +    // check for rowtime in every field
    +    for (i <- 0 until names.size) {
    +      RowtimeValidator
    +        .getRowtimeComponents(properties, s"$SCHEMA.$i.")
    +        .foreach { case (extractor, strategy) =>
    +          // create descriptor
    +          attributes += new RowtimeAttributeDescriptor(
    +            properties.getString(s"$SCHEMA.$i.$SCHEMA_NAME"),
    +            extractor,
    +            strategy)
    +        }
    +    }
    +
    +    attributes.asJava
    +  }
    +
    +  /**
    +    * Finds a table source field mapping.
    +    */
    +  def deriveFieldMapping(
    +      properties: DescriptorProperties,
    +      sourceSchema: Optional[TableSchema])
    +    : util.Map[String, String] = {
    +
    +    val mapping = mutable.Map[String, String]()
    +
    +    val schema = properties.getTableSchema(SCHEMA)
    +
    +    // add all schema fields first for implicit mappings
    +    schema.getColumnNames.foreach { name =>
    +      mapping.put(name, name)
    +    }
    +
    +    val names = properties.getIndexedProperty(SCHEMA, SCHEMA_NAME)
    +
    +    for (i <- 0 until names.size) {
    +      val name = properties.getString(s"$SCHEMA.$i.$SCHEMA_NAME")
    +      toScala(properties.getOptionalString(s"$SCHEMA.$i.$SCHEMA_FROM")) 
match {
     
    -  // per column properties
    +        // add explicit mapping
    +        case Some(source) =>
    +          mapping.put(name, source)
     
    -  val NAME = "name"
    -  val TYPE = "type"
    -  val PROCTIME = "proctime"
    -  val PROCTIME_VALUE_TRUE = "true"
    -  val FROM = "from"
    +        // implicit mapping or time
    +        case None =>
    +          val isProctime = properties
    +            .getOptionalBoolean(s"$SCHEMA.$i.$SCHEMA_PROCTIME")
    +            .orElse(false)
    +          val isRowtime = properties
    +            .containsKey(s"$SCHEMA.$i.$ROWTIME_TIMESTAMPS_TYPE")
    +          // remove proctime/rowtime from mapping
    +          if (isProctime || isRowtime) {
    +            mapping.remove(name)
    +          }
    +          // check for invalid fields
    +          else if (toScala(sourceSchema).forall(s => 
!s.getColumnNames.contains(name))) {
    +            throw new ValidationException(s"Could not map the schema field 
'$name' to a field " +
    +              s"from source. Please specify the source field from which it 
can be derived.")
    +          }
    +      }
    +    }
     
    +    mapping.toMap.asJava
    +  }
    +
    +  /**
    +    * Finds the fields that can be used for a format schema (without time 
attributes).
    +    */
    +  def deriveFormatFields(properties: DescriptorProperties): TableSchema = {
    --- End diff --
    
    Thanks for your explanation @twalthr. I totally agree that we should avoid 
letting the users define schemas multi-times. As the names and definitions are 
still confusing me, I'd share my understanding to see if it's correct. Let's 
take the KafkaJsonTableSource as an example. Briefly, the schema mapping can be 
illustrated with
    
    ```
    json-format-schema(physical, optional) <-- mapping --> 
result-schema(physical)
    result-schema(physical) + timestamp fields(logical) = table-schema(logical, 
required)
    ```
    
    The `JSON-format-schema` could be either defined with a JSON-schema 
string(FORMAT_JSON_SCHEMA) or a `TypeInformation`(FORMAT_SCHEMA). When the 
JSON-format-schema is not provided, we use the `deriveFormatFields()` method to 
generate it from the result-schema and add something like a "self-mapping". 
IMO, if we don't pass the `jsonSchema` to the builder, there's no need to 
define the mapping, right?
    
    The timestamp mechanism may even be complicated for Kafka since it supports 
`withKafkaTimestampAsRowtimeAttribute` (though according to FLINK-8500, this 
method seems to not work for now). I suppose we need a new 
`RowtimeDescriptor`for it? 


---

Reply via email to