dannycranmer commented on code in PR #21497:
URL: https://github.com/apache/flink/pull/21497#discussion_r1086767065


##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java:
##########
@@ -460,87 +461,95 @@ private Transformation<?> applySinkProvider(
             SinkRuntimeProvider runtimeProvider,
             int rowtimeFieldIndex,
             int sinkParallelism,
-            ExecNodeConfig config) {
-        TransformationMetadata sinkMeta = 
createTransformationMeta(SINK_TRANSFORMATION, config);
-        if (runtimeProvider instanceof DataStreamSinkProvider) {
-            Transformation<RowData> sinkTransformation =
-                    applyRowtimeTransformation(
-                            inputTransform, rowtimeFieldIndex, 
sinkParallelism, config);
-            final DataStream<RowData> dataStream = new DataStream<>(env, 
sinkTransformation);
-            final DataStreamSinkProvider provider = (DataStreamSinkProvider) 
runtimeProvider;
-            return provider.consumeDataStream(createProviderContext(config), 
dataStream)
-                    .getTransformation();
-        } else if (runtimeProvider instanceof TransformationSinkProvider) {
-            final TransformationSinkProvider provider =
-                    (TransformationSinkProvider) runtimeProvider;
-            return provider.createTransformation(
-                    new TransformationSinkProvider.Context() {
-                        @Override
-                        public Transformation<RowData> 
getInputTransformation() {
-                            return inputTransform;
-                        }
-
-                        @Override
-                        public int getRowtimeIndex() {
-                            return rowtimeFieldIndex;
-                        }
-
-                        @Override
-                        public Optional<String> generateUid(String name) {
-                            return 
createProviderContext(config).generateUid(name);
-                        }
-                    });
-        } else if (runtimeProvider instanceof SinkFunctionProvider) {
-            final SinkFunction<RowData> sinkFunction =
-                    ((SinkFunctionProvider) 
runtimeProvider).createSinkFunction();
-            return createSinkFunctionTransformation(
-                    sinkFunction,
-                    env,
-                    inputTransform,
-                    rowtimeFieldIndex,
-                    sinkMeta,
-                    sinkParallelism);
-        } else if (runtimeProvider instanceof OutputFormatProvider) {
-            OutputFormat<RowData> outputFormat =
-                    ((OutputFormatProvider) 
runtimeProvider).createOutputFormat();
-            final SinkFunction<RowData> sinkFunction = new 
OutputFormatSinkFunction<>(outputFormat);
-            return createSinkFunctionTransformation(
-                    sinkFunction,
-                    env,
-                    inputTransform,
-                    rowtimeFieldIndex,
-                    sinkMeta,
-                    sinkParallelism);
-        } else if (runtimeProvider instanceof SinkProvider) {
-            Transformation<RowData> sinkTransformation =
-                    applyRowtimeTransformation(
-                            inputTransform, rowtimeFieldIndex, 
sinkParallelism, config);
-            final DataStream<RowData> dataStream = new DataStream<>(env, 
sinkTransformation);
-            final Transformation<?> transformation =
-                    DataStreamSink.forSinkV1(
-                                    dataStream,
-                                    ((SinkProvider) 
runtimeProvider).createSink(),
-                                    CustomSinkOperatorUidHashes.DEFAULT)
-                            .getTransformation();
-            transformation.setParallelism(sinkParallelism);
-            sinkMeta.fill(transformation);
-            return transformation;
-        } else if (runtimeProvider instanceof SinkV2Provider) {
-            Transformation<RowData> sinkTransformation =
-                    applyRowtimeTransformation(
-                            inputTransform, rowtimeFieldIndex, 
sinkParallelism, config);
-            final DataStream<RowData> dataStream = new DataStream<>(env, 
sinkTransformation);
-            final Transformation<?> transformation =
-                    DataStreamSink.forSink(
-                                    dataStream,
-                                    ((SinkV2Provider) 
runtimeProvider).createSink(),
-                                    CustomSinkOperatorUidHashes.DEFAULT)
-                            .getTransformation();
-            transformation.setParallelism(sinkParallelism);
-            sinkMeta.fill(transformation);
-            return transformation;
-        } else {
-            throw new TableException("Unsupported sink runtime provider.");
+            ExecNodeConfig config,
+            ClassLoader classLoader) {
+        final ClassLoader contextClassLoader = 
Thread.currentThread().getContextClassLoader();

Review Comment:
   Can we use [`TemporaryClassLoaderContext`](https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/util/TemporaryClassLoaderContext.java) here, since it seems to be the standard way to temporarily swap out the context classloader?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to