gustavodemorais commented on code in PR #27508:
URL: https://github.com/apache/flink/pull/27508#discussion_r2889319585


##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/typeutils/RowDataSerializer.java:
##########
@@ -188,10 +188,14 @@ public int getArity() {
     /** Convert {@link RowData} into {@link BinaryRowData}. TODO modify it to code gen. */
     @Override
     public BinaryRowData toBinaryRow(RowData row) {
+        return toBinaryRow(row, false);
+    }
+
+    public BinaryRowData toBinaryRow(RowData row, boolean prohibitReuseRow) {

Review Comment:
   ```suggestion
       public BinaryRowData toBinaryRow(RowData row, boolean requiresDeepCopy) {
   ```



##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/typeutils/RowDataSerializer.java:
##########
@@ -188,10 +188,14 @@ public int getArity() {
     /** Convert {@link RowData} into {@link BinaryRowData}. TODO modify it to code gen. */
     @Override
     public BinaryRowData toBinaryRow(RowData row) {
+        return toBinaryRow(row, false);
+    }
+
+    public BinaryRowData toBinaryRow(RowData row, boolean prohibitReuseRow) {
         if (row instanceof BinaryRowData) {
             return (BinaryRowData) row;
         }
-        if (reuseRow == null) {
+        if (reuseRow == null || prohibitReuseRow) {

Review Comment:
   ```suggestion
           if (reuseRow == null || requiresDeepCopy) {
   ```
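
For context on why `requiresDeepCopy` seems the clearer name: a minimal sketch of the aliasing hazard, assuming the flag makes the serializer allocate a fresh buffer instead of writing into its cached one. `ReuseRowSketch` and its `int[]` rows are hypothetical stand-ins, not Flink classes, and the list only approximates a store that keeps object references rather than serialized bytes.

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified stand-in for a row converter with a cached output buffer. Not Flink code. */
class ReuseRowSketch {
    private final int arity;
    // Hypothetical analogue of the serializer's cached reuseRow field.
    private int[] reuseRow;

    ReuseRowSketch(int arity) {
        this.arity = arity;
    }

    /** Assumed behavior: allocate a fresh buffer when none exists or a deep copy is required. */
    int[] toBinaryRow(int[] row, boolean requiresDeepCopy) {
        if (reuseRow == null || requiresDeepCopy) {
            reuseRow = new int[arity];
        }
        System.arraycopy(row, 0, reuseRow, 0, arity);
        return reuseRow;
    }

    public static void main(String[] args) {
        // The list stands in for any store that keeps object references, not serialized bytes.
        List<int[]> stored = new ArrayList<>();

        ReuseRowSketch reusing = new ReuseRowSketch(2);
        stored.add(reusing.toBinaryRow(new int[] {1, 2}, false));
        stored.add(reusing.toBinaryRow(new int[] {3, 4}, false));
        // Both entries alias the same buffer, so the first key was overwritten.
        System.out.println(stored.get(0) == stored.get(1));

        stored.clear();
        ReuseRowSketch copying = new ReuseRowSketch(2);
        stored.add(copying.toBinaryRow(new int[] {1, 2}, true));
        stored.add(copying.toBinaryRow(new int[] {3, 4}, true));
        // Each entry owns its buffer, so the first key keeps its original values.
        System.out.println(stored.get(0)[0]);
    }
}
```

With the flag off, every caller shares one buffer, which is only safe when the result is consumed or serialized before the next call.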



##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/stream/state/MultiJoinStateViews.java:
##########
@@ -357,7 +370,7 @@ private RowData getStateKey(@Nullable RowData joinKey, RowData record) {
                 GenericRowData compositeKey = new GenericRowData(2);
                 compositeKey.setField(0, joinKey);
                 compositeKey.setField(1, record);
-                return compositeKey;
+                return stateKeySerializer.toBinaryRow(compositeKey, prohibitReuseRowData);

Review Comment:
   Are we sure these also cause a bug? We use the keys here a bit differently.
I'd recommend handling this in a separate Jira ticket/PR and reverting the
changes to this file for now. We can fix the original issue here, and if you
can find a reproducible test we can fix this one separately. Wdyt?



##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/stream/keyselector/AttributeBasedJoinKeyExtractor.java:
##########
@@ -181,13 +194,23 @@ public int[] getCommonJoinKeyIndices(final int inputId) {
         return extractors.stream().mapToInt(KeyExtractor::getFieldIndexInSourceRow).toArray();
     }
 
+    @Override
+    public void enableCopyingRowData() {
+        this.needCopyingRowData = true;

Review Comment:
   ```suggestion
       public void requiresKeyDeepCopy() {
           this.requiresKeyDeepCopy = true;
   ```



##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/stream/StreamingMultiJoinOperator.java:
##########
@@ -609,6 +612,12 @@ private Integer processRecords(
         }
     }
 
+    private boolean checkNonOrderedBackend() {

Review Comment:
   ```suggestion
       private boolean isHeapBackend() {
   ```
   
   I think we should check for heap instead of the opposite?



##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/stream/keyselector/AttributeBasedJoinKeyExtractor.java:
##########
@@ -301,7 +333,7 @@ private GenericRowData buildKeyRowFromSourceRow(
         for (int i = 0; i < keyExtractors.size(); i++) {
             keyRow.setField(i, keyExtractors.get(i).getRightSideKey(sourceRow));
         }
-        return keyRow;
+        return keySerializer.toBinaryRow(keyRow, needCopyingRowData);

Review Comment:
   I think we should only do this when a deep copy is actually required and
avoid the additional conversion otherwise, since this is a hot path:
   
   ```suggestion
           if (requiresKeyDeepCopy) {
               return keySerializer.toBinaryRow(keyRow, true);
           } else {
               return keyRow;
           }
   ```
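
A rough sketch of the guarded conversion, to show what the fast path saves. `HotPathKeySketch`, its `Object[]` rows, and the `conversions` counter are hypothetical and exist only for illustration; the copy in `convert` stands in for the serializer call.

```java
import java.util.Arrays;

/** Illustrative key builder that converts only when a deep copy is required. Not Flink code. */
class HotPathKeySketch {
    private final boolean requiresKeyDeepCopy;
    // Counts how often the (assumed expensive) conversion runs.
    int conversions = 0;

    HotPathKeySketch(boolean requiresKeyDeepCopy) {
        this.requiresKeyDeepCopy = requiresKeyDeepCopy;
    }

    Object[] buildKey(Object[] sourceRow, int[] keyIndices) {
        // Project the key fields out of the source row into a freshly allocated key row.
        Object[] keyRow = new Object[keyIndices.length];
        for (int i = 0; i < keyIndices.length; i++) {
            keyRow[i] = sourceRow[keyIndices[i]];
        }
        if (requiresKeyDeepCopy) {
            return convert(keyRow);
        }
        // Fast path: hand back the projected key without any conversion.
        return keyRow;
    }

    // Stand-in for the serializer conversion; here it only copies the array.
    private Object[] convert(Object[] keyRow) {
        conversions++;
        return Arrays.copyOf(keyRow, keyRow.length);
    }

    public static void main(String[] args) {
        Object[] row = {"a", 1, "b"};
        int[] keyIndices = {0, 2};

        HotPathKeySketch fast = new HotPathKeySketch(false);
        HotPathKeySketch safe = new HotPathKeySketch(true);
        for (int i = 0; i < 3; i++) {
            fast.buildKey(row, keyIndices);
            safe.buildKey(row, keyIndices);
        }
        System.out.println(fast.conversions + " vs " + safe.conversions);
    }
}
```

Both paths return equal keys, so callers should not need to care which one ran.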



