[ https://issues.apache.org/jira/browse/FLINK-8538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16374587#comment-16374587 ]
ASF GitHub Bot commented on FLINK-8538: --------------------------------------- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5564#discussion_r170272030 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/SchemaValidator.scala --- @@ -67,14 +92,188 @@ class SchemaValidator(isStreamEnvironment: Boolean = true) extends DescriptorVal object SchemaValidator { val SCHEMA = "schema" - val SCHEMA_VERSION = "schema.version" + val SCHEMA_PROPERTY_VERSION = "schema.property-version" + val SCHEMA_FIELDS = "schema.fields" + val SCHEMA_FIELDS_NAME = "name" + val SCHEMA_FIELDS_TYPE = "type" + val SCHEMA_FIELDS_PROCTIME = "proctime" + val SCHEMA_FIELDS_FROM = "from" + val SCHEMA_DERIVE_FIELDS = "schema.derive-fields" + val SCHEMA_DERIVE_FIELDS_VALUE_ALPHABETICALLY = "alphabetically" + val SCHEMA_DERIVE_FIELDS_VALUE_SEQUENTIALLY = "sequentially" + + // utilities + + /** + * Derives a schema from properties and source. 
+ */ + def deriveSchema( + properties: DescriptorProperties, + sourceSchema: Option[TableSchema]) + : TableSchema = { + + val builder = TableSchema.builder() + + val schema = properties.getTableSchema(SCHEMA_FIELDS) + + val derivationMode = properties.getString(SCHEMA_DERIVE_FIELDS) + + val sourceNamesAndTypes = derivationMode match { + case Some(SCHEMA_DERIVE_FIELDS_VALUE_ALPHABETICALLY) if sourceSchema.isDefined => + // sort by name + sourceSchema.get.getColumnNames + .zip(sourceSchema.get.getTypes) + .sortBy(_._1) + + case Some(SCHEMA_DERIVE_FIELDS_VALUE_SEQUENTIALLY) if sourceSchema.isDefined => + sourceSchema.get.getColumnNames.zip(sourceSchema.get.getTypes) + + case Some(_) => + throw new ValidationException("Derivation of fields is not supported from this source.") + + case None => + Array[(String, TypeInformation[_])]() + } + + // add source fields + sourceNamesAndTypes.foreach { case (n, t) => + builder.field(n, t) + } + + // add schema fields + schema.foreach { ts => + val schemaNamesAndTypes = ts.getColumnNames.zip(ts.getTypes) + schemaNamesAndTypes.foreach { case (n, t) => + // do not allow overwriting + if (sourceNamesAndTypes.exists(_._1 == n)) { + throw new ValidationException( + "Specified schema fields must not overwrite fields derived from the source.") + } + builder.field(n, t) + } + } + + builder.build() + } + + /** + * Derives a schema from properties and source. + * This method is intended for Java code. + */ + def deriveSchema( + properties: DescriptorProperties, + sourceSchema: Optional[TableSchema]) + : TableSchema = { + deriveSchema( + properties, + Option(sourceSchema.orElse(null))) + } + + /** + * Finds the proctime attribute if defined. 
+ */ + def deriveProctimeAttribute(properties: DescriptorProperties): Option[String] = { + val names = properties.getIndexedProperty(SCHEMA_FIELDS, SCHEMA_FIELDS_NAME) + + for (i <- 0 until names.size) { + val isProctime = properties.getBoolean(s"$SCHEMA_FIELDS.$i.$SCHEMA_FIELDS_PROCTIME") + isProctime.foreach { isSet => + if (isSet) { + return names.get(s"$SCHEMA_FIELDS.$i.$SCHEMA_FIELDS_NAME") + } + } + } + None + } + + /** + * Finds the proctime attribute if defined. + * This method is intended for Java code. + */ + def deriveProctimeOptional(properties: DescriptorProperties): Optional[String] = { + Optional.ofNullable(deriveProctimeAttribute(properties).orNull) + } + + /** + * Finds the rowtime attributes if defined. + */ + def deriveRowtimeAttributes(properties: DescriptorProperties) + : util.List[RowtimeAttributeDescriptor] = { + + val names = properties.getIndexedProperty(SCHEMA_FIELDS, SCHEMA_FIELDS_NAME) + + var attributes = new mutable.ArrayBuffer[RowtimeAttributeDescriptor]() + + // check for rowtime in every field + for (i <- 0 until names.size) { + RowtimeValidator + .getRowtimeComponents(properties, s"$SCHEMA_FIELDS.$i.") + .foreach { case (extractor, strategy) => + // create descriptor + attributes += new RowtimeAttributeDescriptor( + properties.getString(s"$SCHEMA_FIELDS.$i.$SCHEMA_FIELDS_NAME").get, + extractor, + strategy) + } + } + + attributes.asJava + } + + /** + * Find a table source field mapping. + * This method is intended for Java code. + */ + def deriveFieldMapping( --- End diff -- Provide only Java-friendly methods? > Add a Kafka table source factory with JSON format support > --------------------------------------------------------- > > Key: FLINK-8538 > URL: https://issues.apache.org/jira/browse/FLINK-8538 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL > Reporter: Timo Walther > Assignee: Xingcan Cui > Priority: Major > > Similar to CSVTableSourceFactory a Kafka table source factory for JSON should > be added. 
> This issue includes improving the existing JSON descriptor with
> validation that can be used for other connectors as well. It is up for
> discussion if we want to split the KafkaJsonTableSource into connector and
> format such that we can reuse the format for other table sources as well.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)