godfreyhe commented on a change in pull request #11797:
URL: https://github.com/apache/flink/pull/11797#discussion_r411073864



##########
File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryRow.java
##########
@@ -129,13 +130,15 @@ public int getArity() {
        }
 
        @Override
-       public byte getHeader() {
+       public RowKind getRowKind() {
                // first nullBitsSizeInBytes byte is header.
-               return segments[0].get(offset);
+               byte header = segments[0].get(offset);
+               return RowKind.values()[header];

Review comment:
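       Big +1. If we add a new enum value in the middle, the indexes of the 
values after it will change as well, so `RowKind.values()[header]` would then 
decode existing headers incorrectly.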
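
One possible way to avoid depending on the ordinal, as a minimal sketch only: give each kind an explicit byte value and decode through a lookup. `StableRowKind`, `toByteValue`, `fromByteValue`, and the byte values are hypothetical names chosen for illustration; they are not taken from this PR.

```java
// Hypothetical stand-in for RowKind; names and byte values are assumptions for illustration.
enum StableRowKind {
    INSERT((byte) 0),
    UPDATE_BEFORE((byte) 1),
    UPDATE_AFTER((byte) 2),
    DELETE((byte) 3);

    // Explicit encoding: stays the same even if constants are reordered or new ones are added.
    private final byte value;

    StableRowKind(byte value) {
        this.value = value;
    }

    byte toByteValue() {
        return value;
    }

    static StableRowKind fromByteValue(byte value) {
        switch (value) {
            case 0:
                return INSERT;
            case 1:
                return UPDATE_BEFORE;
            case 2:
                return UPDATE_AFTER;
            case 3:
                return DELETE;
            default:
                throw new IllegalArgumentException(
                    "Unsupported byte value '" + value + "' for row kind.");
        }
    }
}
```

With this shape, the header byte would be written with `toByteValue()` and read back with `fromByteValue(...)`, so inserting a new constant in the middle does not shift the encoding of the existing ones.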

##########
File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/MiniBatchGlobalGroupAggFunction.java
##########
@@ -199,27 +196,27 @@ public void finishBundle(Map<BaseRow, BaseRow> buffer, Collector<BaseRow> out) t
                                                // new row is not same with prev row
                                                if (generateUpdateBefore) {
                                                        // prepare retraction message for previous row

Review comment:
       Please update the comment (it still refers to a "retraction message").

##########
File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/ObjectArrayRow.java
##########
@@ -113,13 +113,7 @@ public BaseRow getRow(int ordinal, int numFields) {
        @Override
        public String toString() {
                StringBuilder sb = new StringBuilder();
-               sb.append("(");
-               if (BaseRowUtil.isAccumulateMsg(this)) {
-                       sb.append("+");
-               } else {
-                       sb.append("-");
-               }
-               sb.append("|");
+               sb.append(rowKind.shortString()).append("(");

Review comment:
       Please unify the format of `toString`. In `BinaryRow`, the format is [UA|...].

##########
File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/stream/StreamingJoinOperator.java
##########
@@ -127,6 +127,10 @@ public void processElement2(StreamRecord<BaseRow> element) throws Exception {
         * method is too complex, so we provide the pseudo code to help understand the logic. We should
         * keep sync the following pseudo code with the real logic of the method.
         *
+        * <p>Note: "+" represents "INSERT", "-" represents "DELETE", "*" represents input row kind.

Review comment:
       Should this be consistent with the `shortString` values of `RowKind`?

##########
File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/JoinedRow.java
##########
@@ -17,14 +17,16 @@
 
 package org.apache.flink.table.dataformat;
 
+import org.apache.flink.types.RowKind;
+
 /**
  * Join two row to one row.
  */
 public final class JoinedRow implements BaseRow {
 
        private BaseRow row1;
        private BaseRow row2;
-       private byte header;
+       private RowKind rowKind = RowKind.INSERT;

Review comment:
       It would be better to remove the default value, so that we don't forget 
to set the RowKind somewhere.

##########
File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/DeduplicateFunctionHelper.java
##########
@@ -35,25 +37,29 @@
         *
         * @param currentRow latest row received by deduplicate function
         * @param generateUpdateBefore whether need to send UPDATE_BEFORE message for updates
-        * @param state state of function
+        * @param state state of function, null if generateUpdateBefore is false
         * @param out underlying collector
         */
        static void processLastRow(
                        BaseRow currentRow,
                        boolean generateUpdateBefore,
-                       ValueState<BaseRow> state,
+                       @Nullable ValueState<BaseRow> state,
                        Collector<BaseRow> out) throws Exception {
-               // Check message should be accumulate
-               Preconditions.checkArgument(BaseRowUtil.isAccumulateMsg(currentRow));
+               // check message should be insert only.
+               Preconditions.checkArgument(currentRow.getRowKind() == RowKind.INSERT);
                if (generateUpdateBefore) {
-                       // state stores complete row if generateUpdateBefore is true
+                       // state is not null when generateUpdateBefore is enabled,
+                       // the state stores complete row
                        BaseRow preRow = state.value();
                        state.update(currentRow);
                        if (preRow != null) {
-                               preRow.setHeader(BaseRowUtil.RETRACT_MSG);
+                               preRow.setRowKind(RowKind.UPDATE_BEFORE);
                                out.collect(preRow);
                        }
                }
+               // in order for better performance, we don't have state for LastRow
+               // if not generate UPDATE_BEFORE, thus, we can't produce INSERT messages for first row.

Review comment:
       If the downstream is a database, the sink will execute an INSERT or an 
UPDATE statement for a row based on its RowKind. If there is no INSERT message, 
how is the first row inserted?
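
To illustrate the concern, here is a minimal sketch assuming a hypothetical in-memory sink that dispatches strictly on the row kind, the way a JDBC-style sink might choose between INSERT and UPDATE statements. `SketchRowKind` and `StrictSinkSketch` are made-up names and not part of Flink; a real sink may well behave differently (for example by upserting).

```java
// Hypothetical stand-in for RowKind; not the real Flink enum.
enum SketchRowKind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

// Hypothetical sink that mimics INSERT / UPDATE / DELETE statements on a keyed table.
final class StrictSinkSketch {
    private final java.util.Map<String, Long> table = new java.util.HashMap<>();

    void write(SketchRowKind kind, String key, long value) {
        switch (kind) {
            case INSERT:
                // INSERT INTO t VALUES (key, value)
                table.put(key, value);
                break;
            case UPDATE_AFTER:
                // UPDATE t SET v = value WHERE k = key; affects zero rows if the key is absent
                table.computeIfPresent(key, (k, old) -> value);
                break;
            case DELETE:
                // DELETE FROM t WHERE k = key
                table.remove(key);
                break;
            case UPDATE_BEFORE:
                // nothing to write; the following UPDATE_AFTER carries the new value
                break;
        }
    }

    // Returns a copy of the current table contents.
    java.util.Map<String, Long> snapshot() {
        return new java.util.HashMap<>(table);
    }
}
```

If the very first message for a key arrives as `UPDATE_AFTER`, this strict sink updates nothing and the row never reaches the table, which is the situation the question above is about.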

##########
File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/stream/StreamingJoinOperator.java
##########
@@ -182,14 +188,16 @@ private void processElement(
                        boolean inputIsLeft) throws Exception {
                boolean inputIsOuter = inputIsLeft ? leftIsOuter : rightIsOuter;
                boolean otherIsOuter = inputIsLeft ? rightIsOuter : leftIsOuter;
+               RowKind inputRowKind = input.getRowKind();
+               input.setRowKind(RowKind.INSERT); // erase RowKind for later state updating
 
                AssociatedRecords associatedRecords = AssociatedRecords.of(input, inputIsLeft, otherSideStateView, joinCondition);
-               if (BaseRowUtil.isAccumulateMsg(input)) { // record is accumulate
+               if (inputRowKind == RowKind.INSERT || inputRowKind == RowKind.UPDATE_AFTER) { // record is accumulate

Review comment:
       boolean isAccumulateMsg = BaseRowUtil.isAccumulateMsg(input);
   ...
   if (isAccumulateMsg) ...
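
A slightly fuller sketch of the suggestion above, using hypothetical stand-in types rather than the real `BaseRow` and `BaseRowUtil`. It assumes that `isAccumulateMsg` treats INSERT and UPDATE_AFTER as accumulate messages, matching the condition in the diff. The point is only the ordering: classify the input first, then erase the row kind.

```java
// Hypothetical stand-ins for RowKind, BaseRow and BaseRowUtil; not the real Flink classes.
enum JoinRowKind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

final class JoinRowSketch {
    private JoinRowKind rowKind;

    JoinRowSketch(JoinRowKind rowKind) {
        this.rowKind = rowKind;
    }

    JoinRowKind getRowKind() {
        return rowKind;
    }

    void setRowKind(JoinRowKind rowKind) {
        this.rowKind = rowKind;
    }
}

final class JoinRowUtilSketch {
    private JoinRowUtilSketch() {}

    // Assumption: INSERT and UPDATE_AFTER both count as accumulate messages.
    static boolean isAccumulateMsg(JoinRowSketch row) {
        JoinRowKind kind = row.getRowKind();
        return kind == JoinRowKind.INSERT || kind == JoinRowKind.UPDATE_AFTER;
    }

    // Evaluate the flag once, before the row kind is overwritten for state updates.
    static boolean classifyThenErase(JoinRowSketch input) {
        boolean isAccumulateMsg = isAccumulateMsg(input);
        input.setRowKind(JoinRowKind.INSERT); // erase RowKind for later state updating
        return isAccumulateMsg;
    }
}
```

Calling the helper after `setRowKind(INSERT)` would always return true, so the flag has to be computed before the erase.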

##########
File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/ObjectArrayRow.java
##########
@@ -27,7 +27,7 @@
  */
 public abstract class ObjectArrayRow implements BaseRow {
 
-       private byte header;
+       private RowKind rowKind = RowKind.INSERT; // INSERT as default

Review comment:
       Ditto: it would be better to remove the default value here as well.

##########
File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/temporal/TemporalRowTimeJoinOperator.java
##########
@@ -153,7 +153,6 @@ public void open() throws Exception {
                        TIMERS_STATE_NAME, VoidNamespaceSerializer.INSTANCE, this);
                collector = new TimestampedCollector<>(output);
                outRow = new JoinedRow();
-               outRow.setHeader(BaseRowUtil.ACCUMULATE_MSG);

Review comment:
       Explicitly call `outRow.setRowKind(RowKind.INSERT)`.

##########
File path: 
flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/harness/GroupAggregateHarnessTest.scala
##########
@@ -75,57 +75,57 @@ class GroupAggregateHarnessTest(mode: StateBackendMode) extends HarnessTestBase(
     // register cleanup timer with 3001
     testHarness.setProcessingTime(1)
 
-    // accumulate
-    testHarness.processElement(new StreamRecord(binaryrow("aaa", 1L: JLong), 1))
-    expectedOutput.add(new StreamRecord(binaryrow("aaa", 1L: JLong), 1))
+    // insertion
+    testHarness.processElement(binaryRecord(INSERT,"aaa", 1L: JLong))

Review comment:
       Is the `timestamp` unnecessary?

##########
File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/stream/StreamingSemiAntiJoinOperator.java
##########
@@ -147,8 +154,11 @@ public void processElement1(StreamRecord<BaseRow> element) throws Exception {
        @Override
        public void processElement2(StreamRecord<BaseRow> element) throws Exception {
                BaseRow input = element.getValue();
+               RowKind inputRowKind = input.getRowKind();
+               input.setRowKind(RowKind.INSERT); // erase RowKind for later state updating
+
                AssociatedRecords associatedRecords = AssociatedRecords.of(input, false, leftRecordStateView, joinCondition);
-               if (BaseRowUtil.isAccumulateMsg(input)) {
+               if (inputRowKind == RowKind.INSERT || inputRowKind == RowKind.UPDATE_AFTER) { // record is accumulate

Review comment:
       Ditto: use `BaseRowUtil.isAccumulateMsg(input)` here as well, as suggested 
for `StreamingJoinOperator`.

##########
File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/window/WindowOperatorBuilder.java
##########
@@ -156,13 +156,13 @@ public WindowOperatorBuilder withAllowedLateness(Duration allowedLateness) {
                if (allowedLateness.toMillis() > 0) {
                        this.allowedLateness = allowedLateness.toMillis();
                        // allow late element, which means this window will send retractions
-                       this.sendRetraction = true;
+                       this.produceUpdates = true;
                }
                return this;
        }
 
-       public WindowOperatorBuilder withSendRetraction() {
-               this.sendRetraction = true;
+       public WindowOperatorBuilder withProduceUpdates() {

Review comment:
       How about renaming this to `produceUpdates`?



