xuefuz commented on a change in pull request #6906: [Flink-6036][table]Let 
catalog support partition.
URL: https://github.com/apache/flink/pull/6906#discussion_r227915200
 
 

 ##########
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogTable.scala
 ##########
 @@ -282,34 +296,63 @@ class ExternalCatalogTableBuilder(private val 
connectorDescriptor: ConnectorDesc
     this
   }
 
+  /**
+    * Specifies the partition columns for this external table.
+    */
+  def withPartitionColumnNames(
+    partitionColumnNames: java.util.LinkedHashSet[String]): 
ExternalCatalogTableBuilder = {
+    require(partitionColumnNames != null && !partitionColumnNames.isEmpty)
+    this.partitionColumnNames = Some(partitionColumnNames)
+    this
+  }
+
   /**
     * Declares this external table as a table source and returns the
     * configured [[ExternalCatalogTable]].
     *
     * @return External catalog table
     */
-  def asTableSource(): ExternalCatalogTable = {
-    new ExternalCatalogTable(
-      isBatch,
-      isStreaming,
-      isSource = true,
-      isSink = false,
-      DescriptorProperties.toJavaMap(this))
-  }
+  def asTableSource(): ExternalCatalogTable = this.partitionColumnNames match {
+      case Some(pc) =>
+        new ExternalCatalogPartitionedTable(
+          isBatch,
+          isStreaming,
+          isSource = true,
+          isSink = false,
+          pc,
+          DescriptorProperties.toJavaMap(this)
+        )
+      case None =>
+        new ExternalCatalogTable(
+          isBatch,
+          isStreaming,
+          isSource = true,
+          isSink = false,
+          DescriptorProperties.toJavaMap(this))
+   }
 
   /**
     * Declares this external table as a table sink and returns the
     * configured [[ExternalCatalogTable]].
     *
     * @return External catalog table
     */
-  def asTableSink(): ExternalCatalogTable = {
-    new ExternalCatalogTable(
-      isBatch,
-      isStreaming,
-      isSource = false,
-      isSink = true,
-      DescriptorProperties.toJavaMap(this))
+  def asTableSink(): ExternalCatalogTable = this.partitionColumnNames match {
 
 Review comment:
   I see a repeated pattern in the three asXXX methods. While it's not 
introduced in this PR, it would be good to introduce a helper method that 
those asXXX methods call to minimize the repetition.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to