fhueske commented on code in PR #28164:
URL: https://github.com/apache/flink/pull/28164#discussion_r3260546159


##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/FromChangelogTypeStrategy.java:
##########
@@ -95,6 +100,56 @@ public Optional<List<DataType>> inferInputTypes(
                 return Optional.of(DataTypes.ROW(outputFields).notNull());
             };
 
+    // 
--------------------------------------------------------------------------------------------
+    // Changelog mode inference
+    // 
--------------------------------------------------------------------------------------------
+
+    /**
+     * Emits an upsert changelog when the input is partitioned (set semantics) 
and the resolved
+     * {@code op_mapping} maps to {@code UPDATE_AFTER} without {@code 
UPDATE_BEFORE}. In all other
+     * cases the output is a retract changelog. When upsert mode is selected, 
the partition key acts
+     * as the upsert key.
+     */
+    public static final ChangelogModeStrategy CHANGELOG_MODE_STRATEGY =
+            ctx -> isUpsertConfig(ctx) ? ChangelogMode.upsert(false) : 
ChangelogMode.all();
+
+    /**
+     * Returns {@code true} when the FROM_CHANGELOG call should emit an upsert 
changelog: the input
+     * table is partitioned AND the resolved {@code op_mapping} contains 
{@code UPDATE_AFTER}
+     * without {@code UPDATE_BEFORE}. Falls back to {@code false} when the 
mapping is absent or
+     * cannot be resolved as a literal. The default mapping maps to a retract 
table.
+     */
+    private static boolean isUpsertConfig(final ChangelogContext ctx) {
+        if (!isPartitioned(ctx::getTableSemantics)) {
+            return false;
+        }
+        return ctx.getArgumentValue(ARG_OP_MAPPING, Map.class).stream()
+                .anyMatch(FromChangelogTypeStrategy::describesUpsert);

Review Comment:
   nit
   ```suggestion
           return ctx.getArgumentValue(ARG_OP_MAPPING, 
Map.class).map(FromChangelogTypeStrategy::describesUpsert).orElse(false);
   ```
   `stream()` of an `Optional` surely works, but kinda hides that this is just 
a single element.
   `map()` is more clear, IMO.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to