[ https://issues.apache.org/jira/browse/FLINK-21679?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
ASF GitHub Bot updated FLINK-21679:
-----------------------------------
    Labels: pull-request-available  (was: )

> Set output type for transformations from SourceProvider and DataStreamScanProvider in CommonExecTableSourceScan
> ---------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-21679
>                 URL: https://issues.apache.org/jira/browse/FLINK-21679
>             Project: Flink
>          Issue Type: Improvement
>          Components: Table SQL / Planner
>            Reporter: Wei Zhong
>            Assignee: Wei Zhong
>            Priority: Major
>              Labels: pull-request-available
>
> Currently, we only set the output type for transformations created from
> SourceFunctionProvider and InputFormatProvider in CommonExecTableSourceScan:
> {code:java}
> @Override
> protected Transformation<RowData> translateToPlanInternal(PlannerBase planner) {
>     final StreamExecutionEnvironment env = planner.getExecEnv();
>     final String operatorName = getDescription();
>     final InternalTypeInfo<RowData> outputTypeInfo =
>             InternalTypeInfo.of((RowType) getOutputType());
>     final ScanTableSource tableSource = tableSourceSpec.getScanTableSource(planner);
>     ScanTableSource.ScanRuntimeProvider provider =
>             tableSource.getScanRuntimeProvider(ScanRuntimeProviderContext.INSTANCE);
>     if (provider instanceof SourceFunctionProvider) {
>         SourceFunction<RowData> sourceFunction =
>                 ((SourceFunctionProvider) provider).createSourceFunction();
>         return env.addSource(sourceFunction, operatorName, outputTypeInfo)
>                 .getTransformation();
>     } else if (provider instanceof InputFormatProvider) {
>         InputFormat<RowData, ?> inputFormat =
>                 ((InputFormatProvider) provider).createInputFormat();
>         return createInputFormatTransformation(
>                 env, inputFormat, outputTypeInfo, operatorName);
>     } else if (provider instanceof SourceProvider) {
>         // outputTypeInfo is not set here
>         Source<RowData, ?, ?> source = ((SourceProvider) provider).createSource();
>         return env.fromSource(source, WatermarkStrategy.noWatermarks(), operatorName)
>                 .getTransformation();
>     } else if (provider instanceof DataStreamScanProvider) {
>         // outputTypeInfo is not set here
>         return ((DataStreamScanProvider) provider).produceDataStream(env).getTransformation();
>     } else {
>         throw new UnsupportedOperationException(
>                 provider.getClass().getSimpleName() + " is unsupported now.");
>     }
> }{code}
> We can also set the output type for transformations created from SourceProvider
> and DataStreamScanProvider in CommonExecTableSourceScan, so that users do not
> need to implement the ResultTypeQueryable interface when implementing the new
> Source interface introduced in FLIP-27.
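
The proposed change can be sketched roughly as follows. This is a minimal, self-contained model, not the actual Flink patch: every class below (`OutputTypeSketch`, and its nested `Transformation`, `SourceProvider`, `DataStreamScanProvider`) is a hypothetical stand-in with the same name as the Flink type it imitates, and the output type is represented as a plain `String`. In real Flink code, the two branches would presumably correspond to the `fromSource` overload that accepts a `TypeInformation` argument and to `Transformation#setOutputType`, but that mapping is an assumption about how the fix could look.

```java
/**
 * Stand-in model (NOT Flink classes) of the provider dispatch in
 * CommonExecTableSourceScan, with the output type set on every branch.
 */
final class OutputTypeSketch {

    /** Hypothetical stand-in for a transformation: a name plus an output type. */
    static final class Transformation {
        final String name;
        String outputType; // null means "not set; must be inferred from the source"

        Transformation(String name, String outputType) {
            this.name = name;
            this.outputType = outputType;
        }
    }

    /** Hypothetical stand-in for ScanTableSource.ScanRuntimeProvider. */
    interface ScanRuntimeProvider {}

    /** Stand-in for a provider backed by the FLIP-27 Source interface. */
    static final class SourceProvider implements ScanRuntimeProvider {}

    /** Stand-in for a provider whose connector builds the stream itself. */
    static final class DataStreamScanProvider implements ScanRuntimeProvider {
        Transformation produceDataStream() {
            // The connector does not know the planner's output type.
            return new Transformation("user-defined stream", null);
        }
    }

    static Transformation translate(
            ScanRuntimeProvider provider, String operatorName, String outputTypeInfo) {
        if (provider instanceof SourceProvider) {
            // Proposed: hand the planner's type over when the source is created.
            return new Transformation(operatorName, outputTypeInfo);
        } else if (provider instanceof DataStreamScanProvider) {
            // Proposed: set the type on the transformation the connector returned.
            Transformation transformation =
                    ((DataStreamScanProvider) provider).produceDataStream();
            transformation.outputType = outputTypeInfo;
            return transformation;
        } else {
            throw new UnsupportedOperationException(
                    provider.getClass().getSimpleName() + " is unsupported now.");
        }
    }
}
```

With this shape, neither branch relies on the source implementation to declare its produced type, which is the point of the improvement described in the issue.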