
ASF GitHub Bot commented on FLINK-8538:

Github user twalthr commented on a diff in the pull request:

    --- Diff: 
    @@ -67,14 +85,129 @@ class SchemaValidator(isStreamEnvironment: Boolean = 
true) extends DescriptorVal
     object SchemaValidator {
       val SCHEMA = "schema"
    -  val SCHEMA_VERSION = "schema.version"
    +  val SCHEMA_NAME = "name"
    +  val SCHEMA_TYPE = "type"
    +  val SCHEMA_PROCTIME = "proctime"
    +  val SCHEMA_FROM = "from"
    +  // utilities
    +  /**
    +    * Finds the proctime attribute if defined.
    +    */
    +  def deriveProctimeAttribute(properties: DescriptorProperties): 
Optional[String] = {
    +    val names = properties.getIndexedProperty(SCHEMA, SCHEMA_NAME)
    +    for (i <- 0 until names.size) {
    +      val isProctime = toScala(
    +        properties.getOptionalBoolean(s"$SCHEMA.$i.$SCHEMA_PROCTIME"))
    +      isProctime.foreach { isSet =>
    +        if (isSet) {
    +          return toJava(names.asScala.get(s"$SCHEMA.$i.$SCHEMA_NAME"))
    +        }
    +      }
    +    }
    +    toJava(None)
    +  }
    +  /**
    +    * Finds the rowtime attributes if defined.
    +    */
    +  def deriveRowtimeAttributes(properties: DescriptorProperties)
    +    : util.List[RowtimeAttributeDescriptor] = {
    +    val names = properties.getIndexedProperty(SCHEMA, SCHEMA_NAME)
    +    var attributes = new mutable.ArrayBuffer[RowtimeAttributeDescriptor]()
    +    // check for rowtime in every field
    +    for (i <- 0 until names.size) {
    +      RowtimeValidator
    +        .getRowtimeComponents(properties, s"$SCHEMA.$i.")
    +        .foreach { case (extractor, strategy) =>
    +          // create descriptor
    +          attributes += new RowtimeAttributeDescriptor(
    +            properties.getString(s"$SCHEMA.$i.$SCHEMA_NAME"),
    +            extractor,
    +            strategy)
    +        }
    +    }
    +    attributes.asJava
    +  }
    +  /**
    +    * Finds a table source field mapping.
    +    */
    +  def deriveFieldMapping(
    +      properties: DescriptorProperties,
    +      sourceSchema: Optional[TableSchema])
    +    : util.Map[String, String] = {
    +    val mapping = mutable.Map[String, String]()
    +    val schema = properties.getTableSchema(SCHEMA)
    +    // add all schema fields first for implicit mappings
    +    schema.getColumnNames.foreach { name =>
    +      mapping.put(name, name)
    +    }
    +    val names = properties.getIndexedProperty(SCHEMA, SCHEMA_NAME)
    +    for (i <- 0 until names.size) {
    +      val name = properties.getString(s"$SCHEMA.$i.$SCHEMA_NAME")
    +      toScala(properties.getOptionalString(s"$SCHEMA.$i.$SCHEMA_FROM")) 
match {
    -  // per column properties
    +        // add explicit mapping
    +        case Some(source) =>
    +          mapping.put(name, source)
    -  val NAME = "name"
    -  val TYPE = "type"
    -  val PROCTIME = "proctime"
    -  val PROCTIME_VALUE_TRUE = "true"
    -  val FROM = "from"
    +        // implicit mapping or time
    +        case None =>
    +          val isProctime = properties
    +            .getOptionalBoolean(s"$SCHEMA.$i.$SCHEMA_PROCTIME")
    +            .orElse(false)
    +          val isRowtime = properties
    +            .containsKey(s"$SCHEMA.$i.$ROWTIME_TIMESTAMPS_TYPE")
    +          // remove proctime/rowtime from mapping
    +          if (isProctime || isRowtime) {
    +            mapping.remove(name)
    +          }
    +          // check for invalid fields
    +          else if (toScala(sourceSchema).forall(s => 
!s.getColumnNames.contains(name))) {
    +            throw new ValidationException(s"Could not map the schema field 
'$name' to a field " +
    +              s"from source. Please specify the source field from which it 
can be derived.")
    +          }
    +      }
    +    }
    +    mapping.toMap.asJava
    +  }
    +  /**
    +    * Finds the fields that can be used for a format schema (without time 
    +    */
    +  def deriveFormatFields(properties: DescriptorProperties): TableSchema = {
    --- End diff --
    No problem @xccui. My goal was to allow users to specify all fields only 
once. Because users often have tables with 30+ columns. When I opened the PR I 
added a possibility to derive a `schema` from a `format` schema. But according 
to a SQL DDL statement `CREATE TABLE (..) [FORMAT] ...` the `schema` must be 
always complete and the `format` schema might be derived, so I changed my 
initial implementation.
    For simplicity `deriveFormatFields` removes the time attributes and takes 
the result schema as the format's schema, because `rowtime` must not be an 
existing field. If rowtime should be an existing field, the full format schema 
is mandatory (because `schema` and `format` schema might differ). I agree that 
we need good documentation for all of that.

> Add a Kafka table source factory with JSON format support
> ---------------------------------------------------------
>                 Key: FLINK-8538
>                 URL: https://issues.apache.org/jira/browse/FLINK-8538
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Table API &amp; SQL
>            Reporter: Timo Walther
>            Assignee: Xingcan Cui
>            Priority: Major
>             Fix For: 1.5.0, 1.6.0
> Similar to CSVTableSourceFactory a Kafka table source factory for JSON should 
> be added. This issue includes improving the existing JSON descriptor with 
> validation that can be used for other connectors as well. It is up for 
> discussion if we want to split the KafkaJsonTableSource into connector and 
> format such that we can reuse the format for other table sources as well.

This message was sent by Atlassian JIRA

Reply via email to