Dawid Wysakowicz created FLINK-37390:
----------------------------------------
Summary: Push filter into ChangelogNormalize
Key: FLINK-37390
URL: https://issues.apache.org/jira/browse/FLINK-37390
Project: Flink
Issue Type: Improvement
Components: Table SQL / Planner
Reporter: Dawid Wysakowicz
Assignee: Dawid Wysakowicz
Fix For: 2.1.0
We can optimize pipelines that contain ChangelogNormalize by pushing filters
into the operator itself. That way we can:
# reduce the number of records emitted downstream that would be filtered out right after the ChangelogNormalize
# drop the state for keys whose current value does not match the filter
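The per-key decision for an incoming upsert can be sketched as a small function. This is a hypothetical, simplified model, not Flink's actual operator code: `on_upsert`, `prev` (the row currently stored for the key, or `None`), `new` (the incoming value) and `predicate` (standing in for the pushed-down filter) are illustrative names. It returns the records to emit and the value to keep in state.

```python
from typing import Callable, List, Optional, Tuple

Record = Tuple[str, int]  # (row kind, value); the key is implicit in this sketch


def on_upsert(
    prev: Optional[int], new: int, predicate: Callable[[int], bool]
) -> Tuple[List[Record], Optional[int]]:
    """Hypothetical per-key step of ChangelogNormalize with a pushed-down filter."""
    if predicate(new):
        if prev is None:
            return [("+I", new)], new  # first matching row for this key
        return [("-U", prev), ("+U", new)], new  # a matching row replaces the stored one
    if prev is None:
        return [], None  # nothing to emit and no state to keep
    return [("-D", prev)], None  # the row stops matching: retract it and clear the state
```

Because only matching rows are ever stored, `prev` always satisfies the filter, so these four branches cover every upsert; a delete (not modeled here) would just retract `prev` if it exists.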
Example:
```
// filter
value < 10
// input records
+U[key=A, value=1]
+U[key=A, value=2]
+U[key=A, value=12]
+U[key=A, value=13]
+U[key=A, value=15]
-D[key=A, value=15]
// records emitted without filter pushed into ChangelogNormalize
+I[key=A, value=1]
-U[key=A, value=1]
+U[key=A, value=2]
-U[key=A, value=2]
+U[key=A, value=12] // filtered out in the next operator
-U[key=A, value=12] // filtered out in the next operator
+U[key=A, value=13] // filtered out in the next operator
-U[key=A, value=13] // filtered out in the next operator
+U[key=A, value=15] // filtered out in the next operator
-D[key=A, value=15] // filtered out in the next operator
// records emitted with filter pushed into ChangelogNormalize
+I[key=A, value=1]
-U[key=A, value=1]
+U[key=A, value=2]
-D[key=A, value=2] // value=12 no longer matches: retract the last row and clear the state for key=A
```
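The example above can be reproduced with a small simulation. This is a simplified, hypothetical model of ChangelogNormalize (last value per key, upsert input turned into a full changelog), not Flink's implementation; `normalize` and `predicate` are illustrative names, and `predicate` models the filter pushed into the operator.

```python
from typing import Callable, Dict, List, Optional, Tuple

# A record is (row_kind, key, value); row_kind is one of "+I", "-U", "+U", "-D".
Record = Tuple[str, str, int]


def normalize(
    records: List[Record], predicate: Optional[Callable[[int], bool]] = None
) -> List[Record]:
    """Hypothetical ChangelogNormalize model with an optional pushed-down filter."""
    state: Dict[str, int] = {}  # last emitted value per key
    out: List[Record] = []
    for kind, key, value in records:
        prev = state.get(key)
        if kind == "-D":
            # Retract the last emitted row for this key, if there is one.
            if prev is not None:
                out.append(("-D", key, prev))
                del state[key]
            continue
        if predicate is not None and not predicate(value):
            # The new row would be filtered out downstream: retract the
            # previously emitted row (if any) and drop the state for this key.
            if prev is not None:
                out.append(("-D", key, prev))
                del state[key]
            continue
        if prev is None:
            out.append(("+I", key, value))
        else:
            out.append(("-U", key, prev))
            out.append(("+U", key, value))
        state[key] = value
    return out


inputs = [("+U", "A", v) for v in (1, 2, 12, 13, 15)] + [("-D", "A", 15)]
without_filter = normalize(inputs)  # 10 records, as in the first listing
with_filter = normalize(inputs, predicate=lambda v: v < 10)  # 4 records
```

In this model, applying `value < 10` after `without_filter` still passes `-U[key=A, value=2]`, so downstream ends up with the same net result; the pushed-down variant gets there with fewer records and without keeping state for key `A`.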
--
This message was sent by Atlassian Jira
(v8.20.10#820010)