twalthr commented on code in PR #26880:
URL: https://github.com/apache/flink/pull/26880#discussion_r2260702156


##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecMultiJoin.java:
##########
@@ -99,12 +99,14 @@ public class StreamExecMultiJoin extends 
ExecNodeBase<RowData>
     @JsonInclude(JsonInclude.Include.NON_EMPTY)
     private final Map<Integer, List<ConditionAttributeRef>> joinAttributeMap;
 
-    @JsonProperty(FIELD_NAME_INPUT_UPSERT_KEYS)
+    @JsonProperty(FIELD_NAME_INPUT_UNIQUE_KEYS)
     @JsonInclude(JsonInclude.Include.NON_EMPTY)
-    // List of upsert keys for each input, where each inner list corresponds 
to an input
-    // The reason it's a List<List<int[]>> is that SQL allows only one primary 
key but
-    // multiple upsert (unique) keys per input
-    private final List<List<int[]>> inputUpsertKeys;
+

Review Comment:
   nit: remove empty line and put the annotations over the code, the comment 
can be on top



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalMultiJoin.java:
##########
@@ -194,26 +194,27 @@ private RexNode createMultiJoinCondition() {
         return RexUtil.composeConjunction(getCluster().getRexBuilder(), 
conjunctions, true);
     }
 
-    private List<List<int[]>> getUpsertKeysForInputs() {
+    private List<List<int[]>> getUniqueKeysForInputs() {
         return inputs.stream()
                 .map(
                         input -> {
-                            final Set<ImmutableBitSet> upsertKeys = 
getUpsertKeys(input);
+                            final Set<ImmutableBitSet> uniqueKeys = 
getUniqueKeys(input);
 
-                            if (upsertKeys == null) {
+                            if (uniqueKeys == null) {
                                 return Collections.<int[]>emptyList();
                             }
-                            return upsertKeys.stream()
+
+                            return uniqueKeys.stream()
                                     .map(ImmutableBitSet::toArray)
                                     .collect(Collectors.toList());
                         })
                 .collect(Collectors.toList());
     }
 
-    private @Nullable Set<ImmutableBitSet> getUpsertKeys(RelNode input) {
+    private @Nullable Set<ImmutableBitSet> getUniqueKeys(RelNode input) {
         final FlinkRelMetadataQuery fmq =
                 
FlinkRelMetadataQuery.reuseOrCreate(input.getCluster().getMetadataQuery());
-        return fmq.getUpsertKeys(input);
+        return fmq.getUniqueKeys(input);

Review Comment:
   Could we add one final test that uses 1 append table with primary key, 1 
retract table without primary key, and one upsert table with primary key? Just 
as a final validation that unique keys and changelog mode nicely fit together.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to