gustavodemorais commented on code in PR #26591:
URL: https://github.com/apache/flink/pull/26591#discussion_r2106329686


##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ChangelogNormalizeTestPrograms.java:
##########
@@ -199,4 +200,87 @@ public class ChangelogNormalizeTestPrograms {
                                     .build())
                     .runSql("INSERT INTO sink_t SELECT a, b, c FROM source_t WHERE b < 10")
                     .build();
+
+    static final TableTestProgram UPSERT_SOURCE_WITH_FILTER_ON_WATERMARK =
+            TableTestProgram.of(
+                            "changelog-normalize-upsert-filter-watermark",
+                            "validates changelog normalize upsert with filter using current_watermark")
+                    .setupConfig(
+                            ExecutionConfigOptions.TABLE_EXEC_SOURCE_CDC_EVENTS_DUPLICATE, true)
+                    .setupTableSource(
+                            SourceTestStep.newBuilder("source_t")
+                                    .addOption("changelog-mode", "I,UA,D")
+                                    .addSchema(
+                                            "a VARCHAR",
+                                            "b INT NOT NULL",
+                                            "c VARCHAR",
+                                            "d TIMESTAMP_LTZ(3)",
+                                            "WATERMARK FOR d AS d",
+                                            "PRIMARY KEY(a) NOT ENFORCED")
+                                    .producedBeforeRestore(
+                                            Row.ofKind(
+                                                    RowKind.UPDATE_AFTER,
+                                                    "one",
+                                                    1,
+                                                    "a",
+                                                    Instant.ofEpochMilli(1L)),
+                                            Row.ofKind(
+                                                    RowKind.UPDATE_AFTER,
+                                                    "one",
+                                                    2,
+                                                    "b",
+                                                    Instant.ofEpochMilli(1L)),
+                                            Row.ofKind(
+                                                    RowKind.UPDATE_AFTER,
+                                                    "one",
+                                                    12,
+                                                    "b",
+                                                    Instant.ofEpochMilli(1L)),
+                                            Row.ofKind(
+                                                    RowKind.UPDATE_AFTER,
+                                                    "one",
+                                                    13,
+                                                    "b",
+                                                    Instant.ofEpochMilli(1L)),
+                                            Row.ofKind(
+                                                    RowKind.UPDATE_AFTER,
+                                                    "three",
+                                                    3,
+                                                    "cc",
+                                                    Instant.ofEpochMilli(1L)))
+                                    .producedAfterRestore(
+                                            Row.ofKind(
+                                                    RowKind.UPDATE_AFTER,
+                                                    "one",
+                                                    15,
+                                                    "aa",
+                                                    Instant.ofEpochMilli(1L)),
+                                            Row.ofKind(
+                                                    RowKind.DELETE,
+                                                    "one",
+                                                    15,
+                                                    "c",
+                                                    Instant.ofEpochMilli(1L)),
+                                            Row.ofKind(
+                                                    RowKind.DELETE,
+                                                    "three",
+                                                    3,
+                                                    "cc",
+                                                    Instant.ofEpochMilli(1L)))
+                                    .build())
+                    .setupTableSink(
+                            SinkTestStep.newBuilder("sink_t")
+                                    .addSchema(SINK_SCHEMA)
+                                    .consumedBeforeRestore(
+                                            "+I[one, 1, a]",
+                                            "-U[one, 1, a]",
+                                            "+U[one, 2, b]",
+                                            "-D[one, 2, b]",
+                                            "+I[three, 3, cc]")
+                                    .consumedAfterRestore("-D[three, 3, cc]")
+                                    .build())
+                    .runSql(
+                            "INSERT INTO sink_t SELECT a, b, c FROM source_t WHERE b < 10 AND "
+                                    + "CURRENT_WATERMARK(d) IS NULL")

Review Comment:
   It's always null because we don't progress the watermark, correct?
   



##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/generated/FilterCondition.java:
##########
@@ -27,5 +31,66 @@ public interface FilterCondition extends RichFunction {
     /**
      * @return true if the filter condition stays true for the input row
      */
-    boolean apply(RowData in);
+    boolean apply(Context ctx, RowData in);
+
+    /**
+     * Context for generating expressions such as e.g. {@code CURRENT_WATERMARK} or {@code
+     * STREAMRECORD_TIMESTAMP}.
+     *
+     * @see ProcessFunction.Context
+     */
+    interface Context {
+        /**
+         * Timestamp of the element currently being processed or timestamp of a firing timer.
+         *
+         * <p>This might be {@code null}, depending on the stream's watermark strategy.
+         */
+        Long timestamp();
+
+        /** A {@link TimerService} for querying time and registering timers. */
+        TimerService timerService();
+
+        static Context of(KeyedProcessFunction<?, ?, ?>.Context context) {
+            return new Context() {
+                @Override
+                public Long timestamp() {
+                    return context.timestamp();
+                }
+
+                @Override
+                public TimerService timerService() {
+                    return context.timerService();
+                }
+            };
+        }
+
+        static Context of(ProcessFunction<?, ?>.Context context) {
+            return new Context() {
+                @Override
+                public Long timestamp() {
+                    return context.timestamp();
+                }
+
+                @Override
+                public TimerService timerService() {
+                    return context.timerService();
+                }
+            };
+        }
+
+        Context INVALID_CONTEXT =

Review Comment:
   Nice



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to