wuchong commented on a change in pull request #11985: URL: https://github.com/apache/flink/pull/11985#discussion_r421914970
########## File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/schema/CatalogSourceTable.scala ########## @@ -226,4 +251,113 @@ class CatalogSourceTable[T]( } factory.buildRelNodeRowType(fieldNames, fieldTypes) } + + /** + * Returns true if there is any generated columns defined on the catalog table. + */ + private def containsGeneratedColumns(catalogTable: CatalogTable): Boolean = { + catalogTable.getSchema.getTableColumns.exists(_.isGenerated) + } + + /** + * Creates a new catalog table with the given hint options, + * but return the original catalog table if the given hint options is empty. + */ + private def createCatalogTableWithHints(hintedOptions: JMap[String, String]): CatalogTable = { + if (hintedOptions.nonEmpty) { + catalogTable.copy(FlinkHints.mergeTableOptions(hintedOptions, catalogTable.getOptions)) + } else { + catalogTable + } + } + + /** + * Infers whether the current options is using legacy [[TableSource]]. + */ + private def isLegacyOptions( + hintedOptions: JMap[String, String], + conf: ReadableConfig): Boolean = { + val newCatalogTable = createCatalogTableWithHints(hintedOptions) + if (newCatalogTable.getOptions.contains(ConnectorDescriptorValidator.CONNECTOR_TYPE)) { + true + } else { + // try to create legacy table source using the options, + // some legacy factories uses the new 'connector' key + try { + findAndCreateLegacyTableSource(hintedOptions, conf) + // success, then we will use the legacy factories + true + } catch { + case _: Throwable => Review comment: I thought about this. However, it doesn't work, because the `connector.property-version` is attached by the framework. So no matter whether the connector is old or new, they have the same `connector.property-version`. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org