fapaul commented on a change in pull request #16399:
URL: https://github.com/apache/flink/pull/16399#discussion_r682670462



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/AbstractSinkWriterOperator.java
##########
@@ -145,29 +150,60 @@ public Long timestamp() {
 
     private static class InitContextImpl implements Sink.InitContext {
 
-        private final int subtaskIdx;
-
         private final ProcessingTimeService processingTimeService;
 
+        private final MailboxExecutor mailboxExecutor;
+
         private final MetricGroup metricGroup;
 
+        private final StreamingRuntimeContext runtimeContext;
+
         public InitContextImpl(
-                int subtaskIdx,
+                StreamingRuntimeContext runtimeContext,
                 ProcessingTimeService processingTimeService,
+                MailboxExecutor mailboxExecutor,
                 MetricGroup metricGroup) {
-            this.subtaskIdx = subtaskIdx;
+            this.runtimeContext = checkNotNull(runtimeContext);
+            this.mailboxExecutor = checkNotNull(mailboxExecutor);
             this.processingTimeService = checkNotNull(processingTimeService);
             this.metricGroup = checkNotNull(metricGroup);
         }
 
+        @Override
+        public UserCodeClassLoader getUserCodeClassLoader() {
+            return new UserCodeClassLoader() {
+                @Override
+                public ClassLoader asClassLoader() {
+                    return runtimeContext.getUserCodeClassLoader();
+                }
+
+                @Override
+                public void registerReleaseHookIfAbsent(
+                        String releaseHookName, Runnable releaseHook) {
+                    runtimeContext.registerUserCodeClassLoaderReleaseHookIfAbsent(
+                            releaseHookName, releaseHook);
+                }
+            };
+        }
+
+        @Override
+        public int getNumberOfParallelSubtasks() {
+            return runtimeContext.getNumberOfParallelSubtasks();
+        }
+
+        @Override
+        public MailboxExecutor getMailboxExecutor() {
+            return mailboxExecutor;
+        }
+
         @Override
         public Sink.ProcessingTimeService getProcessingTimeService() {
             return new ProcessingTimerServiceImpl(processingTimeService);
         }
 
         @Override
         public int getSubtaskId() {
-            return subtaskIdx;
+            return runtimeContext.getIndexOfThisSubtask();

Review comment:
       Nit: Can the subtask index ever change during the runtime of the task?
   If not, we can retrieve the value once and store it instead of looking it up on every call.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to