dannycranmer commented on code in PR #21497: URL: https://github.com/apache/flink/pull/21497#discussion_r1086767065
########## flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java: ########## @@ -460,87 +461,95 @@ private Transformation<?> applySinkProvider( SinkRuntimeProvider runtimeProvider, int rowtimeFieldIndex, int sinkParallelism, - ExecNodeConfig config) { - TransformationMetadata sinkMeta = createTransformationMeta(SINK_TRANSFORMATION, config); - if (runtimeProvider instanceof DataStreamSinkProvider) { - Transformation<RowData> sinkTransformation = - applyRowtimeTransformation( - inputTransform, rowtimeFieldIndex, sinkParallelism, config); - final DataStream<RowData> dataStream = new DataStream<>(env, sinkTransformation); - final DataStreamSinkProvider provider = (DataStreamSinkProvider) runtimeProvider; - return provider.consumeDataStream(createProviderContext(config), dataStream) - .getTransformation(); - } else if (runtimeProvider instanceof TransformationSinkProvider) { - final TransformationSinkProvider provider = - (TransformationSinkProvider) runtimeProvider; - return provider.createTransformation( - new TransformationSinkProvider.Context() { - @Override - public Transformation<RowData> getInputTransformation() { - return inputTransform; - } - - @Override - public int getRowtimeIndex() { - return rowtimeFieldIndex; - } - - @Override - public Optional<String> generateUid(String name) { - return createProviderContext(config).generateUid(name); - } - }); - } else if (runtimeProvider instanceof SinkFunctionProvider) { - final SinkFunction<RowData> sinkFunction = - ((SinkFunctionProvider) runtimeProvider).createSinkFunction(); - return createSinkFunctionTransformation( - sinkFunction, - env, - inputTransform, - rowtimeFieldIndex, - sinkMeta, - sinkParallelism); - } else if (runtimeProvider instanceof OutputFormatProvider) { - OutputFormat<RowData> outputFormat = - ((OutputFormatProvider) runtimeProvider).createOutputFormat(); - final SinkFunction<RowData> sinkFunction = new OutputFormatSinkFunction<>(outputFormat); - return createSinkFunctionTransformation( - sinkFunction, - env, - inputTransform, - rowtimeFieldIndex, - sinkMeta, - sinkParallelism); - } else if (runtimeProvider instanceof SinkProvider) { - Transformation<RowData> sinkTransformation = - applyRowtimeTransformation( - inputTransform, rowtimeFieldIndex, sinkParallelism, config); - final DataStream<RowData> dataStream = new DataStream<>(env, sinkTransformation); - final Transformation<?> transformation = - DataStreamSink.forSinkV1( - dataStream, - ((SinkProvider) runtimeProvider).createSink(), - CustomSinkOperatorUidHashes.DEFAULT) - .getTransformation(); - transformation.setParallelism(sinkParallelism); - sinkMeta.fill(transformation); - return transformation; - } else if (runtimeProvider instanceof SinkV2Provider) { - Transformation<RowData> sinkTransformation = - applyRowtimeTransformation( - inputTransform, rowtimeFieldIndex, sinkParallelism, config); - final DataStream<RowData> dataStream = new DataStream<>(env, sinkTransformation); - final Transformation<?> transformation = - DataStreamSink.forSink( - dataStream, - ((SinkV2Provider) runtimeProvider).createSink(), - CustomSinkOperatorUidHashes.DEFAULT) - .getTransformation(); - transformation.setParallelism(sinkParallelism); - sinkMeta.fill(transformation); - return transformation; - } else { - throw new TableException("Unsupported sink runtime provider."); + ExecNodeConfig config, + ClassLoader classLoader) { + final ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader(); Review Comment: Can we use [TemporaryClassLoaderContext](https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/util/TemporaryClassLoaderContext.java) since this seems to be the standard way to flip out the classloader? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org