wuchong commented on a change in pull request #11797:
URL: https://github.com/apache/flink/pull/11797#discussion_r412925086



##########
File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/DeduplicateFunctionHelper.java
##########
@@ -35,26 +35,31 @@
         *
         * @param currentRow latest row received by deduplicate function
         * @param generateUpdateBefore whether need to send UPDATE_BEFORE message for updates
-        * @param state state of function
+        * @param state state of function, null if generateUpdateBefore is false
         * @param out underlying collector
         */
        static void processLastRow(
                        BaseRow currentRow,
                        boolean generateUpdateBefore,
                        ValueState<BaseRow> state,
                        Collector<BaseRow> out) throws Exception {
-               // Check message should be accumulate
-               Preconditions.checkArgument(BaseRowUtil.isAccumulateMsg(currentRow));
-               if (generateUpdateBefore) {
-                       // state stores complete row if generateUpdateBefore is true
-                       BaseRow preRow = state.value();
-                       state.update(currentRow);
-                       if (preRow != null) {
-                               preRow.setHeader(BaseRowUtil.RETRACT_MSG);
+               // check message should be insert only.
+               Preconditions.checkArgument(currentRow.getRowKind() == RowKind.INSERT);
+               BaseRow preRow = state.value();
+               state.update(currentRow);

Review comment:
       After an offline discussion, I will add an optimization configuration that always sends UPDATE_AFTER instead of INSERT for the first row. This avoids state access, but the option will be disabled by default.
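A minimal, Flink-free sketch of the proposed optimization follows. It is not Flink's implementation: `Kind`, `Row`, the `Map` standing in for keyed `ValueState`, and the `alwaysEmitUpdateAfter` flag are hypothetical stand-ins, since the comment does not name the configuration option. The sketch assumes the optimization only applies when no UPDATE_BEFORE is required. Emitting UPDATE_BEFORE needs the previous row from state; without it, state is only used to tell the first row (INSERT) apart from later rows (UPDATE_AFTER), and always emitting UPDATE_AFTER removes that need.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, Flink-free sketch of "keep last row" deduplication with an
// optional flag that skips state access by always emitting UPDATE_AFTER.
public class LastRowDedupSketch {

    // Simplified stand-in for Flink's RowKind (only the kinds used here).
    enum Kind { INSERT, UPDATE_BEFORE, UPDATE_AFTER }

    // Simplified immutable row: key, payload and change kind.
    static final class Row {
        final String key;
        final String value;
        final Kind kind;

        Row(String key, String value, Kind kind) {
            this.key = key;
            this.value = value;
            this.kind = kind;
        }

        Row withKind(Kind newKind) {
            return new Row(key, value, newKind);
        }

        @Override
        public String toString() {
            return kind + "(" + key + "=" + value + ")";
        }
    }

    /**
     * @param alwaysEmitUpdateAfter hypothetical optimization flag (off by default
     *        in the proposal); only honored when no UPDATE_BEFORE is required
     * @param state stand-in for the keyed ValueState holding the last row per key
     */
    static void processLastRow(
            Row currentRow,
            boolean generateUpdateBefore,
            boolean alwaysEmitUpdateAfter,
            Map<String, Row> state,
            List<Row> out) {
        // Input must be insert-only, mirroring the Preconditions check in the diff.
        if (currentRow.kind != Kind.INSERT) {
            throw new IllegalArgumentException("expected INSERT, got " + currentRow.kind);
        }
        if (!generateUpdateBefore && alwaysEmitUpdateAfter) {
            // Optimized path: no state read or write; the first row is also UPDATE_AFTER.
            out.add(currentRow.withKind(Kind.UPDATE_AFTER));
            return;
        }
        // State is needed to emit UPDATE_BEFORE or to detect the first row (INSERT).
        Row preRow = state.put(currentRow.key, currentRow);
        if (preRow == null) {
            out.add(currentRow.withKind(Kind.INSERT));
        } else {
            if (generateUpdateBefore) {
                out.add(preRow.withKind(Kind.UPDATE_BEFORE));
            }
            out.add(currentRow.withKind(Kind.UPDATE_AFTER));
        }
    }

    public static void main(String[] args) {
        for (boolean optimized : new boolean[] {false, true}) {
            Map<String, Row> state = new HashMap<>();
            List<Row> out = new ArrayList<>();
            processLastRow(new Row("k1", "a", Kind.INSERT), false, optimized, state, out);
            processLastRow(new Row("k1", "b", Kind.INSERT), false, optimized, state, out);
            System.out.println("optimized=" + optimized + " -> " + out
                    + ", state size=" + state.size());
        }
    }
}
```

Running `main` prints `optimized=false -> [INSERT(k1=a), UPDATE_AFTER(k1=b)], state size=1` followed by `optimized=true -> [UPDATE_AFTER(k1=a), UPDATE_AFTER(k1=b)], state size=0`. The optimized path gives up the INSERT/UPDATE_AFTER distinction for the first row in exchange for never touching state. Keeping it off by default is presumably a safety choice, since downstream operators must then accept an UPDATE_AFTER for a key they have never seen.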




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

