Hi Timo,

After investigating this further, I found that this is actually unrelated to implementing SupportsWatermarkPushDown.
Once I create a TableSchema for my custom source's RowData and assign it a watermark (see my example in the original mail), the plan always includes a LogicalWatermarkAssigner. This assigner sits between the LogicalTableScan and the LogicalFilter. It then prevents the HepPlanner from applying the optimization, because the rule requires the LogicalTableScan to be a direct child of the LogicalFilter. My plan has LogicalFilter -> LogicalWatermarkAssigner -> LogicalTableScan, so the rule never matches.

On Fri, Mar 5, 2021 at 5:59 PM Timo Walther <twal...@apache.org> wrote:
> Hi Yuval,
>
> sorry that nobody replied earlier. Somehow your email fell through the
> cracks.
>
> If I understand you correctly, you would like to implement a table
> source that implements both `SupportsWatermarkPushDown` and
> `SupportsFilterPushDown`?
>
> The current behavior might be on purpose. Filters and watermarks are not
> very compatible. Filtering would also mean that records (from which
> watermarks could be generated) are skipped. If the filter is very
> strict, we would not generate any new watermarks and the pipeline would
> stop making progress in time.
>
> Watermark push down is only necessary if per-partition watermarks are
> required. Otherwise the watermarks are generated in a subsequent
> operator after the source. So you can still use rowtime without
> implementing `SupportsWatermarkPushDown` in your custom source.
>
> I will loop in Shengkai, who worked on this topic recently.
>
> Regards,
> Timo
>
>
> On 04.03.21 18:52, Yuval Itzchakov wrote:
> > Bumping this up again; I would appreciate any help from anyone
> > familiar with the Blink planner.
> >
> > Thanks,
> > Yuval.
> >
> > On Fri, Feb 26, 2021, 18:53 Yuval Itzchakov <yuva...@gmail.com> wrote:
> >
> >     Hi Jark,
> >     Would appreciate your help with this.
> >
> >     On Wed, Feb 24, 2021 at 12:09 PM Roman Khachatryan <ro...@apache.org> wrote:
> >
> >         Hi Yuval,
> >
> >         I'm not familiar with the Blink planner, but Jark can probably
> >         help.
> >
> >         Regards,
> >         Roman
> >
> >
> >         On Sun, Feb 21, 2021 at 6:52 PM Yuval Itzchakov <yuva...@gmail.com> wrote:
> >
> >             Update: When I don't set the watermark explicitly on the
> >             TableSchema, `applyWatermarkStrategy` never gets called on
> >             my ScanTableSource, which does make sense. But then what
> >             should be done instead? This feels a bit unintuitive.
> >
> >             On Sun, Feb 21, 2021 at 7:09 PM Yuval Itzchakov <yuva...@gmail.com> wrote:
> >
> >                 Hi,
> >                 Flink 1.12.1, Blink Planner, Scala 2.12
> >
> >                 I have the following logical plan:
> >
> >                 LogicalSink(table=[default_catalog.default_database.table], fields=[bar, baz, hello_world, a, b])
> >                 +- LogicalProject(value=[$2], bar=[CAST(CAST($0):TIMESTAMP(3)):TIMESTAMP(6)], baz=[CAST(CAST($0):TIMESTAMP(3)):TIMESTAMP(6)], hello_world=[null:VARCHAR(2147483647) CHARACTER SET "UTF-16LE"], a=[null:VARCHAR(2147483647) CHARACTER SET "UTF-16LE"], b=[EMPTY_MAP()])
> >                    +- LogicalFilter(condition=[AND(=($4, _UTF-16LE'bar'), =($34, _UTF-16LE'baz'))])
> >                       +- LogicalWatermarkAssigner(rowtime=[bar], watermark=[$0])
> >                          +- LogicalTableScan(table=[[default_catalog, default_database, foo]])
> >
> >                 I have a custom source that creates a TableSchema based
> >                 on an external table. When I create the schema, I push
> >                 the watermark definition to the schema:
> >
> >                 [image.png]
> >
> >                 When the HepPlanner starts the optimization phase and
> >                 reaches the "PushFilterIntoTableSourceScanRule", it
> >                 matches on the LogicalFilter in the plan. But then,
> >                 since the RelOptRuleOperandChildPolicy is set to "SOME",
> >                 it attempts to do a full match on the child nodes.
> >                 The rule is defined as follows:
> >
> >                 [image.png]
> >
> >                 The child match fails because the immediate child of
> >                 the filter is a "LogicalWatermarkAssigner", not the
> >                 "LogicalTableScan", which is the grandchild:
> >
> >                 [image.png]
> >
> >                 Is this the desired behavior? Should I create the
> >                 TableSchema without the rowtime attribute and use
> >                 "SupportsWatermarkPushDown" to generate the watermark
> >                 dynamically from the source record?
> >
> >                 --
> >                 Best Regards,
> >                 Yuval Itzchakov.
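The plan-shape problem discussed in this thread can be shown with a toy model. This is a minimal Java sketch, not Flink's or Calcite's API. `PlanShapeDemo`, `Node`, and `matchesFilterOverScan` are hypothetical names. The matcher only mimics the structural check of a rule whose operand tree is "a LogicalFilter whose direct input is a LogicalTableScan". It ignores any other conditions the real PushFilterIntoTableSourceScanRule may check.

```java
import java.util.List;

// Toy model of a logical plan; NOT Calcite's RelNode API (hypothetical names throughout).
// Operator names mirror the ones in the plan from this thread.
public class PlanShapeDemo {

    /** Hypothetical minimal plan node: an operator name plus its inputs. */
    static final class Node {
        final String op;
        final List<Node> inputs;

        Node(String op, Node... inputs) {
            this.op = op;
            this.inputs = List.of(inputs);
        }
    }

    /**
     * Mimics only the operand shape "LogicalFilter with a LogicalTableScan as its
     * direct input": returns true only when the scan is the filter's immediate child.
     */
    static boolean matchesFilterOverScan(Node root) {
        return root.op.equals("LogicalFilter")
                && root.inputs.size() == 1
                && root.inputs.get(0).op.equals("LogicalTableScan");
    }

    public static void main(String[] args) {
        // Shape without a watermark on the schema: Filter -> Scan.
        Node withoutWatermark =
                new Node("LogicalFilter", new Node("LogicalTableScan"));

        // Shape from the plan in this thread: Filter -> WatermarkAssigner -> Scan.
        Node withWatermark =
                new Node("LogicalFilter",
                        new Node("LogicalWatermarkAssigner", new Node("LogicalTableScan")));

        System.out.println(matchesFilterOverScan(withoutWatermark)); // true
        System.out.println(matchesFilterOverScan(withWatermark));    // false
    }
}
```

Running `main` prints `true` for the Filter -> Scan shape and `false` for the Filter -> WatermarkAssigner -> Scan shape. A matcher that simply skipped over the assigner would also move the filter below the point where watermarks are generated. That is the interaction Timo describes: records dropped by the filter would no longer contribute to watermarks.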