zhipeng93 commented on code in PR #215:
URL: https://github.com/apache/flink-ml/pull/215#discussion_r1169464509


##########
flink-ml-core/src/main/java/org/apache/flink/ml/common/broadcast/operator/AbstractBroadcastWrapperOperator.java:
##########
@@ -269,125 +279,136 @@ private OperatorMetricGroup createOperatorMetricGroup(
     }
 
     /**
-     * extracts common processing logic in subclasses' processing elements.
+     * Extracts common processing logic in subclasses' processing elements.
      *
-     * @param streamRecord the input record.
-     * @param inputIndex input id, starts from zero.
-     * @param elementConsumer the consumer function of StreamRecord, i.e.,
+     * @param streamRecord The input record.
+     * @param inputIndex Input id, starts from zero.
+     * @param elementConsumer The consumer function of StreamRecord, i.e.,
      *     operator.processElement(...).
-     * @param watermarkConsumer the consumer function of WaterMark, i.e.,
+     * @param watermarkConsumer The consumer function of WaterMark, i.e.,
      *     operator.processWatermark(...).
-     * @throws Exception possible exception.
+     * @param keyContextSetter The consumer function of setting key context, i.e.,
+     *     operator.setKeyContext(...).
+     * @throws Exception Possible exception.
      */
     @SuppressWarnings({"rawtypes", "unchecked"})
     protected void processElementX(
             StreamRecord streamRecord,
             int inputIndex,
             ThrowingConsumer<StreamRecord, Exception> elementConsumer,
-            ThrowingConsumer<Watermark, Exception> watermarkConsumer)
+            ThrowingConsumer<Watermark, Exception> watermarkConsumer,
+            ThrowingConsumer<StreamRecord, Exception> keyContextSetter)
             throws Exception {
-        if (!isBlocked[inputIndex]) {
-            if (areBroadcastVariablesReady()) {
-                if (hasPendingElements[inputIndex]) {
-                    processPendingElementsAndWatermarks(
-                            inputIndex, elementConsumer, watermarkConsumer);
-                    hasPendingElements[inputIndex] = false;
-                }
-                elementConsumer.accept(streamRecord);
-
-            } else {
-                dataCacheWriters[inputIndex].addRecord(
-                        CacheElement.newRecord(streamRecord.getValue()));
-            }
-
-        } else {
+        if (!hasRichFunction) {
+            elementConsumer.accept(streamRecord);
+        } else if (isBlocked[inputIndex]) {
             while (!areBroadcastVariablesReady()) {
                 mailboxExecutor.yield();
             }
             elementConsumer.accept(streamRecord);
+        } else if (!areBroadcastVariablesReady()) {
+            dataCacheWriters[inputIndex].addRecord(CacheElement.newRecord(streamRecord.getValue()));
+        } else {
+            if (hasPendingElements[inputIndex]) {
+                processPendingElementsAndWatermarks(
+                        inputIndex, elementConsumer, watermarkConsumer, keyContextSetter);
+                hasPendingElements[inputIndex] = false;
+            }
+            keyContextSetter.accept(streamRecord);
+            elementConsumer.accept(streamRecord);
         }
     }
 
     /**
-     * extracts common processing logic in subclasses' processing watermarks.
+     * Extracts common processing logic in subclasses' processing watermarks.
      *
-     * @param watermark the input watermark.
-     * @param inputIndex input id, starts from zero.
-     * @param elementConsumer the consumer function of StreamRecord, i.e.,
+     * @param watermark The input watermark.
+     * @param inputIndex Input id, starts from zero.
+     * @param elementConsumer The consumer function of StreamRecord, i.e.,
      *     operator.processElement(...).
-     * @param watermarkConsumer the consumer function of WaterMark, i.e.,
+     * @param watermarkConsumer The consumer function of WaterMark, i.e.,
      *     operator.processWatermark(...).
-     * @throws Exception possible exception.
+     * @param keyContextSetter The consumer function of setting key context, i.e.,
+     *     operator.setKeyContext(...).
+     * @throws Exception Possible exception.
      */
     @SuppressWarnings({"rawtypes", "unchecked"})
     protected void processWatermarkX(
             Watermark watermark,
             int inputIndex,
             ThrowingConsumer<StreamRecord, Exception> elementConsumer,
-            ThrowingConsumer<Watermark, Exception> watermarkConsumer)
+            ThrowingConsumer<Watermark, Exception> watermarkConsumer,
+            ThrowingConsumer<StreamRecord, Exception> keyContextSetter)
             throws Exception {
-        if (!isBlocked[inputIndex]) {
-            if (areBroadcastVariablesReady()) {
-                if (hasPendingElements[inputIndex]) {
-                    processPendingElementsAndWatermarks(
-                            inputIndex, elementConsumer, watermarkConsumer);
-                    hasPendingElements[inputIndex] = false;
-                }
-                watermarkConsumer.accept(watermark);
-
-            } else {
-                dataCacheWriters[inputIndex].addRecord(
-                        CacheElement.newWatermark(watermark.getTimestamp()));
-            }
-
-        } else {
+        if (!hasRichFunction) {
+            watermarkConsumer.accept(watermark);
+        } else if (isBlocked[inputIndex]) {
             while (!areBroadcastVariablesReady()) {
                 mailboxExecutor.yield();
             }
             watermarkConsumer.accept(watermark);
+        } else if (!areBroadcastVariablesReady()) {
+            dataCacheWriters[inputIndex].addRecord(
+                    CacheElement.newWatermark(watermark.getTimestamp()));
+        } else {
+            if (hasPendingElements[inputIndex]) {
+                processPendingElementsAndWatermarks(
+                        inputIndex, elementConsumer, watermarkConsumer, keyContextSetter);
+                hasPendingElements[inputIndex] = false;
+            }
+            watermarkConsumer.accept(watermark);
         }
     }
 
     /**
-     * extracts common processing logic in subclasses' endInput(...).
+     * Extracts common processing logic in subclasses' endInput(...).
      *
-     * @param inputIndex input id, starts from zero.
-     * @param elementConsumer the consumer function of StreamRecord, i.e.,
+     * @param inputIndex Input id, starts from zero.
+     * @param elementConsumer The consumer function of StreamRecord, i.e.,
      *     operator.processElement(...).
-     * @param watermarkConsumer the consumer function of WaterMark, i.e.,
+     * @param watermarkConsumer The consumer function of WaterMark, i.e.,
      *     operator.processWatermark(...).
-     * @throws Exception possible exception.
+     * @param keyContextSetter The consumer function of setting key context, i.e.,
+     *     operator.setKeyContext(...).
+     * @throws Exception Possible exception.
      */
     @SuppressWarnings("rawtypes")
     protected void endInputX(
             int inputIndex,
             ThrowingConsumer<StreamRecord, Exception> elementConsumer,
-            ThrowingConsumer<Watermark, Exception> watermarkConsumer)
+            ThrowingConsumer<Watermark, Exception> watermarkConsumer,
+            ThrowingConsumer<StreamRecord, Exception> keyContextSetter)
             throws Exception {
-        while (!areBroadcastVariablesReady()) {
-            mailboxExecutor.yield();
-        }
-        if (hasPendingElements[inputIndex]) {
-            processPendingElementsAndWatermarks(inputIndex, elementConsumer, watermarkConsumer);
-            hasPendingElements[inputIndex] = false;
+        if (hasRichFunction) {

Review Comment:
   Thanks for the comment. I updated all of the `if (hasRichFunction)` checks
except the one in the constructor, where I think the current form provides better readability.



##########
flink-ml-core/src/main/java/org/apache/flink/ml/common/broadcast/operator/AbstractBroadcastWrapperOperator.java:
##########
@@ -269,125 +279,136 @@ private OperatorMetricGroup createOperatorMetricGroup(
     }
 
     /**
-     * extracts common processing logic in subclasses' processing elements.
+     * Extracts common processing logic in subclasses' processing elements.
      *
-     * @param streamRecord the input record.
-     * @param inputIndex input id, starts from zero.
-     * @param elementConsumer the consumer function of StreamRecord, i.e.,
+     * @param streamRecord The input record.
+     * @param inputIndex Input id, starts from zero.
+     * @param elementConsumer The consumer function of StreamRecord, i.e.,
      *     operator.processElement(...).
-     * @param watermarkConsumer the consumer function of WaterMark, i.e.,
+     * @param watermarkConsumer The consumer function of WaterMark, i.e.,
      *     operator.processWatermark(...).
-     * @throws Exception possible exception.
+     * @param keyContextSetter The consumer function of setting key context, i.e.,
+     *     operator.setKeyContext(...).
+     * @throws Exception Possible exception.
      */
     @SuppressWarnings({"rawtypes", "unchecked"})
     protected void processElementX(
             StreamRecord streamRecord,
             int inputIndex,
             ThrowingConsumer<StreamRecord, Exception> elementConsumer,
-            ThrowingConsumer<Watermark, Exception> watermarkConsumer)
+            ThrowingConsumer<Watermark, Exception> watermarkConsumer,
+            ThrowingConsumer<StreamRecord, Exception> keyContextSetter)
             throws Exception {
-        if (!isBlocked[inputIndex]) {
-            if (areBroadcastVariablesReady()) {
-                if (hasPendingElements[inputIndex]) {
-                    processPendingElementsAndWatermarks(
-                            inputIndex, elementConsumer, watermarkConsumer);
-                    hasPendingElements[inputIndex] = false;
-                }
-                elementConsumer.accept(streamRecord);
-
-            } else {
-                dataCacheWriters[inputIndex].addRecord(
-                        CacheElement.newRecord(streamRecord.getValue()));
-            }
-
-        } else {
+        if (!hasRichFunction) {
+            elementConsumer.accept(streamRecord);
+        } else if (isBlocked[inputIndex]) {
             while (!areBroadcastVariablesReady()) {
                 mailboxExecutor.yield();
             }
             elementConsumer.accept(streamRecord);
+        } else if (!areBroadcastVariablesReady()) {
+            dataCacheWriters[inputIndex].addRecord(CacheElement.newRecord(streamRecord.getValue()));
+        } else {
+            if (hasPendingElements[inputIndex]) {
+                processPendingElementsAndWatermarks(
+                        inputIndex, elementConsumer, watermarkConsumer, keyContextSetter);
+                hasPendingElements[inputIndex] = false;
+            }
+            keyContextSetter.accept(streamRecord);
+            elementConsumer.accept(streamRecord);
         }
     }
 
     /**
-     * extracts common processing logic in subclasses' processing watermarks.
+     * Extracts common processing logic in subclasses' processing watermarks.
      *
-     * @param watermark the input watermark.
-     * @param inputIndex input id, starts from zero.
-     * @param elementConsumer the consumer function of StreamRecord, i.e.,
+     * @param watermark The input watermark.
+     * @param inputIndex Input id, starts from zero.
+     * @param elementConsumer The consumer function of StreamRecord, i.e.,
      *     operator.processElement(...).
-     * @param watermarkConsumer the consumer function of WaterMark, i.e.,
+     * @param watermarkConsumer The consumer function of WaterMark, i.e.,
      *     operator.processWatermark(...).
-     * @throws Exception possible exception.
+     * @param keyContextSetter The consumer function of setting key context, i.e.,
+     *     operator.setKeyContext(...).
+     * @throws Exception Possible exception.
      */
     @SuppressWarnings({"rawtypes", "unchecked"})
     protected void processWatermarkX(
             Watermark watermark,
             int inputIndex,
             ThrowingConsumer<StreamRecord, Exception> elementConsumer,
-            ThrowingConsumer<Watermark, Exception> watermarkConsumer)
+            ThrowingConsumer<Watermark, Exception> watermarkConsumer,
+            ThrowingConsumer<StreamRecord, Exception> keyContextSetter)
             throws Exception {
-        if (!isBlocked[inputIndex]) {
-            if (areBroadcastVariablesReady()) {
-                if (hasPendingElements[inputIndex]) {
-                    processPendingElementsAndWatermarks(
-                            inputIndex, elementConsumer, watermarkConsumer);
-                    hasPendingElements[inputIndex] = false;
-                }
-                watermarkConsumer.accept(watermark);
-
-            } else {
-                dataCacheWriters[inputIndex].addRecord(
-                        CacheElement.newWatermark(watermark.getTimestamp()));
-            }
-
-        } else {
+        if (!hasRichFunction) {
+            watermarkConsumer.accept(watermark);
+        } else if (isBlocked[inputIndex]) {
             while (!areBroadcastVariablesReady()) {
                 mailboxExecutor.yield();
             }
             watermarkConsumer.accept(watermark);
+        } else if (!areBroadcastVariablesReady()) {
+            dataCacheWriters[inputIndex].addRecord(
+                    CacheElement.newWatermark(watermark.getTimestamp()));
+        } else {
+            if (hasPendingElements[inputIndex]) {
+                processPendingElementsAndWatermarks(
+                        inputIndex, elementConsumer, watermarkConsumer, keyContextSetter);
+                hasPendingElements[inputIndex] = false;
+            }
+            watermarkConsumer.accept(watermark);
         }
     }
 
     /**
-     * extracts common processing logic in subclasses' endInput(...).
+     * Extracts common processing logic in subclasses' endInput(...).
      *
-     * @param inputIndex input id, starts from zero.
-     * @param elementConsumer the consumer function of StreamRecord, i.e.,
+     * @param inputIndex Input id, starts from zero.
+     * @param elementConsumer The consumer function of StreamRecord, i.e.,
      *     operator.processElement(...).
-     * @param watermarkConsumer the consumer function of WaterMark, i.e.,
+     * @param watermarkConsumer The consumer function of WaterMark, i.e.,
      *     operator.processWatermark(...).
-     * @throws Exception possible exception.
+     * @param keyContextSetter The consumer function of setting key context, i.e.,
+     *     operator.setKeyContext(...).
+     * @throws Exception Possible exception.
      */
     @SuppressWarnings("rawtypes")
     protected void endInputX(
             int inputIndex,
             ThrowingConsumer<StreamRecord, Exception> elementConsumer,
-            ThrowingConsumer<Watermark, Exception> watermarkConsumer)
+            ThrowingConsumer<Watermark, Exception> watermarkConsumer,
+            ThrowingConsumer<StreamRecord, Exception> keyContextSetter)
             throws Exception {
-        while (!areBroadcastVariablesReady()) {
-            mailboxExecutor.yield();
-        }
-        if (hasPendingElements[inputIndex]) {
-            processPendingElementsAndWatermarks(inputIndex, elementConsumer, watermarkConsumer);
-            hasPendingElements[inputIndex] = false;
+        if (hasRichFunction) {

Review Comment:
   Thanks for the comment. I updated all of the `if (hasRichFunction)` checks except
the one in the constructor, where I think the current form provides better readability.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to