[ 
https://issues.apache.org/jira/browse/FLINK-21679?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-21679:
-----------------------------------
    Labels: pull-request-available  (was: )

> Set output type for transformations from SourceProvider and 
> DataStreamScanProvider in CommonExecTableSourceScan
> ---------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-21679
>                 URL: https://issues.apache.org/jira/browse/FLINK-21679
>             Project: Flink
>          Issue Type: Improvement
>          Components: Table SQL / Planner
>            Reporter: Wei Zhong
>            Assignee: Wei Zhong
>            Priority: Major
>              Labels: pull-request-available
>
> Currently we only set the output type for the transformations created from 
> SourceFunctionProvider and InputFormatProvider in CommonExecTableSourceScan:
> {code:java}
> @Override
> protected Transformation<RowData> translateToPlanInternal(PlannerBase planner) {
>     final StreamExecutionEnvironment env = planner.getExecEnv();
>     final String operatorName = getDescription();
>     final InternalTypeInfo<RowData> outputTypeInfo =
>             InternalTypeInfo.of((RowType) getOutputType());
>     final ScanTableSource tableSource = tableSourceSpec.getScanTableSource(planner);
>     ScanTableSource.ScanRuntimeProvider provider =
>             tableSource.getScanRuntimeProvider(ScanRuntimeProviderContext.INSTANCE);
>     if (provider instanceof SourceFunctionProvider) {
>         SourceFunction<RowData> sourceFunction =
>                 ((SourceFunctionProvider) provider).createSourceFunction();
>         return env.addSource(sourceFunction, operatorName, outputTypeInfo).getTransformation();
>     } else if (provider instanceof InputFormatProvider) {
>         InputFormat<RowData, ?> inputFormat =
>                 ((InputFormatProvider) provider).createInputFormat();
>         return createInputFormatTransformation(env, inputFormat, outputTypeInfo, operatorName);
>     } else if (provider instanceof SourceProvider) {
>         // outputTypeInfo is not set here
>         Source<RowData, ?, ?> source = ((SourceProvider) provider).createSource();
>         return env.fromSource(source, WatermarkStrategy.noWatermarks(), operatorName)
>                 .getTransformation();
>     } else if (provider instanceof DataStreamScanProvider) {
>         // outputTypeInfo is not set here
>         return ((DataStreamScanProvider) provider).produceDataStream(env).getTransformation();
>     } else {
>         throw new UnsupportedOperationException(
>                 provider.getClass().getSimpleName() + " is unsupported now.");
>     }
> }
> {code}
> We can also set the output type for the transformations from SourceProvider and 
> DataStreamScanProvider in CommonExecTableSourceScan, so that users do not 
> need to implement the ResultTypeQueryable interface when implementing the new 
> Source interface from FLIP-27.
>  
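
The proposal in the issue can be illustrated with a small stand-alone model. This is only a sketch under stated assumptions: Transformation, ProviderKind, translateBefore and translateAfter below are hypothetical stand-ins written for this note, not the Flink classes, and the output type is modelled as a plain String. In Flink itself the change would presumably use the fromSource overload of StreamExecutionEnvironment that also accepts a TypeInformation, and Transformation#setOutputType for the DataStreamScanProvider branch; the issue text does not spell this out.

```java
/** Simplified model of the issue; none of these types are the real Flink classes. */
final class OutputTypeSketch {

    /** Hypothetical stand-in for a transformation whose output type may be unset. */
    static final class Transformation {
        private final String name;
        private String outputType; // null models "no type information available"

        Transformation(String name, String outputType) {
            this.name = name;
            this.outputType = outputType;
        }

        void setOutputType(String outputType) {
            this.outputType = outputType;
        }

        String getOutputType() {
            if (outputType == null) {
                throw new IllegalStateException(
                        "Output type of '" + name + "' could not be determined");
            }
            return outputType;
        }
    }

    /** Hypothetical provider kinds, mirroring the four branches quoted in the issue. */
    enum ProviderKind { SOURCE_FUNCTION, INPUT_FORMAT, SOURCE, DATA_STREAM }

    /** Current behaviour: only the first two branches receive the planner's type. */
    static Transformation translateBefore(ProviderKind kind, String plannerType) {
        switch (kind) {
            case SOURCE_FUNCTION:
            case INPUT_FORMAT:
                return new Transformation(kind.name(), plannerType);
            default:
                // Assumes the connector did not declare a type itself.
                return new Transformation(kind.name(), null);
        }
    }

    /** Proposed behaviour: the planner's type is applied in every branch. */
    static Transformation translateAfter(ProviderKind kind, String plannerType) {
        Transformation transformation = translateBefore(kind, plannerType);
        transformation.setOutputType(plannerType);
        return transformation;
    }

    public static void main(String[] args) {
        String plannerType = "ROW<id INT>";
        for (ProviderKind kind : ProviderKind.values()) {
            System.out.println(kind + ": " + translateAfter(kind, plannerType).getOutputType());
        }
    }
}
```

In this model, translateBefore leaves the SOURCE and DATA_STREAM transformations without a type, so getOutputType() fails unless the connector supplies one. translateAfter removes that burden from the connector author, which is the effect the issue asks for.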



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
