xuefuz commented on a change in pull request #6906: [Flink-6036][table]Let catalog support partition. URL: https://github.com/apache/flink/pull/6906#discussion_r227915200
########## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogTable.scala ########## @@ -282,34 +296,63 @@ class ExternalCatalogTableBuilder(private val connectorDescriptor: ConnectorDesc this } + /** + * Specifies the partition columns for this external table. + */ + def withPartitionColumnNames( + partitionColumnNames: java.util.LinkedHashSet[String]): ExternalCatalogTableBuilder = { + require(partitionColumnNames != null && !partitionColumnNames.isEmpty) + this.partitionColumnNames = Some(partitionColumnNames) + this + } + /** * Declares this external table as a table source and returns the * configured [[ExternalCatalogTable]]. * * @return External catalog table */ - def asTableSource(): ExternalCatalogTable = { - new ExternalCatalogTable( - isBatch, - isStreaming, - isSource = true, - isSink = false, - DescriptorProperties.toJavaMap(this)) - } + def asTableSource(): ExternalCatalogTable = this.partitionColumnNames match { + case Some(pc) => + new ExternalCatalogPartitionedTable( + isBatch, + isStreaming, + isSource = true, + isSink = false, + pc, + DescriptorProperties.toJavaMap(this) + ) + case None => + new ExternalCatalogTable( + isBatch, + isStreaming, + isSource = true, + isSink = false, + DescriptorProperties.toJavaMap(this)) + } /** * Declares this external table as a table sink and returns the * configured [[ExternalCatalogTable]]. * * @return External catalog table */ - def asTableSink(): ExternalCatalogTable = { - new ExternalCatalogTable( - isBatch, - isStreaming, - isSource = false, - isSink = true, - DescriptorProperties.toJavaMap(this)) + def asTableSink(): ExternalCatalogTable = this.partitionColumnNames match { Review comment: I see a repeated pattern in the three asXXX methods. While it's not introduced in this PR, it might be good if we can introduce a helper method that those asXXX methods call to minimize the repetition. 
---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services