[ https://issues.apache.org/jira/browse/FLINK-17600?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Yuval Itzchakov updated FLINK-17600: ------------------------------------ Description: Given the following SQL statement: {code:java} tableEnv.sqlQuery("SELECT EVENT_TIME, B, C FROM FOO"){code} Where FOO is a table originating from a custom source and a custom StreamTableSource[Row] which implements `DefinedRowtimeAttributes.getRowtimeAttributeDescriptors`, Blink Planner fails to mark the selected field with a `RowtimeAttribute`. This happens because `TableSourceUtil.getSourceRowType`'s implementation receives a `None` TableSource from `CatalogSchemaTable.getRowType`, presumably because the Catalog has yet to create the underlying TableSource which is deferred to the implementing TableFactory (in this case my own custom one). *This* *does not reproduce in the old Flink planner*, because the old planner uses `TableSourceTable` which explicitly holds a reference to the underlying `TableSource` and extracts its row time attributes. Relevant code: *CatalogSchemaTable*: {code:java} private static RelDataType getRowType(RelDataTypeFactory typeFactory, CatalogBaseTable catalogBaseTable, boolean isStreamingMode) { final FlinkTypeFactory flinkTypeFactory = (FlinkTypeFactory) typeFactory; TableSchema tableSchema = catalogBaseTable.getSchema(); final DataType[] fieldDataTypes = tableSchema.getFieldDataTypes(); if (!isStreamingMode && catalogBaseTable instanceof ConnectorCatalogTable && ((ConnectorCatalogTable) catalogBaseTable).getTableSource().isPresent()) { // If the table source is bounded, materialize the time attributes to normal TIMESTAMP type. // Now for ConnectorCatalogTable, there is no way to // deduce if it is bounded in the table environment, so the data types in TableSchema // always patched with TimeAttribute. // See ConnectorCatalogTable#calculateSourceSchema // for details. // Remove the patched time attributes type to let the TableSourceTable handle it. // We should remove this logic if the isBatch flag in ConnectorCatalogTable is fixed. // TODO: Fix FLINK-14844. 
for (int i = 0; i < fieldDataTypes.length; i++) { LogicalType lt = fieldDataTypes[i].getLogicalType(); if (lt instanceof TimestampType && (((TimestampType) lt).getKind() == TimestampKind.PROCTIME || ((TimestampType) lt).getKind() == TimestampKind.ROWTIME)) { int precision = ((TimestampType) lt).getPrecision(); fieldDataTypes[i] = DataTypes.TIMESTAMP(precision); } } } return TableSourceUtil.getSourceRowType(flinkTypeFactory, tableSchema, scala.Option.empty(), isStreamingMode); } {code} *TableSourceUtil:* {code:java} def getSourceRowType( typeFactory: FlinkTypeFactory, tableSchema: TableSchema, tableSource: Option[TableSource[_]], streaming: Boolean): RelDataType = { val fieldNames = tableSchema.getFieldNames val fieldDataTypes = tableSchema.getFieldDataTypes if (tableSchema.getWatermarkSpecs.nonEmpty) { getSourceRowType(typeFactory, fieldNames, fieldDataTypes, tableSchema.getWatermarkSpecs.head, streaming) } else if (tableSource.isDefined) { getSourceRowType(typeFactory, fieldNames, fieldDataTypes, tableSource.get, streaming) } else { val fieldTypes = fieldDataTypes.map(fromDataTypeToLogicalType) typeFactory.buildRelNodeRowType(fieldNames, fieldTypes) } }{code} *TableSourceTable:* {code:java} // We must enrich logical schema from catalog table with physical type coming from table source. // Schema coming from catalog table might not have proper conversion classes. 
Those must be // extracted from produced type, before converting to RelDataType def getRowType(typeFactory: RelDataTypeFactory): RelDataType = { val flinkTypeFactory = typeFactory.asInstanceOf[FlinkTypeFactory] val fieldNames = tableSchema.getFieldNames val nameMapping: JFunction[String, String] = tableSource match { case mapping: DefinedFieldMapping if mapping.getFieldMapping != null => new JFunction[String, String] { override def apply(t: String): String = mapping.getFieldMapping.get(t) } case _ => JFunction.identity() } val producedDataType = tableSource.getProducedDataType val fieldIndexes = TypeMappingUtils.computePhysicalIndicesOrTimeAttributeMarkers( tableSource, tableSchema.getTableColumns, isStreamingMode, nameMapping ) val typeInfos = if (LogicalTypeChecks.isCompositeType(producedDataType.getLogicalType)) { val physicalSchema = DataTypeUtils.expandCompositeTypeToSchema(producedDataType) fieldIndexes.map(mapIndex(_, idx => TypeConversions.fromDataTypeToLegacyInfo(physicalSchema.getFieldDataType(idx).get())) ) } else { fieldIndexes.map(mapIndex(_, _ => TypeConversions.fromDataTypeToLegacyInfo(producedDataType))) } flinkTypeFactory.buildLogicalRowType(fieldNames, typeInfos) } {code} was: {code:java} // code placeholder {code} Given the following SQL statement: {code:java} tableEnv.sqlQuery("SELECT EVENT_TIME, B, C FROM FOO"){code} and a custom StreamTableSource[Row] which implements `DefinedRowtimeAttributes.getRowtimeAttributeDescriptors`, Blink Planner fails to mark the selected field with a `RowtimeAttribute`. This happens because `TableSourceUtil.getSourceRowType`s implementation receives a `None` TableSource from `CatalogSchemaTable.getRowType`, presumably because the Catalog has yet to create the underlying TableSource which is deferred to implementing TableFactory (in this case my own custom one). 
*This* *does not reproduce in the old Flink planner*, because the old planner uses `TableSourceTable` which explicitly holds a reference to the underlying `TableSource` and extracts it's row time attributes. Relevant code: *CatalogSchemaTable*: {code:java} private static RelDataType getRowType(RelDataTypeFactory typeFactory, CatalogBaseTable catalogBaseTable, boolean isStreamingMode) { final FlinkTypeFactory flinkTypeFactory = (FlinkTypeFactory) typeFactory; TableSchema tableSchema = catalogBaseTable.getSchema(); final DataType[] fieldDataTypes = tableSchema.getFieldDataTypes(); if (!isStreamingMode && catalogBaseTable instanceof ConnectorCatalogTable && ((ConnectorCatalogTable) catalogBaseTable).getTableSource().isPresent()) { // If the table source is bounded, materialize the time attributes to normal TIMESTAMP type. // Now for ConnectorCatalogTable, there is no way to // deduce if it is bounded in the table environment, so the data types in TableSchema // always patched with TimeAttribute. // See ConnectorCatalogTable#calculateSourceSchema // for details. // Remove the patched time attributes type to let the TableSourceTable handle it. // We should remove this logic if the isBatch flag in ConnectorCatalogTable is fixed. // TODO: Fix FLINK-14844. 
for (int i = 0; i < fieldDataTypes.length; i++) { LogicalType lt = fieldDataTypes[i].getLogicalType(); if (lt instanceof TimestampType && (((TimestampType) lt).getKind() == TimestampKind.PROCTIME || ((TimestampType) lt).getKind() == TimestampKind.ROWTIME)) { int precision = ((TimestampType) lt).getPrecision(); fieldDataTypes[i] = DataTypes.TIMESTAMP(precision); } } } return TableSourceUtil.getSourceRowType(flinkTypeFactory, tableSchema, scala.Option.empty(), isStreamingMode); } {code} *TableSourceUtil:* {code:java} def getSourceRowType( typeFactory: FlinkTypeFactory, tableSchema: TableSchema, tableSource: Option[TableSource[_]], streaming: Boolean): RelDataType = { val fieldNames = tableSchema.getFieldNames val fieldDataTypes = tableSchema.getFieldDataTypes if (tableSchema.getWatermarkSpecs.nonEmpty) { getSourceRowType(typeFactory, fieldNames, fieldDataTypes, tableSchema.getWatermarkSpecs.head, streaming) } else if (tableSource.isDefined) { getSourceRowType(typeFactory, fieldNames, fieldDataTypes, tableSource.get, streaming) } else { val fieldTypes = fieldDataTypes.map(fromDataTypeToLogicalType) typeFactory.buildRelNodeRowType(fieldNames, fieldTypes) } }{code} *TableSourceTable:* {code:java} // We must enrich logical schema from catalog table with physical type coming from table source. // Schema coming from catalog table might not have proper conversion classes. 
Those must be // extracted from produced type, before converting to RelDataType def getRowType(typeFactory: RelDataTypeFactory): RelDataType = { val flinkTypeFactory = typeFactory.asInstanceOf[FlinkTypeFactory] val fieldNames = tableSchema.getFieldNames val nameMapping: JFunction[String, String] = tableSource match { case mapping: DefinedFieldMapping if mapping.getFieldMapping != null => new JFunction[String, String] { override def apply(t: String): String = mapping.getFieldMapping.get(t) } case _ => JFunction.identity() } val producedDataType = tableSource.getProducedDataType val fieldIndexes = TypeMappingUtils.computePhysicalIndicesOrTimeAttributeMarkers( tableSource, tableSchema.getTableColumns, isStreamingMode, nameMapping ) val typeInfos = if (LogicalTypeChecks.isCompositeType(producedDataType.getLogicalType)) { val physicalSchema = DataTypeUtils.expandCompositeTypeToSchema(producedDataType) fieldIndexes.map(mapIndex(_, idx => TypeConversions.fromDataTypeToLegacyInfo(physicalSchema.getFieldDataType(idx).get())) ) } else { fieldIndexes.map(mapIndex(_, _ => TypeConversions.fromDataTypeToLegacyInfo(producedDataType))) } flinkTypeFactory.buildLogicalRowType(fieldNames, typeInfos) } {code} > Blink Planner fails to generate RowtimeAttribute based on TableSource's > DefinedRowtimeAttributes implementation > --------------------------------------------------------------------------------------------------------------- > > Key: FLINK-17600 > URL: https://issues.apache.org/jira/browse/FLINK-17600 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner > Affects Versions: 1.10.0 > Reporter: Yuval Itzchakov > Priority: Major > > Given the following SQL statement: > {code:java} > tableEnv.sqlQuery("SELECT EVENT_TIME, B, C FROM FOO"){code} > Where FOO is a table originating from a custom source and a custom > StreamTableSource[Row] which implements > `DefinedRowtimeAttributes.getRowtimeAttributeDescriptors`, Blink Planner > fails to mark the selected field with 
a `RowtimeAttribute`. > This happens because `TableSourceUtil.getSourceRowType`'s implementation > receives a `None` TableSource from `CatalogSchemaTable.getRowType`, > presumably because the Catalog has yet to create the underlying TableSource > which is deferred to the implementing TableFactory (in this case my own custom > one). > *This* *does not reproduce in the old Flink planner*, because the old planner > uses `TableSourceTable` which explicitly holds a reference to the underlying > `TableSource` and extracts its row time attributes. > Relevant code: > *CatalogSchemaTable*: > > {code:java} > private static RelDataType getRowType(RelDataTypeFactory typeFactory, > CatalogBaseTable catalogBaseTable, > boolean isStreamingMode) { > final FlinkTypeFactory flinkTypeFactory = (FlinkTypeFactory) typeFactory; > TableSchema tableSchema = catalogBaseTable.getSchema(); > final DataType[] fieldDataTypes = tableSchema.getFieldDataTypes(); > if (!isStreamingMode > && catalogBaseTable instanceof ConnectorCatalogTable > && ((ConnectorCatalogTable) > catalogBaseTable).getTableSource().isPresent()) { > // If the table source is bounded, materialize the time attributes to > normal TIMESTAMP type. > // Now for ConnectorCatalogTable, there is no way to > // deduce if it is bounded in the table environment, so the data types > in TableSchema > // always patched with TimeAttribute. > // See ConnectorCatalogTable#calculateSourceSchema > // for details. > // Remove the patched time attributes type to let the TableSourceTable > handle it. > // We should remove this logic if the isBatch flag in > ConnectorCatalogTable is fixed. > // TODO: Fix FLINK-14844. 
> for (int i = 0; i < fieldDataTypes.length; i++) { > LogicalType lt = fieldDataTypes[i].getLogicalType(); > if (lt instanceof TimestampType > && (((TimestampType) lt).getKind() == TimestampKind.PROCTIME > || ((TimestampType) lt).getKind() == TimestampKind.ROWTIME)) { > int precision = ((TimestampType) lt).getPrecision(); > fieldDataTypes[i] = DataTypes.TIMESTAMP(precision); > } > } > } > return TableSourceUtil.getSourceRowType(flinkTypeFactory, > tableSchema, > scala.Option.empty(), > isStreamingMode); > } > {code} > *TableSourceUtil:* > > > {code:java} > def getSourceRowType( > typeFactory: FlinkTypeFactory, > tableSchema: TableSchema, > tableSource: Option[TableSource[_]], > streaming: Boolean): RelDataType = { > val fieldNames = tableSchema.getFieldNames > val fieldDataTypes = tableSchema.getFieldDataTypes > if (tableSchema.getWatermarkSpecs.nonEmpty) { > getSourceRowType(typeFactory, fieldNames, fieldDataTypes, > tableSchema.getWatermarkSpecs.head, > streaming) > } else if (tableSource.isDefined) { > getSourceRowType(typeFactory, fieldNames, fieldDataTypes, tableSource.get, > streaming) > } else { > val fieldTypes = fieldDataTypes.map(fromDataTypeToLogicalType) > typeFactory.buildRelNodeRowType(fieldNames, fieldTypes) > } > }{code} > *TableSourceTable:* > {code:java} > // We must enrich logical schema from catalog table with physical type > coming from table source. > // Schema coming from catalog table might not have proper conversion > classes. 
Those must be > // extracted from produced type, before converting to RelDataType > def getRowType(typeFactory: RelDataTypeFactory): RelDataType = { val > flinkTypeFactory = typeFactory.asInstanceOf[FlinkTypeFactory] val > fieldNames = tableSchema.getFieldNames val nameMapping: JFunction[String, > String] = tableSource match { > case mapping: DefinedFieldMapping if mapping.getFieldMapping != null => > new JFunction[String, String] { > override def apply(t: String): String = > mapping.getFieldMapping.get(t) > } > case _ => JFunction.identity() > } val producedDataType = tableSource.getProducedDataType > val fieldIndexes = > TypeMappingUtils.computePhysicalIndicesOrTimeAttributeMarkers( > tableSource, > tableSchema.getTableColumns, > isStreamingMode, > nameMapping > ) val typeInfos = if > (LogicalTypeChecks.isCompositeType(producedDataType.getLogicalType)) { > val physicalSchema = > DataTypeUtils.expandCompositeTypeToSchema(producedDataType) > fieldIndexes.map(mapIndex(_, > idx => > > TypeConversions.fromDataTypeToLegacyInfo(physicalSchema.getFieldDataType(idx).get())) > ) > } else { > fieldIndexes.map(mapIndex(_, _ => > TypeConversions.fromDataTypeToLegacyInfo(producedDataType))) > } flinkTypeFactory.buildLogicalRowType(fieldNames, typeInfos) > } > {code} > -- This message was sent by Atlassian Jira (v8.3.4#803005)