wuchong commented on a change in pull request #14863: URL: https://github.com/apache/flink/pull/14863#discussion_r580085880
##########
File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecChangelogNormalize.java
##########
@@ -81,6 +84,14 @@ public StreamExecChangelogNormalize(
                 tableConfig
                         .getConfiguration()
                         .getBoolean(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED);
+
+        GeneratedRecordEqualiser generatedEqualiser =
+                new EqualiserCodeGenerator(
+                                rowTypeInfo.toRowType().getFields().stream()
+                                        .map(RowType.RowField::getType)
+                                        .toArray(LogicalType[]::new))

Review comment:
It seems that more than 3 places use a long method chain to create the field logical types. I think we can provide a new constructor for EqualiserCodeGenerator that accepts a `RowType`. This would simplify the code and improve readability. E.g.

```scala
def this(rowType: RowType) = {
  this(rowType.getChildren.asScala.toArray)
}
```

##########
File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/ProcTimeDeduplicateKeepLastRowFunction.java
##########
@@ -33,26 +36,49 @@
     private final boolean generateUpdateBefore;
     private final boolean generateInsert;
     private final boolean inputIsInsertOnly;
+    private final boolean isStateTtlEnabled;
+    /** The code generated equaliser used to equal RowData. */
+    private final GeneratedRecordEqualiser genRecordEqualiser;
+
+    /** The record equaliser used to equal RowData. */
+    private transient RecordEqualiser equaliser;

     public ProcTimeDeduplicateKeepLastRowFunction(
             InternalTypeInfo<RowData> typeInfo,
             long stateRetentionTime,
             boolean generateUpdateBefore,
             boolean generateInsert,
-            boolean inputInsertOnly) {
+            boolean inputInsertOnly,
+            GeneratedRecordEqualiser genRecordEqualiser) {
         super(typeInfo, null, stateRetentionTime);
         this.generateUpdateBefore = generateUpdateBefore;
         this.generateInsert = generateInsert;
         this.inputIsInsertOnly = inputInsertOnly;
+        this.genRecordEqualiser = genRecordEqualiser;
+        this.isStateTtlEnabled = stateRetentionTime > 0;

Review comment:
nit: We can move this member variable into the parent class `DeduplicateFunctionBase`, since it is used by multiple classes.

##########
File path: flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/deduplicate/ProcTimeDeduplicateKeepLastRowFunctionTest.java
##########
@@ -92,14 +102,31 @@ public void testWithGenerateUpdateBefore() throws Exception {
         testHarness.open();
         testHarness.processElement(insertRecord("book", 1L, 12));
         testHarness.processElement(insertRecord("book", 2L, 11));
-        testHarness.processElement(insertRecord("book", 1L, 13));
+        testHarness.processElement(insertRecord("book", 1L, 12));
         testHarness.close();

         // Keep LastRow in deduplicate may send UPDATE_BEFORE
         List<Object> expectedOutput = new ArrayList<>();
         expectedOutput.add(insertRecord("book", 1L, 12));
         expectedOutput.add(updateBeforeRecord("book", 1L, 12));
-        expectedOutput.add(updateAfterRecord("book", 1L, 13));
+        expectedOutput.add(updateAfterRecord("book", 1L, 12));
+        expectedOutput.add(insertRecord("book", 2L, 11));
+        assertor.assertOutputEqualsSorted("output wrong.", expectedOutput, testHarness.getOutput());
+    }
+
+    @Test
+    public void testDeduplicateWithGenerateUpdateBefore() throws Exception {

Review comment:
Please use a more suitable name for this test method; the current name sounds the same as the one above, `testWithGenerateUpdateBefore`.
What do you think about `testWithStateTtlDisabled`?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
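
For the nit on `isStateTtlEnabled`, a minimal sketch of what moving the flag into the parent class could look like. The class names below (`DeduplicateFunctionBaseSketch`, `KeepLastRowSketch`, `TtlFlagDemo`) are hypothetical, simplified stand-ins and do not mirror the real Flink constructors, which take more parameters; only the `stateRetentionTime > 0` rule comes from the diff above.

```java
// Hypothetical stand-in for DeduplicateFunctionBase; the real class has
// additional type parameters and constructor arguments.
abstract class DeduplicateFunctionBaseSketch {
    /** True when a positive state retention time was configured. */
    protected final boolean isStateTtlEnabled;

    protected DeduplicateFunctionBaseSketch(long stateRetentionTime) {
        // Computed once here so that every subclass shares the same rule.
        this.isStateTtlEnabled = stateRetentionTime > 0;
    }
}

// Hypothetical stand-in for a subclass such as ProcTimeDeduplicateKeepLastRowFunction.
final class KeepLastRowSketch extends DeduplicateFunctionBaseSketch {
    KeepLastRowSketch(long stateRetentionTime) {
        super(stateRetentionTime);
    }

    /** Reads the inherited flag instead of recomputing it in the subclass. */
    boolean ttlEnabled() {
        return isStateTtlEnabled;
    }
}

// Small demo entry point (assumed name) that prints the flag for two settings.
class TtlFlagDemo {
    public static void main(String[] args) {
        System.out.println(new KeepLastRowSketch(1000L).ttlEnabled());
        System.out.println(new KeepLastRowSketch(0L).ttlEnabled());
    }
}
```

With this shape, each deduplicate function would only pass `stateRetentionTime` to `super(...)` and drop its own copy of the field.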