leonardBang commented on a change in pull request #14863:
URL: https://github.com/apache/flink/pull/14863#discussion_r579648904



##########
File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/DeduplicateFunctionHelper.java
##########
@@ -86,7 +97,9 @@ static void processLastRowOnChangelog(
             RowData currentRow,
             boolean generateUpdateBefore,
             ValueState<RowData> state,
-            Collector<RowData> out)
+            Collector<RowData> out,
+            boolean isStateTtlEnabled,
+            RecordEqualiser equaliser)

Review comment:
       Please add a note for the newly added parameters.

##########
File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecDeduplicate.java
##########
@@ -260,14 +265,23 @@ protected RowtimeDeduplicateOperatorTranslator(
     /** Translator to create process time deduplicate operator. */
     private static class ProcTimeDeduplicateOperatorTranslator
             extends DeduplicateOperatorTranslator {
+        private final GeneratedRecordEqualiser generatedEqualiser;
 
         protected ProcTimeDeduplicateOperatorTranslator(
                 TableConfig tableConfig,
                 InternalTypeInfo<RowData> rowTypeInfo,
                 TypeSerializer<RowData> typeSerializer,
+                RowType inputRowType,
                 boolean keepLastRow,
                 boolean generateUpdateBefore) {
             super(tableConfig, rowTypeInfo, typeSerializer, keepLastRow, 
generateUpdateBefore);
+            final EqualiserCodeGenerator equaliserCodeGen =
+                    new EqualiserCodeGenerator(
+                            inputRowType.getFields().stream()
+                                    .map(RowType.RowField::getType)
+                                    .toArray(LogicalType[]::new));
+            generatedEqualiser =
+                    
equaliserCodeGen.generateRecordEqualiser("DeduplicateRowEqualiser");

Review comment:
       ```
   this.generatedEqualiser =
                       new EqualiserCodeGenerator(
                                       inputRowType.getFields().stream()
                                               .map(RowType.RowField::getType)
                                               .toArray(LogicalType[]::new))
                               
.generateRecordEqualiser("DeduplicateRowEqualiser");
   ```

##########
File path: 
flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/deduplicate/ProcTimeDeduplicateFunctionTestBase.java
##########
@@ -45,4 +48,15 @@
                     inputRowType.toRowFieldTypes(),
                     new GenericRowRecordSortComparator(
                             rowKeyIdx, 
inputRowType.toRowFieldTypes()[rowKeyIdx]));
+
+    static GeneratedRecordEqualiser generatedEqualiser =
+            new GeneratedRecordEqualiser("", "", new Object[0]) {
+
+                private static final long serialVersionUID = 
8932260133849746733L;

Review comment:
       Flink uses `1L` as the default `serialVersionUID`.

##########
File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecChangelogNormalize.java
##########
@@ -81,6 +84,15 @@ public StreamExecChangelogNormalize(
                 tableConfig
                         .getConfiguration()
                         
.getBoolean(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED);
+
+        final EqualiserCodeGenerator equaliserCodeGen =
+                new EqualiserCodeGenerator(
+                        rowTypeInfo.toRowType().getFields().stream()
+                                .map(RowType.RowField::getType)
+                                .toArray(LogicalType[]::new));
+        GeneratedRecordEqualiser generatedEqualiser =
+                
equaliserCodeGen.generateRecordEqualiser("DeduplicateRowEqualiser");
+

Review comment:
       We can simplify this to:
   ```
   final GeneratedRecordEqualiser generatedEqualiser =
                   new EqualiserCodeGenerator(
                           rowTypeInfo.toRowType().getFields().stream()
                                   .map(RowType.RowField::getType)
                                   .toArray(LogicalType[]::new))
                           .generateRecordEqualiser("DeduplicateRowEqualiser");
   ```

##########
File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/DeduplicateFunctionHelper.java
##########
@@ -96,12 +109,20 @@ static void processLastRowOnChangelog(
                 currentRow.setRowKind(RowKind.INSERT);
                 out.collect(currentRow);
             } else {
-                if (generateUpdateBefore) {
-                    preRow.setRowKind(RowKind.UPDATE_BEFORE);
-                    out.collect(preRow);
+                if (!isStateTtlEnabled && equaliser.equals(preRow, 
currentRow)) {
+                    // currentRow is the same as preRow and state cleaning is 
not enabled.
+                    // We do not emit retraction and update message.
+                    // If state cleaning is enabled, we have to emit messages 
to prevent too early
+                    // state eviction of downstream operators.

Review comment:
       Do we need to take care of `isStateTtlEnabled` here? The `preRow` value,
which comes from `state`, has already been influenced by `isStateTtlEnabled`.

##########
File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/ProcTimeDeduplicateKeepLastRowFunction.java
##########
@@ -33,26 +36,50 @@
     private final boolean generateUpdateBefore;
     private final boolean generateInsert;
     private final boolean inputIsInsertOnly;
+    /** function used to equal RowData. */
+    private transient RecordEqualiser equaliser;
+
+    /** The code generated equaliser used to equal RowData. */
+    private final GeneratedRecordEqualiser genRecordEqualiser;
+
+    private final boolean isStateTtlEnabled;
 
     public ProcTimeDeduplicateKeepLastRowFunction(
             InternalTypeInfo<RowData> typeInfo,
             long stateRetentionTime,
             boolean generateUpdateBefore,
             boolean generateInsert,
-            boolean inputInsertOnly) {
+            boolean inputInsertOnly,
+            GeneratedRecordEqualiser genRecordEqualiser) {
         super(typeInfo, null, stateRetentionTime);
         this.generateUpdateBefore = generateUpdateBefore;
         this.generateInsert = generateInsert;
         this.inputIsInsertOnly = inputInsertOnly;
+        this.genRecordEqualiser = genRecordEqualiser;
+        this.isStateTtlEnabled = stateRetentionTime > 0;
     }
 
     @Override
     public void processElement(RowData input, Context ctx, Collector<RowData> 
out)
             throws Exception {
         if (inputIsInsertOnly) {
-            processLastRowOnProcTime(input, generateUpdateBefore, 
generateInsert, state, out);
+            processLastRowOnProcTime(
+                    input,
+                    generateUpdateBefore,
+                    generateInsert,
+                    state,
+                    out,
+                    isStateTtlEnabled,
+                    equaliser);
         } else {
-            processLastRowOnChangelog(input, generateUpdateBefore, state, out);
+            processLastRowOnChangelog(
+                    input, generateUpdateBefore, state, out, 
isStateTtlEnabled, equaliser);
         }
     }
+
+    @Override
+    public void open(Configuration configure) throws Exception {

Review comment:
       We can put the `open` method right after the constructor to make it more readable.

##########
File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/ProcTimeDeduplicateKeepLastRowFunction.java
##########
@@ -33,26 +36,50 @@
     private final boolean generateUpdateBefore;
     private final boolean generateInsert;
     private final boolean inputIsInsertOnly;
+    /** function used to equal RowData. */
+    private transient RecordEqualiser equaliser;

Review comment:
       Minor: the Javadoc could read "The record equaliser used to equal RowData."
   
   We can also put the transient variable at the end of all members.
   




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to