Dawid Wysakowicz created FLINK-37390:
----------------------------------------

             Summary: Push filter into ChangelogNormalize
                 Key: FLINK-37390
                 URL: https://issues.apache.org/jira/browse/FLINK-37390
             Project: Flink
          Issue Type: Improvement
          Components: Table SQL / Planner
            Reporter: Dawid Wysakowicz
            Assignee: Dawid Wysakowicz
             Fix For: 2.1.0


We can optimize pipelines that contain ChangelogNormalize by pushing filters into the operator itself. That way we can:

# reduce the number of records emitted downstream that would be filtered out right after ChangelogNormalize anyway
# drop the state for keys whose latest record does not match the filter
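A minimal Java sketch of the per-key logic described above follows. It is a simplified model under stated assumptions, not Flink's actual ChangelogNormalize code: per-key state is a plain `HashMap`, rows are hypothetical `(key, value)` records, and the names `PushedFilterNormalize`, `Kind`, `Row`, `process`, `stateSize` and `exampleInput` are made up for illustration. A row that matches the filter is normalized as usual. A delete, or a row that no longer matches, retracts the previously emitted value with `-D` and drops the key's state.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.IntPredicate;

/**
 * Hypothetical, simplified model of ChangelogNormalize with a pushed-down filter.
 * Not Flink code: keyed state is a HashMap and rows are (key, value) pairs.
 */
public class PushedFilterNormalize {

    enum Kind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    record Row(Kind kind, String key, int value) {
        @Override
        public String toString() {
            String prefix = switch (kind) {
                case INSERT -> "+I";
                case UPDATE_BEFORE -> "-U";
                case UPDATE_AFTER -> "+U";
                case DELETE -> "-D";
            };
            return prefix + "[key=" + key + ", value=" + value + "]";
        }
    }

    private final IntPredicate filter;
    // Stands in for per-key state: the last value emitted downstream for each key.
    private final Map<String, Integer> lastValue = new HashMap<>();

    PushedFilterNormalize(IntPredicate filter) {
        this.filter = filter;
    }

    /** Processes one upsert or delete record and returns the records emitted downstream. */
    List<Row> process(Row in) {
        List<Row> out = new ArrayList<>();
        Integer previous = lastValue.get(in.key());
        if (in.kind() != Kind.DELETE && filter.test(in.value())) {
            // Matching row: emit +I, or -U/+U if a value was already emitted for this key.
            if (previous == null) {
                out.add(new Row(Kind.INSERT, in.key(), in.value()));
            } else {
                out.add(new Row(Kind.UPDATE_BEFORE, in.key(), previous));
                out.add(new Row(Kind.UPDATE_AFTER, in.key(), in.value()));
            }
            lastValue.put(in.key(), in.value());
        } else if (previous != null) {
            // Delete, or a row that no longer matches: retract the last emitted value
            // and drop the state for this key.
            out.add(new Row(Kind.DELETE, in.key(), previous));
            lastValue.remove(in.key());
        }
        // Otherwise nothing is stored for this key, so there is nothing to retract.
        return out;
    }

    int stateSize() {
        return lastValue.size();
    }

    static List<Row> exampleInput() {
        return List.of(
                new Row(Kind.UPDATE_AFTER, "A", 1),
                new Row(Kind.UPDATE_AFTER, "A", 2),
                new Row(Kind.UPDATE_AFTER, "A", 12),
                new Row(Kind.UPDATE_AFTER, "A", 13),
                new Row(Kind.UPDATE_AFTER, "A", 15),
                new Row(Kind.DELETE, "A", 15));
    }

    public static void main(String[] args) {
        PushedFilterNormalize op = new PushedFilterNormalize(v -> v < 10);
        for (Row in : exampleInput()) {
            op.process(in).forEach(System.out::println);
        }
    }
}
```

Running `main` replays the same input as the example in this issue with the filter `v -> v < 10` and prints the four records listed there for the pushed-down case; after `+U[key=A, value=12]` the model holds no state for key `A`.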

Example:

```
// filter
value < 10

// input records
+U[key=A, value=1]
+U[key=A, value=2]
+U[key=A, value=12]
+U[key=A, value=13]
+U[key=A, value=15]
-D[key=A, value=15]

// records emitted without filter pushed into ChangelogNormalize
+I[key=A, value=1]
-U[key=A, value=1]
+U[key=A, value=2]
-U[key=A, value=2]
+U[key=A, value=12] // filtered out in the next operator
-U[key=A, value=12] // filtered out in the next operator
+U[key=A, value=13] // filtered out in the next operator
-U[key=A, value=13] // filtered out in the next operator
+U[key=A, value=15] // filtered out in the next operator
-D[key=A, value=15] // filtered out in the next operator

// records emitted with filter pushed into ChangelogNormalize
+I[key=A, value=1]
-U[key=A, value=1]
+U[key=A, value=2]
-D[key=A, value=2] // we clear the state for key=A
```
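For comparison, the sketch below models the baseline plan (ChangelogNormalize without the filter, followed by a separate filter operator), replays the same input and counts how many normalized records the downstream filter discards. As before, this is a simplified model rather than Flink code: the names `NormalizeThenFilter`, `normalize` and `exampleInput` are hypothetical, and per-key state is a plain `HashMap` rather than Flink keyed state.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.IntPredicate;

/**
 * Hypothetical model of the baseline plan: ChangelogNormalize without the filter,
 * followed by a separate filter operator. Not Flink code.
 */
public class NormalizeThenFilter {

    enum Kind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    record Row(Kind kind, String key, int value) {
        @Override
        public String toString() {
            String prefix = switch (kind) {
                case INSERT -> "+I";
                case UPDATE_BEFORE -> "-U";
                case UPDATE_AFTER -> "+U";
                case DELETE -> "-D";
            };
            return prefix + "[key=" + key + ", value=" + value + "]";
        }
    }

    /** Normalizes upserts and deletes per key; the filter is not consulted here. */
    static List<Row> normalize(List<Row> input) {
        // Stands in for per-key state; values that fail the filter are stored too.
        Map<String, Integer> lastValue = new HashMap<>();
        List<Row> out = new ArrayList<>();
        for (Row in : input) {
            Integer previous = lastValue.get(in.key());
            if (in.kind() == Kind.DELETE) {
                if (previous != null) {
                    out.add(new Row(Kind.DELETE, in.key(), previous));
                    lastValue.remove(in.key());
                }
            } else {
                if (previous == null) {
                    out.add(new Row(Kind.INSERT, in.key(), in.value()));
                } else {
                    out.add(new Row(Kind.UPDATE_BEFORE, in.key(), previous));
                    out.add(new Row(Kind.UPDATE_AFTER, in.key(), in.value()));
                }
                lastValue.put(in.key(), in.value());
            }
        }
        return out;
    }

    static List<Row> exampleInput() {
        return List.of(
                new Row(Kind.UPDATE_AFTER, "A", 1),
                new Row(Kind.UPDATE_AFTER, "A", 2),
                new Row(Kind.UPDATE_AFTER, "A", 12),
                new Row(Kind.UPDATE_AFTER, "A", 13),
                new Row(Kind.UPDATE_AFTER, "A", 15),
                new Row(Kind.DELETE, "A", 15));
    }

    public static void main(String[] args) {
        IntPredicate filter = v -> v < 10;
        List<Row> emitted = normalize(exampleInput());
        // Records the downstream filter operator would discard.
        long dropped = emitted.stream().filter(r -> !filter.test(r.value())).count();
        System.out.println(emitted.size() + " records emitted, " + dropped + " filtered out downstream");
    }
}
```

It prints `10 records emitted, 6 filtered out downstream`, matching the six records annotated as filtered out in the example. The baseline also keeps state for key `A` while its value is 12, 13 and 15, which is the state the pushed-down filter allows ChangelogNormalize to drop.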



--
This message was sent by Atlassian Jira
(v8.20.10#820010)