[ https://issues.apache.org/jira/browse/FLINK-8538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16374092#comment-16374092 ]
ASF GitHub Bot commented on FLINK-8538:
---------------------------------------

Github user xccui commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5564#discussion_r170188551

    --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/DescriptorTestBase.scala ---
    @@ -18,37 +18,85 @@
     package org.apache.flink.table.descriptors
     
    +import org.apache.flink.util.Preconditions
     import org.junit.Assert.assertEquals
    +import org.junit.Test
    +
    +import scala.collection.JavaConverters._
     
     abstract class DescriptorTestBase {
     
       /**
    -    * Returns a valid descriptor.
    +    * Returns a set of valid descriptors.
    +    * This method is implemented in both Scala and Java.
    +    */
    +  def descriptors(): java.util.List[Descriptor]
    +
    +  /**
    +    * Returns a set of properties for each valid descriptor.
    +    * This code is implemented in both Scala and Java.
         */
    -  def descriptor(): Descriptor
    +  def properties(): java.util.List[java.util.Map[String, String]]
     
       /**
    -    * Returns a validator that can validate this descriptor.
    +    * Returns a validator that can validate all valid descriptors.
         */
       def validator(): DescriptorValidator
     
    -  def verifyProperties(descriptor: Descriptor, expected: Seq[(String, String)]): Unit = {
    +  @Test
    +  def testValidation(): Unit = {
    +    val d = descriptors().asScala
    +    val p = properties().asScala
    +
    +    Preconditions.checkArgument(d.length == p.length)
    +
    +    d.zip(p).foreach { case (desc, props) =>
    +      verifyProperties(desc, props.asScala.toMap)
    +    }
    +  }
    +
    +  def verifyProperties(descriptor: Descriptor, expected: Map[String, String]): Unit = {
         val normProps = new DescriptorProperties
         descriptor.addProperties(normProps)
    -    assertEquals(expected.toMap, normProps.asMap)
    +    assertEquals(expected, normProps.asScalaMap)
       }
     
    -  def verifyInvalidProperty(property: String, invalidValue: String): Unit = {
    +  def verifyInvalidProperty(
    +      descriptor: Descriptor,
    +      property: String,
    +      invalidValue: String): Unit = {
         val properties = new DescriptorProperties
    -    descriptor().addProperties(properties)
    +    descriptor.addProperties(properties)
         properties.unsafePut(property, invalidValue)
         validator().validate(properties)
       }
     
    -  def verifyMissingProperty(removeProperty: String): Unit = {
    +  def verifyMissingProperty(descriptor: Descriptor, removeProperty: String): Unit = {
         val properties = new DescriptorProperties
    -    descriptor().addProperties(properties)
    +    descriptor.addProperties(properties)
         properties.unsafeRemove(removeProperty)
         validator().validate(properties)
       }
     }
    +
    +class TestTableSourceDescriptor(connector: ConnectorDescriptor)
    --- End diff --

    `this.connectorDescriptor = Some(connector)`


> Add a Kafka table source factory with JSON format support
> ----------------------------------------------------------
>
>                 Key: FLINK-8538
>                 URL: https://issues.apache.org/jira/browse/FLINK-8538
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Table API & SQL
>            Reporter: Timo Walther
>            Assignee: Xingcan Cui
>            Priority: Major
>
> Similar to CSVTableSourceFactory, a Kafka table source factory for JSON should
> be added. This issue includes improving the existing JSON descriptor with
> validation that can be used for other connectors as well. It is up for
> discussion whether we want to split the KafkaJsonTableSource into connector and
> format such that we can reuse the format for other table sources as well.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
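
For readers following the review thread: a minimal sketch of how the suggested assignment could look once applied to the truncated TestTableSourceDescriptor above. This is not code from the PR; the TableSourceDescriptor parent class and its Option-typed connectorDescriptor field are assumptions drawn from the surrounding descriptor package, and only the assignment line itself comes from the review comment.

    package org.apache.flink.table.descriptors

    // Sketch only: assumes a TableSourceDescriptor base class that exposes a
    // protected Option[ConnectorDescriptor] field named connectorDescriptor.
    class TestTableSourceDescriptor(connector: ConnectorDescriptor)
      extends TableSourceDescriptor {

      // keep the connector immediately, as suggested in the review, so the
      // verify* helpers in DescriptorTestBase can collect its properties
      // without any further setup
      this.connectorDescriptor = Some(connector)
    }

Assigning the connector in the constructor body keeps the test descriptor usable right after construction, which matches how DescriptorTestBase consumes ready-made Descriptor instances in testValidation().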