1996fanrui commented on code in PR #23597: URL: https://github.com/apache/flink/pull/23597#discussion_r1372830355
##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java:
##########
@@ -641,16 +641,26 @@ private Map<StreamConfig.SourceInputConfig, ChainedSource> createChainedSources(
     @Nullable
     private Counter getOperatorRecordsOutCounter(
             StreamTask<?, ?> containingTask, StreamConfig operatorConfig) {
-        String streamOperatorFactoryClassName = operatorConfig.getStreamOperatorFactoryClassName();
+        ClassLoader userCodeClassloader = containingTask.getUserCodeClassLoader();
+        Class<StreamOperatorFactory<?>> streamOperatorFactoryClass =
+                operatorConfig.getStreamOperatorFactoryClass(userCodeClassloader);
+
         // Do not use the numRecordsOut counter on output if this operator is SinkWriterOperator.
         //
         // Metric "numRecordsOut" is defined as the total number of records written to the
         // external system in FLIP-33, but this metric is occupied in AbstractStreamOperator as the
         // number of records sent to downstream operators, which is number of Committable batches
         // sent to SinkCommitter. So we skip registering this metric on output and leave this metric
         // to sink writer implementations to report.
-        if (SinkWriterOperatorFactory.class.getName().equals(streamOperatorFactoryClassName)) {
-            return null;
+        try {
+            Class<?> sinkWriterFactoryClass =
+                    userCodeClassloader.loadClass(SinkWriterOperatorFactory.class.getName());
+            if (sinkWriterFactoryClass.isAssignableFrom(streamOperatorFactoryClass)) {

Review Comment:
> Thanks for the update! Have you finally decided to go with the solution 3 instead of solution 1?

Hi @pnowojski, following up on our discussion in this thread: I didn't use solution 3 before because I was worried that `SinkWriterOperatorFactory` and `streamOperatorFactoryClass` might be loaded by different classloaders. If they were, `sinkWriterFactoryClass.isAssignableFrom(streamOperatorFactoryClass)` could not reliably tell whether `streamOperatorFactoryClass` is a subclass of `sinkWriterFactoryClass`. The JVM treats two classes with the same name but different defining classloaders as distinct types, so the check may return false even for a genuine subclass.
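The concern above comes down to how the JVM identifies a runtime class. A class is identified by its binary name together with its defining classloader. When the base class is resolved by name through the same loader that defined it, you get back the very same `Class` object, and `isAssignableFrom` behaves as expected.

The sketch below mirrors the `loadClass(SinkWriterOperatorFactory.class.getName())` call in the diff. It makes a few assumptions:

- `BaseFactory`, `SubFactory` and `UnrelatedFactory` are hypothetical stand-ins, not Flink classes.
- `isSubclassVia` is a hypothetical helper.
- Returning `false` on `ClassNotFoundException` is this sketch's own choice. The diff above is truncated before its `catch` block.

```java
public class LoaderAwareSubclassCheck {

    // Hypothetical stand-ins for SinkWriterOperatorFactory, a subclass, and an unrelated factory.
    static class BaseFactory {}

    static class SubFactory extends BaseFactory {}

    static class UnrelatedFactory {}

    /**
     * Hypothetical helper: resolves the base class by name through the given loader and checks
     * whether candidate is that class or a subclass of it. Returns false (an assumption of this
     * sketch) if the loader cannot find the base class.
     */
    static boolean isSubclassVia(ClassLoader loader, String baseClassName, Class<?> candidate) {
        try {
            Class<?> base = loader.loadClass(baseClassName);
            return base.isAssignableFrom(candidate);
        } catch (ClassNotFoundException e) {
            return false;
        }
    }

    /** True if resolving BaseFactory by name through the loader yields the identical Class object. */
    static boolean classSeenBySameLoader(ClassLoader loader) {
        try {
            return loader.loadClass(BaseFactory.class.getName()) == BaseFactory.class;
        } catch (ClassNotFoundException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // All classes here are defined by the same loader, the situation confirmed for OperatorChain.
        ClassLoader loader = LoaderAwareSubclassCheck.class.getClassLoader();
        String baseName = BaseFactory.class.getName();

        System.out.println(classSeenBySameLoader(loader));
        System.out.println(isSubclassVia(loader, baseName, SubFactory.class));
        System.out.println(isSubclassVia(loader, baseName, UnrelatedFactory.class));
    }
}
```

The sketch only covers the same-loader case. A `Class` object with the same name, defined by a separate, non-delegating loader, would be a distinct type, and the assignability check against it would fail.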
However, after double-checking, I found that both are loaded by the same classloader (`containingTask.getUserCodeClassLoader()`), so my worry was unnecessary and solution 3 works. `OperatorChain` also passes this same `userCodeClassloader` to `configuration.getStreamOperatorFactory(userCodeClassloader);` [1]. That is why solution 3 works.

-----------------------------------------

The differences between solutions 1 and 3 are:

- Solution 3 keeps working even if a new subclass of `SinkWriterOperatorFactory` is added, because `sinkWriterFactoryClass.isAssignableFrom(streamOperatorFactoryClass)` also matches subclasses.
- With solution 1, whoever wants to add a subclass of `SinkWriterOperatorFactory` must handle it explicitly.
- With solution 1, we would also need to explain some background in the comments, namely why an exception is thrown when a subclass of `SinkWriterOperatorFactory` is passed.
- If anyone wants to add a subclass of `SinkWriterOperatorFactory`, solution 1 cannot handle it, and they may still end up needing something similar to solution 3.

BTW, if I change this line from `if (sinkWriterFactoryClass.isAssignableFrom(streamOperatorFactoryClass)) {` to `if (sinkWriterFactoryClass.equals(streamOperatorFactoryClass)) {`, it becomes solution 1.

Based on these differences, solution 3 is easier to understand and maintain. WDYT? Please correct me if my understanding is wrong, thanks~

[1] https://github.com/apache/flink/blob/3ff225c5f993282d6dfc7726fc08cc00058d9a7f/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java#L169
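As the comment notes, solutions 1 and 3 differ only in the final comparison:

- `Class.equals` matches exactly one class.
- `Class.isAssignableFrom` also matches every subclass.

The minimal sketch below shows how each check treats a future subclass. `SinkWriterFactoryStandIn`, `CustomSinkWriterFactory` and `MapFactory` are hypothetical stand-ins, not Flink classes. The sketch assumes all classes come from the same classloader, as the comment established for `OperatorChain`.

```java
import java.util.List;

public class SinkWriterFactoryCheck {

    // Hypothetical stand-ins; none of these are real Flink classes.
    static class SinkWriterFactoryStandIn {}

    static class CustomSinkWriterFactory extends SinkWriterFactoryStandIn {}

    static class MapFactory {}

    /** Solution 1 style: exact class match, so subclasses are NOT recognized. */
    static boolean isSinkWriterExact(Class<?> factoryClass) {
        return SinkWriterFactoryStandIn.class.equals(factoryClass);
    }

    /** Solution 3 style: the class itself or any of its subclasses is recognized. */
    static boolean isSinkWriterOrSubclass(Class<?> factoryClass) {
        return SinkWriterFactoryStandIn.class.isAssignableFrom(factoryClass);
    }

    public static void main(String[] args) {
        List<Class<?>> candidates =
                List.of(SinkWriterFactoryStandIn.class, CustomSinkWriterFactory.class, MapFactory.class);
        for (Class<?> c : candidates) {
            // Prints, e.g., "CustomSinkWriterFactory exact=false assignable=true".
            System.out.printf(
                    "%s exact=%b assignable=%b%n",
                    c.getSimpleName(), isSinkWriterExact(c), isSinkWriterOrSubclass(c));
        }
    }
}
```

With the exact-match check, `CustomSinkWriterFactory` is not recognized. That is why solution 1 would need extra handling, such as the exception discussed above, whenever a subclass is introduced.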