1996fanrui commented on code in PR #23597:
URL: https://github.com/apache/flink/pull/23597#discussion_r1372830355


##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java:
##########
@@ -641,16 +641,26 @@ private Map<StreamConfig.SourceInputConfig, ChainedSource> createChainedSources(
     @Nullable
     private Counter getOperatorRecordsOutCounter(
             StreamTask<?, ?> containingTask, StreamConfig operatorConfig) {
-        String streamOperatorFactoryClassName = operatorConfig.getStreamOperatorFactoryClassName();
+        ClassLoader userCodeClassloader = containingTask.getUserCodeClassLoader();
+        Class<StreamOperatorFactory<?>> streamOperatorFactoryClass =
+                operatorConfig.getStreamOperatorFactoryClass(userCodeClassloader);
+
         // Do not use the numRecordsOut counter on output if this operator is SinkWriterOperator.
         //
         // Metric "numRecordsOut" is defined as the total number of records written to the
         // external system in FLIP-33, but this metric is occupied in AbstractStreamOperator as the
         // number of records sent to downstream operators, which is number of Committable batches
         // sent to SinkCommitter. So we skip registering this metric on output and leave this metric
         // to sink writer implementations to report.
-        if (SinkWriterOperatorFactory.class.getName().equals(streamOperatorFactoryClassName)) {
-            return null;
+        try {
+            Class<?> sinkWriterFactoryClass =
+                    userCodeClassloader.loadClass(SinkWriterOperatorFactory.class.getName());
+            if (sinkWriterFactoryClass.isAssignableFrom(streamOperatorFactoryClass)) {

Review Comment:
   > Thanks for the update! Have you finally decided to go with the solution 3 
instead of solution 1?
   
   Hi @pnowojski, as we discussed in this comment!
   
   I didn't use solution 3 before because I was worried that `SinkWriterOperatorFactory` and `streamOperatorFactoryClass` might be loaded by different classloaders. If they were, `sinkWriterFactoryClass.isAssignableFrom(streamOperatorFactoryClass)` could not reliably determine whether `streamOperatorFactoryClass` is a subclass of `sinkWriterFactoryClass`.
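
A minimal sketch of this classloader concern, not taken from the PR. `BaseFactory`, `CustomFactory`, and `IsolatingLoader` are hypothetical names invented for illustration; the sketch assumes the compiled `.class` files are reachable as resources on the classpath and that Java 9+ is available (for `InputStream.readAllBytes`).

```java
import java.io.IOException;
import java.io.InputStream;

public class ClassLoaderMismatchDemo {

    /** Hypothetical stand-in for a factory base class. */
    public static class BaseFactory {}

    /** Hypothetical subclass of the base factory. */
    public static class CustomFactory extends BaseFactory {}

    /** Defines the two demo classes itself instead of delegating to its parent. */
    static final class IsolatingLoader extends ClassLoader {
        IsolatingLoader(ClassLoader parent) {
            super(parent);
        }

        @Override
        protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
            boolean isDemoClass =
                    name.equals(BaseFactory.class.getName())
                            || name.equals(CustomFactory.class.getName());
            if (!isDemoClass) {
                // Everything else (java.lang.Object, etc.) uses normal parent delegation.
                return super.loadClass(name, resolve);
            }
            synchronized (getClassLoadingLock(name)) {
                Class<?> loaded = findLoadedClass(name);
                if (loaded == null) {
                    String resource = name.replace('.', '/') + ".class";
                    try (InputStream in = getParent().getResourceAsStream(resource)) {
                        if (in == null) {
                            throw new ClassNotFoundException(name);
                        }
                        byte[] bytes = in.readAllBytes(); // Java 9+
                        loaded = defineClass(name, bytes, 0, bytes.length);
                    } catch (IOException e) {
                        throw new ClassNotFoundException(name, e);
                    }
                }
                return loaded;
            }
        }
    }

    /** Both classes come from the same loader, so the subclass check is reliable. */
    public static boolean assignableInSameLoader() {
        return BaseFactory.class.isAssignableFrom(CustomFactory.class);
    }

    /** The subclass is defined by a second loader, so it extends a different BaseFactory. */
    public static boolean assignableAcrossLoaders() {
        try {
            ClassLoader appLoader = ClassLoaderMismatchDemo.class.getClassLoader();
            Class<?> isolatedCustom =
                    new IsolatingLoader(appLoader).loadClass(CustomFactory.class.getName());
            return BaseFactory.class.isAssignableFrom(isolatedCustom);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(assignableInSameLoader());
        System.out.println(assignableAcrossLoaders());
    }
}
```

Under those assumptions, the first check should print `true` and the second `false`: a class is identified by its name together with its defining loader, so the isolated `CustomFactory` extends a different `BaseFactory` than the one the application loader knows.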
   
   However, after double-checking, I found that they are loaded by the same classloader (`containingTask.getUserCodeClassLoader()`), so my worry was unnecessary and solution 3 works.
   
   `configuration.getStreamOperatorFactory(userCodeClassloader);` in OperatorChain also uses the `userCodeClassloader` [1].
   
   That is why solution 3 works.
   
   -----------------------------------------
   
   The differences between solutions 1 and 3 are as follows:
   
   - Solution 3 keeps working even if a new subclass of `SinkWriterOperatorFactory` is added, because `sinkWriterFactoryClass.isAssignableFrom(streamOperatorFactoryClass)` covers subclasses.
   - With solution 1, a developer who wants to add a subclass of `SinkWriterOperatorFactory` must handle it explicitly.
     - We would also need to explain some background in the comments: why an exception is thrown when a subclass of `SinkWriterOperatorFactory` is passed.
     - If anyone wants to add a subclass of `SinkWriterOperatorFactory`, solution 1 cannot work, and they may still need something similar to solution 3.
   
   BTW, if I change this line from `if (sinkWriterFactoryClass.isAssignableFrom(streamOperatorFactoryClass)) {` to `if (sinkWriterFactoryClass.equals(streamOperatorFactoryClass)) {`, it becomes solution 1.
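
To make the difference between the two checks concrete, here is a small sketch. The classes `WriterFactory`, `CustomWriterFactory`, and `OtherFactory` are hypothetical stand-ins, not Flink classes, and the sketch only models the class comparison, not any exception handling that solution 1 would add.

```java
public class FactoryCheckDemo {

    /** Hypothetical stand-in for SinkWriterOperatorFactory. */
    public static class WriterFactory {}

    /** Hypothetical subclass that someone might add later. */
    public static class CustomWriterFactory extends WriterFactory {}

    /** Hypothetical unrelated factory. */
    public static class OtherFactory {}

    /** Exact-match check, in the style of solution 1. */
    public static boolean matchesExactly(Class<?> candidate) {
        return WriterFactory.class.equals(candidate);
    }

    /** Class-or-subclass check, in the style of solution 3. */
    public static boolean matchesWithSubclasses(Class<?> candidate) {
        return WriterFactory.class.isAssignableFrom(candidate);
    }

    public static void main(String[] args) {
        // The base class itself passes both checks.
        System.out.println(matchesExactly(WriterFactory.class));
        System.out.println(matchesWithSubclasses(WriterFactory.class));

        // A subclass passes only the isAssignableFrom check.
        System.out.println(matchesExactly(CustomWriterFactory.class));
        System.out.println(matchesWithSubclasses(CustomWriterFactory.class));

        // An unrelated class fails both checks.
        System.out.println(matchesExactly(OtherFactory.class));
        System.out.println(matchesWithSubclasses(OtherFactory.class));
    }
}
```

The only case where the two checks disagree is the subclass, which is the case the bullet points above are about.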
   
   Based on these differences, solution 3 is easier to understand and maintain. WDYT?
   
   Please correct me if my understanding is wrong, thanks~
   
   
   [1] 
https://github.com/apache/flink/blob/3ff225c5f993282d6dfc7726fc08cc00058d9a7f/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java#L169
 
   
   



