godfreyhe commented on a change in pull request #12199:
URL: https://github.com/apache/flink/pull/12199#discussion_r429320579
##########
File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/sinks/SelectTableSinkBase.java
##########
@@ -28,61 +28,106 @@
 import org.apache.flink.streaming.api.operators.collect.CollectSinkOperatorFactory;
 import org.apache.flink.streaming.api.operators.collect.CollectStreamSink;
 import org.apache.flink.table.api.TableSchema;
-import org.apache.flink.table.api.internal.SelectTableSink;
-import org.apache.flink.table.runtime.types.TypeInfoDataTypeConverter;
+import org.apache.flink.table.api.internal.SelectResultProvider;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.util.DataFormatConverters;
+import org.apache.flink.table.runtime.typeutils.RowDataTypeInfo;
+import org.apache.flink.table.sinks.StreamTableSink;
+import org.apache.flink.table.sinks.TableSink;
 import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.types.Row;
 
 import java.util.Iterator;
 import java.util.UUID;
+import java.util.stream.Stream;
 
 /**
- * Basic implementation of {@link SelectTableSink}.
+ * Basic implementation of {@link StreamTableSink} for select job to collect the result to local.
  */
-public class SelectTableSinkBase implements SelectTableSink {
+public abstract class SelectTableSinkBase<T> implements StreamTableSink<T> {
 
 	private final TableSchema tableSchema;
-	private final CollectSinkOperatorFactory<Row> factory;
-	private final CollectResultIterator<Row> iterator;
+	protected final DataFormatConverters.DataFormatConverter<RowData, Row> converter;
+
+	private final CollectSinkOperatorFactory<T> factory;
+	private final CollectResultIterator<T> iterator;
 
 	@SuppressWarnings("unchecked")
-	public SelectTableSinkBase(TableSchema tableSchema) {
-		this.tableSchema = SelectTableSinkSchemaConverter.convertTimeAttributeToRegularTimestamp(
-			SelectTableSinkSchemaConverter.changeDefaultConversionClass(tableSchema));
+	public SelectTableSinkBase(TableSchema schema, TypeSerializer<T> typeSerializer) {
+		this.tableSchema = schema;
+		this.converter = DataFormatConverters.getConverterForDataType(this.tableSchema.toPhysicalRowDataType());

Review comment:
   I find that there are some types `DataStructureConverters` can't handle. For example, when running `CalcITCase#testExternalTypeFunc1` I get the following exception:
   ```
   java.lang.ClassCastException: org.apache.flink.table.types.logical.TypeInformationRawType cannot be cast to org.apache.flink.table.types.logical.RawType
   	at org.apache.flink.table.data.conversion.RawObjectConverter.create(RawObjectConverter.java:56)
   	at org.apache.flink.table.data.conversion.DataStructureConverters.getConverterInternal(DataStructureConverters.java:157)
   	at org.apache.flink.table.data.conversion.DataStructureConverters.getConverter(DataStructureConverters.java:136)
   	at org.apache.flink.table.data.conversion.RowRowConverter.lambda$create$0(RowRowConverter.java:87)
   ```
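For illustration, a minimal sketch of the kind of check behind this comment: the new `DataStructureConverters` only understands the new `RawType`, so a schema that still carries the legacy `TypeInformationRawType` (as in `CalcITCase#testExternalTypeFunc1`) fails with the `ClassCastException` above. `LegacyRawTypeChecker` and `containsLegacyRawType` are made-up names for this sketch, not part of the PR or of Flink; only the `LogicalType`/`DataType` APIs it calls are real.

```java
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.TypeInformationRawType;

/**
 * Hypothetical helper: detects the legacy raw type that
 * DataStructureConverters rejects with the ClassCastException shown above.
 */
public final class LegacyRawTypeChecker {

	private LegacyRawTypeChecker() {
	}

	/**
	 * Returns true if the given logical type, or any nested field type, is the
	 * legacy TypeInformationRawType rather than the new RawType.
	 */
	public static boolean containsLegacyRawType(LogicalType type) {
		if (type instanceof TypeInformationRawType) {
			return true;
		}
		// Recurse into row fields, array/multiset elements, map keys and values.
		return type.getChildren().stream().anyMatch(LegacyRawTypeChecker::containsLegacyRawType);
	}

	public static boolean containsLegacyRawType(DataType dataType) {
		return containsLegacyRawType(dataType.getLogicalType());
	}
}
```

A sink built on such a check could keep using `DataFormatConverters.getConverterForDataType(...)`, as the constructor in the diff does, whenever the physical row type still contains a legacy raw type, and only consider `DataStructureConverters.getConverter(...)` otherwise.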