wuchong commented on a change in pull request #11837:
URL: https://github.com/apache/flink/pull/11837#discussion_r413470860



##########
File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/catalog/CatalogSchemaTable.java
##########
@@ -153,6 +166,27 @@ private static RelDataType getRowType(RelDataTypeFactory 
typeFactory,
                                }
                        }
                }
+
+               // The following block is a workaround to support tables 
defined by TableEnvironment.connect() and
+               // the actual table sources implement 
DefinedProctimeAttribute/DefinedRowtimeAttributes.
+               // It should be removed after we remove 
DefinedProctimeAttribute/DefinedRowtimeAttributes.
+               Optional<TableSource> sourceOpt = findAndCreateTableSource(new 
TableConfig().getConfiguration());
+               if 
(tableSchema.getTableColumns().stream().noneMatch(TableColumn::isGenerated)
+                       && tableSchema.getWatermarkSpecs().isEmpty()
+                       && sourceOpt.isPresent()) {
+                       TableSource source = sourceOpt.get();
+                       if ((source instanceof DefinedProctimeAttribute
+                                       && ((DefinedProctimeAttribute) 
source).getProctimeAttribute() != null)
+                                       ||
+                                       (source instanceof 
DefinedRowtimeAttributes
+                                                       && 
((DefinedRowtimeAttributes) source).getRowtimeAttributeDescriptors() != null
+                                                       && 
!((DefinedRowtimeAttributes) 
source).getRowtimeAttributeDescriptors().isEmpty())) {

Review comment:
       Add a `hasProctimeAttribute` to `TableSourceValidation` and the 
condition can be simplified into 
   
   ```java
   if (hasRowtimeAttribute(source) && hasProctimeAttribute(source))
   ```

##########
File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/catalog/CatalogSchemaTable.java
##########
@@ -153,6 +166,27 @@ private static RelDataType getRowType(RelDataTypeFactory 
typeFactory,
                                }
                        }
                }
+
+               // The following block is a workaround to support tables 
defined by TableEnvironment.connect() and
+               // the actual table sources implement 
DefinedProctimeAttribute/DefinedRowtimeAttributes.
+               // It should be removed after we remove 
DefinedProctimeAttribute/DefinedRowtimeAttributes.
+               Optional<TableSource> sourceOpt = findAndCreateTableSource(new 
TableConfig().getConfiguration());

Review comment:
       ```suggestion
                Optional<TableSource<?>> sourceOpt = 
findAndCreateTableSource(new TableConfig().getConfiguration());
   ```
   
   Add `<?>` to TableSource to avoid IDEA warning.

##########
File path: 
flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableSourceTest.scala
##########
@@ -130,6 +131,60 @@ class TableSourceTest extends TableTestBase {
     util.verifyPlan(sqlQuery)
   }
 
+
+  @Test
+  def testLegacyRowTimeTableGroupWindow(): Unit = {
+    util.tableEnv.connect(new ConnectorDescriptor("TestTableSourceWithTime", 
1, false) {
+      override protected def toConnectorProperties: JMap[String, String] = {
+        Collections.emptyMap()
+      }

Review comment:
       Can we have a dedicated descriptor for `TestTableSourceWithTime`? This 
code looks confusing. 

##########
File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/catalog/CatalogSchemaTable.java
##########
@@ -153,6 +166,27 @@ private static RelDataType getRowType(RelDataTypeFactory 
typeFactory,
                                }
                        }
                }
+
+               // The following block is a workaround to support tables 
defined by TableEnvironment.connect() and
+               // the actual table sources implement 
DefinedProctimeAttribute/DefinedRowtimeAttributes.
+               // It should be removed after we remove 
DefinedProctimeAttribute/DefinedRowtimeAttributes.
+               Optional<TableSource> sourceOpt = findAndCreateTableSource(new 
TableConfig().getConfiguration());
+               if 
(tableSchema.getTableColumns().stream().noneMatch(TableColumn::isGenerated)
+                       && tableSchema.getWatermarkSpecs().isEmpty()

Review comment:
       Add `isStreamingMode` into this condition, and 
`findAndCreateTableSource`  when the condition is satisfied. 

##########
File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/catalog/CatalogSchemaTable.java
##########
@@ -153,6 +166,27 @@ private static RelDataType getRowType(RelDataTypeFactory 
typeFactory,
                                }
                        }
                }
+
+               // The following block is a workaround to support tables 
defined by TableEnvironment.connect() and
+               // the actual table sources implement 
DefinedProctimeAttribute/DefinedRowtimeAttributes.
+               // It should be removed after we remove 
DefinedProctimeAttribute/DefinedRowtimeAttributes.
+               Optional<TableSource> sourceOpt = findAndCreateTableSource(new 
TableConfig().getConfiguration());

Review comment:
       If the `ReadableConfig` is always an empty configuration. Please remove 
the parameter and construct in the `findAndCreateTableSource` method with a 
comment to explain why we use an empty configuration. 

##########
File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/schema/CatalogSourceTable.scala
##########
@@ -210,20 +208,37 @@ class CatalogSourceTable[T](
    */
   private def eraseTimeIndicator(
       relDataType: RelDataType,
-      factory: FlinkTypeFactory): RelDataType = {
-    val logicalRowType = FlinkTypeFactory.toLogicalRowType(relDataType)
-    val fieldNames = logicalRowType.getFieldNames
-    val fieldTypes = logicalRowType.getFields.map { f =>
-      if (FlinkTypeFactory.isTimeIndicatorType(f.getType)) {
-        val timeIndicatorType = f.getType.asInstanceOf[TimestampType]
-        new TimestampType(
-          timeIndicatorType.isNullable,
-          TimestampKind.REGULAR,
-          timeIndicatorType.getPrecision)
-      } else {
-        f.getType
+      factory: FlinkTypeFactory,
+      tableSource: TableSource[_]): RelDataType = {
+    val isLegacySource = tableSource match {
+      case rts: DefinedRowtimeAttributes
+        if (rts.getRowtimeAttributeDescriptors != null
+          && rts.getRowtimeAttributeDescriptors.nonEmpty) =>
+        true
+      case pts: DefinedProctimeAttribute if pts.getProctimeAttribute != null =>
+         true
+      case _ => false
+    }

Review comment:
       `val legacyTimeAttributeDefined = hasRowtimeAttribute(source) && 
hasProctimeAttribute(source)`

##########
File path: 
flink-table/flink-table-planner-blink/src/test/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
##########
@@ -20,3 +20,5 @@ 
org.apache.flink.table.planner.utils.TestFilterableTableSourceFactory
 org.apache.flink.table.planner.utils.TestProjectableTableSourceFactory
 org.apache.flink.table.planner.utils.TestCsvFileSystemFormatFactory
 org.apache.flink.table.planner.utils.TestOptionsTableFactory
+

Review comment:
       remove empty line?

##########
File path: 
flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/testTableSourceSinks.scala
##########
@@ -200,6 +199,65 @@ class TestTableSourceWithTime[T](
   }
 }
 
+class TestTableSourceWithTimeFactory[T] extends StreamTableSourceFactory[T] {
+  override def createStreamTableSource(properties: JMap[String, String]): 
StreamTableSource[T] = {
+    val dp = new DescriptorProperties()
+    dp.putProperties(properties)
+
+    val isBounded = dp.getOptionalBoolean("is-bounded").orElse(false)
+    val tableSchema = dp.getTableSchema(Schema.SCHEMA)
+    val serializedData = dp.getOptionalString("data").orElse(null)
+    val data = if (serializedData != null) {
+      EncodingUtils.decodeStringToObject(serializedData, classOf[List[T]])
+    } else {
+      Seq.empty[T]
+    }
+    val rowtimeAttributes = SchemaValidator.deriveRowtimeAttributes(dp)
+    val rowtime = if (rowtimeAttributes.isEmpty) {
+      null
+    } else {
+      rowtimeAttributes.head.getAttributeName
+    }
+    val proctimeAttribute = SchemaValidator.deriveProctimeAttribute(dp)
+    val proctime = if (proctimeAttribute.isPresent) {
+      proctimeAttribute.get()
+    } else {
+      null
+    }
+
+    val serializedMapKeys = dp.getOptionalString("map-keys").orElse(null)
+    val serializedMapVals = dp.getOptionalString("map-vals").orElse(null)
+    val mapping = if (serializedMapKeys != null && serializedMapVals != null) {
+      val mapKeys = EncodingUtils.decodeStringToObject(serializedMapKeys, 
classOf[List[String]])
+      val mapVals = EncodingUtils.decodeStringToObject(serializedMapVals, 
classOf[List[String]])
+      if (mapKeys.length != mapVals.length) {
+        null
+      } else {
+        mapKeys.zip(mapVals).toMap
+      }
+    } else {
+      null
+    }
+
+    val existingTs = dp.getOptionalString("existingTs").orElse(null)
+
+    new TestTableSourceWithTime[T](
+      isBounded, tableSchema, null, data, rowtime, proctime, mapping, 
existingTs)

Review comment:
       If we only use this for planning, do you need to support extracting the 
`data` and `mapping` and `existingTs` ? 

##########
File path: 
flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/testTableSourceSinks.scala
##########
@@ -200,6 +199,65 @@ class TestTableSourceWithTime[T](
   }
 }
 
+class TestTableSourceWithTimeFactory[T] extends StreamTableSourceFactory[T] {
+  override def createStreamTableSource(properties: JMap[String, String]): 
StreamTableSource[T] = {
+    val dp = new DescriptorProperties()
+    dp.putProperties(properties)
+
+    val isBounded = dp.getOptionalBoolean("is-bounded").orElse(false)
+    val tableSchema = dp.getTableSchema(Schema.SCHEMA)
+    val serializedData = dp.getOptionalString("data").orElse(null)
+    val data = if (serializedData != null) {
+      EncodingUtils.decodeStringToObject(serializedData, classOf[List[T]])
+    } else {
+      Seq.empty[T]
+    }
+    val rowtimeAttributes = SchemaValidator.deriveRowtimeAttributes(dp)
+    val rowtime = if (rowtimeAttributes.isEmpty) {
+      null
+    } else {
+      rowtimeAttributes.head.getAttributeName
+    }
+    val proctimeAttribute = SchemaValidator.deriveProctimeAttribute(dp)
+    val proctime = if (proctimeAttribute.isPresent) {
+      proctimeAttribute.get()
+    } else {
+      null
+    }
+
+    val serializedMapKeys = dp.getOptionalString("map-keys").orElse(null)
+    val serializedMapVals = dp.getOptionalString("map-vals").orElse(null)
+    val mapping = if (serializedMapKeys != null && serializedMapVals != null) {
+      val mapKeys = EncodingUtils.decodeStringToObject(serializedMapKeys, 
classOf[List[String]])
+      val mapVals = EncodingUtils.decodeStringToObject(serializedMapVals, 
classOf[List[String]])
+      if (mapKeys.length != mapVals.length) {
+        null
+      } else {
+        mapKeys.zip(mapVals).toMap
+      }
+    } else {
+      null
+    }
+
+    val existingTs = dp.getOptionalString("existingTs").orElse(null)
+
+    new TestTableSourceWithTime[T](
+      isBounded, tableSchema, null, data, rowtime, proctime, mapping, 
existingTs)

Review comment:
       Why the `returnType` is null? Use `tableSchema.toRowType` ?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to