danny0405 commented on a change in pull request #11985:
URL: https://github.com/apache/flink/pull/11985#discussion_r421997440



##########
File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/FlinkCalciteCatalogReader.java
##########
@@ -188,11 +192,46 @@ private static FlinkPreparingTableBase 
convertCatalogTable(
                        RelDataType rowType,
                        CatalogTable catalogTable,
                        CatalogSchemaTable schemaTable) {
-               return new CatalogSourceTable<>(
-                       relOptSchema,
-                       names,
-                       rowType,
-                       schemaTable,
-                       catalogTable);
+               if (isLegacyConnectorOptions(catalogTable, schemaTable)) {
+                       return new LegacyCatalogSourceTable<>(
+                               relOptSchema,
+                               names,
+                               rowType,
+                               schemaTable,
+                               catalogTable);
+               } else {
+                       return new CatalogSourceTable<>(
+                               relOptSchema,
+                               names,
+                               rowType,
+                               schemaTable,
+                               catalogTable);
+               }
+       }
+
+       /**
+        * Checks whether the {@link CatalogTable} uses legacy connector 
options.
+        */
+       private static boolean isLegacyConnectorOptions(
+                       CatalogTable catalogTable,
+                       CatalogSchemaTable schemaTable) {
+               if 
(catalogTable.getOptions().containsKey(ConnectorDescriptorValidator.CONNECTOR_TYPE))
 {
+                       return true;

Review comment:
       I think we should consider case sensitivity here.

##########
File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/schema/CatalogSourceTable.scala
##########
@@ -226,4 +251,113 @@ class CatalogSourceTable[T](
     }
     factory.buildRelNodeRowType(fieldNames, fieldTypes)
   }
+
+  /**
+   * Returns true if there is any generated columns defined on the catalog 
table.
+   */
+  private def containsGeneratedColumns(catalogTable: CatalogTable): Boolean = {
+    catalogTable.getSchema.getTableColumns.exists(_.isGenerated)
+  }
+
+  /**
+   * Creates a new catalog table with the given hint options,
+   * but return the original catalog table if the given hint options is empty.
+   */
+  private def createCatalogTableWithHints(hintedOptions: JMap[String, 
String]): CatalogTable = {
+    if (hintedOptions.nonEmpty) {
+      catalogTable.copy(FlinkHints.mergeTableOptions(hintedOptions, 
catalogTable.getOptions))
+    } else {
+      catalogTable
+    }
+  }
+
+  /**
+   * Infers whether the current options is using legacy [[TableSource]].
+   */
+  private def isLegacyOptions(
+      hintedOptions: JMap[String, String],
+      conf: ReadableConfig): Boolean = {
+    val newCatalogTable = createCatalogTableWithHints(hintedOptions)
+    if 
(newCatalogTable.getOptions.contains(ConnectorDescriptorValidator.CONNECTOR_TYPE))
 {
+      true
+    } else {
+      // try to create legacy table source using the options,
+      // some legacy factories uses the new 'connector' key
+      try {
+        findAndCreateLegacyTableSource(hintedOptions, conf)
+        // success, then we will use the legacy factories
+        true
+      } catch {
+        case _: Throwable =>
+          // can't create, then we will use new factories
+          false
+      }
+    }
+  }
+
+  private def validateTableSource(tableSource: DynamicTableSource): Unit = {
+    // validation
+    val unsupportedAbilities = List(
+      classOf[SupportsProjectionPushDown],
+      classOf[SupportsFilterPushDown],
+      classOf[SupportsLimitPushDown],
+      classOf[SupportsPartitionPushDown],
+      classOf[SupportsComputedColumnPushDown],
+      classOf[SupportsWatermarkPushDown]
+    )
+    val tableName = schemaTable.getTableIdentifier.asSummaryString
+    tableSource match {
+      case ts: ScanTableSource =>
+        val changelogMode = ts.getChangelogMode
+        if (!schemaTable.isStreamingMode) {
+          // batch only supports bounded source
+          val provider = 
ts.getScanRuntimeProvider(ScanRuntimeProviderContext.INSTANCE)
+          if (!provider.isBounded) {
+            throw new ValidationException("Cannot query on an unbounded source 
in batch mode, " +
+              s"but '$tableName' is unbounded.")
+          }
+          // batch only supports INSERT only source
+          if (!changelogMode.containsOnly(RowKind.INSERT)) {
+            throw new UnsupportedOperationException(
+              "Currently, batch mode only supports INSERT only source, but " +
+              s"'$tableName' source produces not INSERT only messages")
+          }
+        } else {
+          // sanity check for produced ChangelogMode
+          val hasUpdateBefore = changelogMode.contains(RowKind.UPDATE_BEFORE)
+          val hasUpdateAfter = changelogMode.contains(RowKind.UPDATE_AFTER)
+          (hasUpdateBefore, hasUpdateAfter) match {
+            case (true, true) =>
+              // UPDATE_BEFORE and UPDATE_AFTER, pass
+            case (false, true) =>
+              // only UPDATE_AFTER
+              throw new UnsupportedOperationException(
+                "Currently, ScanTableSource doesn't support producing 
ChangelogMode " +
+                  "which contains UPDATE_AFTER but no UPDATE_BEFORE. Please 
adapt the " +
+                  s"implementation of '${ts.asSummaryString()}' source.")
+            case (true, false) =>
+               // only UPDATE_BEFORE
+              throw new ValidationException(
+                s"'$tableName' source produces ChangelogMode which " +
+                  s"contains UPDATE_BEFORE but doesn't contain UPDATE_AFTER, 
this is invalid.")
+            case _ =>
+              // no updates, pass
+          }
+
+          // watermark defined on a changelog source is not supported
+          if (!catalogTable.getSchema.getWatermarkSpecs.isEmpty &&
+              !changelogMode.containsOnly(RowKind.INSERT)) {
+            throw new UnsupportedOperationException(
+              "Currently, defining WATERMARK on a changelog source is not 
supported.")
+          }
+        }
+      case _ =>
+        unsupportedAbilities.foreach { ability =>
+          if (tableSource.getClass.isAssignableFrom(ability)) {

Review comment:
       Did I miss something? The `ScanTableSource` code would never reach this 
branch, so the check never triggers.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to