pnowojski commented on code in PR #23550:
URL: https://github.com/apache/flink/pull/23550#discussion_r1368508673


##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java:
##########
@@ -368,7 +373,8 @@ public void setStreamOperator(StreamOperator<?> operator) {
 
     public void setStreamOperatorFactory(StreamOperatorFactory<?> factory) {
         if (factory != null) {
-            toBeSerializedConfigObjects.put(SERIALIZEDUDF, factory);
+            toBeSerializedConfigObjects.put(SERIALIZED_UDF, factory);
+            config.setString(SERIALIZED_UDF_CLASS_NAME, 
factory.getClass().getName());

Review Comment:
   Maybe change the last line to:
   ```
   toBeSerializedConfigObjects.put(SERIALIZED_UDF, factory);
   config.setString(IS_INSTANCE_OF_SinkWriterOperatorFactory, factory instance 
of SinkWriterOperatorFactory);
   ```
   ?



##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java:
##########
@@ -641,17 +641,15 @@ private Map<StreamConfig.SourceInputConfig, 
ChainedSource> createChainedSources(
     @Nullable
     private Counter getOperatorRecordsOutCounter(
             StreamTask<?, ?> containingTask, StreamConfig operatorConfig) {
-        ClassLoader userCodeClassloader = 
containingTask.getUserCodeClassLoader();
-        StreamOperatorFactory<?> operatorFactory =
-                operatorConfig.getStreamOperatorFactory(userCodeClassloader);
+        String streamOperatorFactoryClassName = 
operatorConfig.getStreamOperatorFactoryClassName();
         // Do not use the numRecordsOut counter on output if this operator is 
SinkWriterOperator.
         //
         // Metric "numRecordsOut" is defined as the total number of records 
written to the
         // external system in FLIP-33, but this metric is occupied in 
AbstractStreamOperator as the
         // number of records sent to downstream operators, which is number of 
Committable batches
         // sent to SinkCommitter. So we skip registering this metric on output 
and leave this metric
         // to sink writer implementations to report.
-        if (operatorFactory instanceof SinkWriterOperatorFactory) {
+        if 
(SinkWriterOperatorFactory.class.getName().equals(streamOperatorFactoryClassName))
 {

Review Comment:
   @1996fanrui , are those actually the equivalent lines? What if the used 
`operatorFactory` is a subclass of `SinkWriterOperatorFactory`, with a 
different name? 
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to