Hi Yuval,

That's correct: you will always get a LogicalWatermarkAssigner if you have defined a watermark. If your source implements SupportsWatermarkPushDown, the LogicalWatermarkAssigner will be pushed into the TableSource, and then the Filter can be pushed into the source as well, provided the source also implements SupportsFilterPushDown.
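To make the ordering described above concrete, here is a toy Java model of the plan from this thread. It is not Flink's planner code. `Filter`, `WatermarkAssigner` and `Scan` are hypothetical stand-ins for LogicalFilter, LogicalWatermarkAssigner and LogicalTableScan, and `pushFilterIntoScan` / `pushWatermarkIntoScan` are hypothetical stand-ins for the planner's push-down rules. The sketch assumes, as described above, that watermark push-down is applied before filter push-down, and that the filter rule only matches when the scan is the filter's direct child.

```java
// Toy model of the planner behaviour discussed in this thread. NOT Flink code: every
// class and method here is a hypothetical stand-in named after the plan nodes.
public class PushDownModel {

    /** A node in a simplified logical plan. */
    interface Node {}

    /** Stand-in for LogicalFilter. */
    static final class Filter implements Node {
        final String condition;
        final Node input;
        Filter(String condition, Node input) { this.condition = condition; this.input = input; }
    }

    /** Stand-in for LogicalWatermarkAssigner. */
    static final class WatermarkAssigner implements Node {
        final String rowtime;
        final Node input;
        WatermarkAssigner(String rowtime, Node input) { this.rowtime = rowtime; this.input = input; }
    }

    /** Stand-in for LogicalTableScan; records what has been pushed into the source. */
    static final class Scan implements Node {
        final String table;
        final boolean watermarkPushed;
        final String pushedFilter; // null when no filter has been pushed
        Scan(String table, boolean watermarkPushed, String pushedFilter) {
            this.table = table;
            this.watermarkPushed = watermarkPushed;
            this.pushedFilter = pushedFilter;
        }
    }

    /** Matches only Filter -> Scan: the scan must be the filter's direct child. */
    static Node pushFilterIntoScan(Node root, boolean sourceSupportsFilter) {
        if (sourceSupportsFilter && root instanceof Filter) {
            Filter f = (Filter) root;
            if (f.input instanceof Scan) {
                Scan s = (Scan) f.input;
                return new Scan(s.table, s.watermarkPushed, f.condition);
            }
        }
        return root; // no match: plan unchanged
    }

    /** Absorbs a WatermarkAssigner sitting directly on a Scan, looking through Filters. */
    static Node pushWatermarkIntoScan(Node root, boolean sourceSupportsWatermark) {
        if (!sourceSupportsWatermark) {
            return root;
        }
        if (root instanceof Filter) {
            Filter f = (Filter) root;
            return new Filter(f.condition, pushWatermarkIntoScan(f.input, sourceSupportsWatermark));
        }
        if (root instanceof WatermarkAssigner && ((WatermarkAssigner) root).input instanceof Scan) {
            Scan s = (Scan) ((WatermarkAssigner) root).input;
            return new Scan(s.table, true, s.pushedFilter);
        }
        return root;
    }

    /** Renders the plan top-down as "Parent -> Child -> ...". */
    static String explain(Node n) {
        if (n instanceof Filter) {
            Filter f = (Filter) n;
            return "Filter(" + f.condition + ") -> " + explain(f.input);
        }
        if (n instanceof WatermarkAssigner) {
            WatermarkAssigner w = (WatermarkAssigner) n;
            return "WatermarkAssigner(" + w.rowtime + ") -> " + explain(w.input);
        }
        Scan s = (Scan) n;
        return "Scan(" + s.table
                + (s.watermarkPushed ? ", watermark" : "")
                + (s.pushedFilter != null ? ", filter=" + s.pushedFilter : "")
                + ")";
    }

    public static void main(String[] args) {
        Node plan = new Filter("$4 = 'bar'",
                new WatermarkAssigner("bar", new Scan("foo", false, null)));

        // Filter push-down alone: the assigner blocks the Filter -> Scan match.
        System.out.println(explain(pushFilterIntoScan(plan, true)));
        // Watermark push-down first, then filter push-down: both end up in the scan.
        System.out.println(explain(pushFilterIntoScan(pushWatermarkIntoScan(plan, true), true)));
    }
}
```

Under these assumptions, the first call leaves the `Filter($4 = 'bar') -> WatermarkAssigner(bar) -> Scan(foo)` chain unchanged, and the second produces `Scan(foo, watermark, filter=$4 = 'bar')`. Passing `false` for `sourceSupportsWatermark` reproduces the situation in the original question: the assigner stays in place and the filter is never pushed.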
Best,
Jark

On Sat, 6 Mar 2021 at 01:16, Yuval Itzchakov <yuva...@gmail.com> wrote:

> Hi Timo,
> After investigating this further, this is actually not related to
> implementing SupportsWatermarkPushDown.
>
> Once I create a TableSchema for my custom source's RowData and assign it
> a watermark (see my example in the original mail), the plan will always
> include a LogicalWatermarkAssigner. This assigner, which sits between the
> LogicalTableScan and the LogicalFilter, then prevents the HepPlanner from
> applying the optimization, since the rule requires the LogicalTableScan to
> be a direct child of the LogicalFilter. Since I have
> LogicalFilter -> LogicalWatermarkAssigner -> LogicalTableScan, this won't
> work.
>
> On Fri, Mar 5, 2021 at 5:59 PM Timo Walther <twal...@apache.org> wrote:
>
>> Hi Yuval,
>>
>> sorry that nobody replied earlier. Somehow your email fell through the
>> cracks.
>>
>> If I understand you correctly, you would like to implement a table
>> source that implements both `SupportsWatermarkPushDown` and
>> `SupportsFilterPushDown`?
>>
>> The current behavior might be on purpose. Filters and watermarks are not
>> very compatible. Filtering would also mean that records (from which
>> watermarks could be generated) are skipped. If the filter is very
>> strict, we would not generate any new watermarks and the pipeline would
>> stop making progress in time.
>>
>> Watermark push down is only necessary if per-partition watermarks are
>> required. Otherwise the watermarks are generated in a subsequent
>> operator after the source. So you can still use rowtime without
>> implementing `SupportsWatermarkPushDown` in your custom source.
>>
>> I will loop in Shengkai, who worked on this topic recently.
>>
>> Regards,
>> Timo
>>
>>
>> On 04.03.21 18:52, Yuval Itzchakov wrote:
>> > Bumping this up again; I would appreciate any help from anyone
>> > familiar with the Blink planner.
>> >
>> > Thanks,
>> > Yuval.
>> >
>> > On Fri, Feb 26, 2021, 18:53 Yuval Itzchakov <yuva...@gmail.com> wrote:
>> >
>> > > Hi Jark,
>> > > Would appreciate your help with this.
>> > >
>> > > On Wed, Feb 24, 2021 at 12:09 PM Roman Khachatryan <ro...@apache.org> wrote:
>> > >
>> > > > Hi Yuval,
>> > > >
>> > > > I'm not familiar with the Blink planner, but Jark can probably help.
>> > > >
>> > > > Regards,
>> > > > Roman
>> > > >
>> > > > On Sun, Feb 21, 2021 at 6:52 PM Yuval Itzchakov <yuva...@gmail.com> wrote:
>> > > >
>> > > > > Update: When I don't set the watermark explicitly on the
>> > > > > TableSchema, `applyWatermarkStrategy` never gets called on
>> > > > > my ScanTableSource, which does make sense. But then what
>> > > > > should be done instead? This feels a bit unintuitive.
>> > > > >
>> > > > > On Sun, Feb 21, 2021 at 7:09 PM Yuval Itzchakov <yuva...@gmail.com> wrote:
>> > > > >
>> > > > > > Hi,
>> > > > > > Flink 1.12.1, Blink Planner, Scala 2.12
>> > > > > >
>> > > > > > I have the following logical plan:
>> > > > > >
>> > > > > > LogicalSink(table=[default_catalog.default_database.table], fields=[bar, baz, hello_world, a, b])
>> > > > > > +- LogicalProject(value=[$2], bar=[CAST(CAST($0):TIMESTAMP(3)):TIMESTAMP(6)], baz=[CAST(CAST($0):TIMESTAMP(3)):TIMESTAMP(6)], hello_world=[null:VARCHAR(2147483647) CHARACTER SET "UTF-16LE"], a=[null:VARCHAR(2147483647) CHARACTER SET "UTF-16LE"], b=[EMPTY_MAP()])
>> > > > > >    +- LogicalFilter(condition=[AND(=($4, _UTF-16LE'bar'), =($34, _UTF-16LE'baz'))])
>> > > > > >       +- LogicalWatermarkAssigner(rowtime=[bar], watermark=[$0])
>> > > > > >          +- LogicalTableScan(table=[[default_catalog, default_database, foo]])
>> > > > > >
>> > > > > > I have a custom source which creates a TableSchema based
>> > > > > > on an external table.
>> > > > > > When I create the schema, I add the watermark definition
>> > > > > > to it:
>> > > > > >
>> > > > > > image.png
>> > > > > >
>> > > > > > When the HepPlanner starts the optimization phase and
>> > > > > > reaches the "PushFilterIntoTableSourceScanRule", it
>> > > > > > matches on the LogicalFilter in the definition. But
>> > > > > > then, since the RelOptRuleOperandChildPolicy is set to
>> > > > > > "SOME", it attempts to do a full match on the child
>> > > > > > nodes. The rule is defined as follows:
>> > > > > >
>> > > > > > image.png
>> > > > > >
>> > > > > > The match on the child fails, since the immediate child of
>> > > > > > the filter is a "LogicalWatermarkAssigner" and not the
>> > > > > > "LogicalTableScan", which is the grandchild:
>> > > > > >
>> > > > > > image.png
>> > > > > >
>> > > > > > Is this the desired behavior? Should I create the
>> > > > > > TableSchema without the rowtime attribute and use
>> > > > > > "SupportsWatermarkPushDown" to generate the watermark
>> > > > > > dynamically from the source record?
>> > > > > >
>> > > > > > --
>> > > > > > Best Regards,
>> > > > > > Yuval Itzchakov.
>> > > > >
>> > > > > --
>> > > > > Best Regards,
>> > > > > Yuval Itzchakov.
>> > >
>> > > --
>> > > Best Regards,
>> > > Yuval Itzchakov.
>> >
>>
>
> --
> Best Regards,
> Yuval Itzchakov.
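Timo's point in the quoted reply, that a very strict filter can stop watermarks from advancing, can be illustrated with another toy Java model. `watermarkAfter` is a hypothetical generator in the spirit of a bounded-out-of-orderness strategy (watermark = highest timestamp seen, minus the bound, minus 1). It is not Flink's implementation, and the events, keys and the 500 ms bound are made up for illustration. The only thing it shows is that a watermark derived from filtered records can lag far behind, or never move at all.

```java
import java.util.List;
import java.util.function.Predicate;

// Toy model (not Flink code): if watermarks are derived only from records that pass a
// filter, a strict filter holds event time back, and a filter matching nothing stalls it.
public class FilteredWatermarkModel {

    static final class Event {
        final long timestamp;
        final String key;

        Event(long timestamp, String key) {
            this.timestamp = timestamp;
            this.key = key;
        }
    }

    /**
     * Hypothetical bounded-out-of-orderness generator: max seen timestamp - bound - 1.
     * Returns Long.MIN_VALUE if no record reached the generator.
     */
    static long watermarkAfter(List<Event> events, Predicate<Event> filter, long boundMillis) {
        boolean seen = false;
        long maxTs = Long.MIN_VALUE;
        for (Event e : events) {
            if (filter.test(e)) { // filtered-out records never reach the generator
                seen = true;
                maxTs = Math.max(maxTs, e.timestamp);
            }
        }
        return seen ? maxTs - boundMillis - 1 : Long.MIN_VALUE;
    }

    public static void main(String[] args) {
        List<Event> events = List.of(
                new Event(1_000L, "bar"), new Event(5_000L, "other"), new Event(9_000L, "other"));

        // Watermark computed from every record, before any filtering.
        System.out.println(watermarkAfter(events, e -> true, 500L));
        // Strict filter applied first: only the record at 1000 ms counts.
        System.out.println(watermarkAfter(events, e -> e.key.equals("bar"), 500L));
        // Filter that matches nothing: event time never advances.
        System.out.println(watermarkAfter(events, e -> e.key.equals("baz"), 500L));
    }
}
```

With these made-up inputs the three calls return 8499, 499 and Long.MIN_VALUE respectively: the same stream yields a watermark that is far behind, or none at all, once records are filtered before the watermark is computed.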