Leonard Xu created FLINK-19878:
----------------------------------

             Summary: Improve watermark ChangelogNormalize for upsertSource
                 Key: FLINK-19878
                 URL: https://issues.apache.org/jira/browse/FLINK-19878
             Project: Flink
          Issue Type: Sub-task
          Components: Table SQL / Planner
            Reporter: Leonard Xu

Currently, for an upsertSource such as upsert-kafka, the WatermarkAssigner is placed after the ChangelogNormalize node, and it may return Long.MaxValue as the watermark if some parallel instances receive no data.

As an improvement, we can move the WatermarkAssigner to directly after the source scan node (TableSourceScan), so that watermarks are produced the same way as for a general source.

{code:java}
+- Exchange(distribution=[hash[currency]], changelogMode=[I,UA,D])
   +- WatermarkAssigner(rowtime=[rowtime], watermark=[rowtime], changelogMode=[I,UA,D])
      +- ChangelogNormalize(key=[currency], changelogMode=[I,UA,D])
         +- Exchange(distribution=[hash[currency]], changelogMode=[UA,D])
            +- TableSourceScan(table=[[default_catalog, default_database, rates_history]], fields=[currency, rate, rowtime], changelogMode=[UA,D])
{code}
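
For illustration only, here is a sketch of how the plan might look after the proposed change. It assumes that only the WatermarkAssigner moves and that every other node stays as it is. The changelogMode=[UA,D] on the moved node is an assumption, chosen to match the scan output it would now consume. The planner might also adjust or merge other nodes, which this sketch does not model.

{code:java}
+- Exchange(distribution=[hash[currency]], changelogMode=[I,UA,D])
   +- ChangelogNormalize(key=[currency], changelogMode=[I,UA,D])
      +- Exchange(distribution=[hash[currency]], changelogMode=[UA,D])
         +- WatermarkAssigner(rowtime=[rowtime], watermark=[rowtime], changelogMode=[UA,D])
            +- TableSourceScan(table=[[default_catalog, default_database, rates_history]], fields=[currency, rate, rowtime], changelogMode=[UA,D])
{code}

In this layout the watermark would be assigned before the hash exchange, that is, in the source tasks themselves rather than in the tasks downstream of the shuffle.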