Hi Yuval,

I have opened a ticket for this [1], but I don't think we have a solution yet.
Do you have time to help us fix this? I expect it will take a lot of time.

[1] https://issues.apache.org/jira/browse/FLINK-21675

On Mon, Mar 8, 2021 at 9:18 PM Yuval Itzchakov <yuva...@gmail.com> wrote:

> Thank you Shengkai,
> That does explain what I'm seeing.
>
> Jark / Shengkai - Is there any workaround to get Flink to work with watermark
> push-down and predicate push-down together until this is resolved?
>
> On Mon, Mar 8, 2021 at 4:54 AM Shengkai Fang <fskm...@gmail.com> wrote:
>
>> Hi Yuval, Jark, Timo,
>>
>> Currently the watermark push-down happens in the logical rewrite phase,
>> but the filter push-down happens in the local phase, which means the
>> planner first checks the filter push-down and only then the watermark
>> push-down.
>>
>> I think we need a rule that transposes the filter and the watermark
>> assigner, or we need to extend the filter push-down rule to also capture
>> the structure where the watermark assigner is the parent of the table scan.
>>
>> Best,
>> Shengkai
>>
>> On Mon, Mar 8, 2021 at 12:13 AM Yuval Itzchakov <yuva...@gmail.com> wrote:
>>
>>> Hi Jark,
>>>
>>> Even after implementing both, I don't see the watermark being pushed into
>>> the table source in the logical plan, so the assigner still prevents
>>> predicate push-down from running.
>>>
>>> On Sun, Mar 7, 2021, 15:43 Jark Wu <imj...@gmail.com> wrote:
>>>
>>>> Hi Yuval,
>>>>
>>>> That's correct: you will always get a LogicalWatermarkAssigner if you
>>>> assigned a watermark.
>>>> If you implement SupportsWatermarkPushDown, the LogicalWatermarkAssigner
>>>> will be pushed into the TableSource, and then the Filter can be pushed
>>>> into the source if the source implements SupportsFilterPushDown.
>>>>
>>>> Best,
>>>> Jark
>>>>
>>>> On Sat, 6 Mar 2021 at 01:16, Yuval Itzchakov <yuva...@gmail.com> wrote:
>>>>
>>>>> Hi Timo,
>>>>> After investigating this further, this is actually not related to
>>>>> implementing SupportsWatermarkPushDown.
>>>>>
>>>>> Once I create a TableSchema for my custom source's RowData and assign
>>>>> it a watermark (see my example in the original mail), the plan always
>>>>> includes a LogicalWatermarkAssigner. This assigner sits between the
>>>>> LogicalTableScan and the LogicalFilter and prevents the HepPlanner from
>>>>> applying the optimization, because the rule requires the LogicalTableScan
>>>>> to be a direct child of the LogicalFilter. Since I have
>>>>> LogicalFilter -> LogicalWatermarkAssigner -> LogicalTableScan, this won't
>>>>> work.
>>>>>
>>>>> On Fri, Mar 5, 2021 at 5:59 PM Timo Walther <twal...@apache.org> wrote:
>>>>>
>>>>>> Hi Yuval,
>>>>>>
>>>>>> sorry that nobody replied earlier. Somehow your email fell through the
>>>>>> cracks.
>>>>>>
>>>>>> If I understand you correctly, you would like to implement a table
>>>>>> source that implements both `SupportsWatermarkPushDown` and
>>>>>> `SupportsFilterPushDown`?
>>>>>>
>>>>>> The current behavior might be on purpose. Filters and watermarks are not
>>>>>> very compatible. Filtering would also mean that records (from which
>>>>>> watermarks could be generated) are skipped. If the filter is very
>>>>>> strict, we would not generate any new watermarks and the pipeline would
>>>>>> stop making progress in time.
>>>>>>
>>>>>> Watermark push-down is only necessary if per-partition watermarks are
>>>>>> required. Otherwise the watermarks are generated in a subsequent
>>>>>> operator after the source. So you can still use rowtime without
>>>>>> implementing `SupportsWatermarkPushDown` in your custom source.
>>>>>>
>>>>>> I will loop in Shengkai, who worked on this topic recently.
>>>>>>
>>>>>> Regards,
>>>>>> Timo
>>>>>>
>>>>>>
>>>>>> On 04.03.21 18:52, Yuval Itzchakov wrote:
>>>>>> > Bumping this up again; I would appreciate any help from anyone
>>>>>> > familiar with the Blink planner.
>>>>>> >
>>>>>> > Thanks,
>>>>>> > Yuval.
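Timo's point that a strict filter can stall event time can be made concrete with a toy model. The sketch below is plain Python, not Flink's `WatermarkStrategy` API; it assumes a bounded-out-of-orderness watermark (maximum timestamp seen minus a fixed delay) and assumes a pushed-down filter is applied before the watermark is computed. The `Record` type and the `final_watermark` helper are hypothetical names used only for illustration.

```python
from dataclasses import dataclass
from typing import Callable, Iterable, Optional


@dataclass(frozen=True)
class Record:
    key: str
    ts: int  # event timestamp in milliseconds


def final_watermark(
    records: Iterable[Record],
    delay: int,
    predicate: Optional[Callable[[Record], bool]] = None,
) -> Optional[int]:
    """Return the watermark reached after consuming `records`.

    Assumed strategy: watermark = max timestamp seen - delay.
    If `predicate` is given, records failing it are dropped *before*
    watermark generation, as a filter pushed into the source might do.
    Returns None if no record reached the generator (no watermark at all).
    """
    max_ts: Optional[int] = None
    for r in records:
        if predicate is not None and not predicate(r):
            continue  # a dropped record never advances event time
        max_ts = r.ts if max_ts is None else max(max_ts, r.ts)
    return None if max_ts is None else max_ts - delay


stream = [Record("bar", 1_000), Record("other", 5_000), Record("other", 9_000)]

# Watermark generated from every record: event time keeps advancing.
print(final_watermark(stream, delay=500))  # 8500
# Filter applied first: only the "bar" record drives the watermark.
print(final_watermark(stream, delay=500, predicate=lambda r: r.key == "bar"))  # 500
# A filter that matches nothing never produces a watermark.
print(final_watermark(stream, delay=500, predicate=lambda r: r.key == "baz"))  # None
```

With the filter in front, the watermark is stuck at 500 even though the stream has reached 9000, which is the "pipeline would stop making progress in time" case described above.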
>>>>>> >
>>>>>> > On Fri, Feb 26, 2021, 18:53 Yuval Itzchakov <yuva...@gmail.com> wrote:
>>>>>> >
>>>>>> >     Hi Jark,
>>>>>> >     I would appreciate your help with this.
>>>>>> >
>>>>>> >     On Wed, Feb 24, 2021 at 12:09 PM Roman Khachatryan <ro...@apache.org> wrote:
>>>>>> >
>>>>>> >         Hi Yuval,
>>>>>> >
>>>>>> >         I'm not familiar with the Blink planner, but Jark can probably help.
>>>>>> >
>>>>>> >         Regards,
>>>>>> >         Roman
>>>>>> >
>>>>>> >
>>>>>> >         On Sun, Feb 21, 2021 at 6:52 PM Yuval Itzchakov <yuva...@gmail.com> wrote:
>>>>>> >
>>>>>> >             Update: When I don't set the watermark explicitly on the
>>>>>> >             TableSchema, `applyWatermarkStrategy` never gets called on
>>>>>> >             my ScanTableSource, which does make sense. But now the
>>>>>> >             question is: what should be done? This feels a bit unintuitive.
>>>>>> >
>>>>>> >             On Sun, Feb 21, 2021 at 7:09 PM Yuval Itzchakov <yuva...@gmail.com> wrote:
>>>>>> >
>>>>>> >                 Hi,
>>>>>> >                 Flink 1.12.1, Blink Planner, Scala 2.12
>>>>>> >
>>>>>> >                 I have the following logical plan:
>>>>>> >
>>>>>> >                 LogicalSink(table=[default_catalog.default_database.table], fields=[bar, baz, hello_world, a, b])
>>>>>> >                 +- LogicalProject(value=[$2], bar=[CAST(CAST($0):TIMESTAMP(3)):TIMESTAMP(6)], baz=[CAST(CAST($0):TIMESTAMP(3)):TIMESTAMP(6)], hello_world=[null:VARCHAR(2147483647) CHARACTER SET "UTF-16LE"], a=[null:VARCHAR(2147483647) CHARACTER SET "UTF-16LE"], b=[EMPTY_MAP()])
>>>>>> >                    +- LogicalFilter(condition=[AND(=($4, _UTF-16LE'bar'), =($34, _UTF-16LE'baz'))])
>>>>>> >                       +- LogicalWatermarkAssigner(rowtime=[bar], watermark=[$0])
>>>>>> >                          +- LogicalTableScan(table=[[default_catalog, default_database, foo]])
>>>>>> >
>>>>>> >                 I have a custom source which creates a TableSchema based
>>>>>> >                 on an external table. When I create the schema, I push
>>>>>> >                 the watermark definition to the schema:
>>>>>> >
>>>>>> >                 [image]
>>>>>> >
>>>>>> >                 When the HepPlanner starts the optimization phase and
>>>>>> >                 reaches the "PushFilterIntoTableSourceScanRule", it
>>>>>> >                 matches on the LogicalFilter in the definition. But
>>>>>> >                 then, since the RelOptRuleOperandChildPolicy is set to
>>>>>> >                 "SOME", it attempts to do a full match on the child
>>>>>> >                 nodes. The rule is defined as follows:
>>>>>> >
>>>>>> >                 [image]
>>>>>> >
>>>>>> >                 The child match fails, since the immediate child of the
>>>>>> >                 filter is a "LogicalWatermarkAssigner" and not the
>>>>>> >                 "LogicalTableScan", which is the grandchild:
>>>>>> >
>>>>>> >                 [image]
>>>>>> >
>>>>>> >                 Is this the desired behavior? Should I create the
>>>>>> >                 TableSchema without the rowtime attribute and use
>>>>>> >                 "SupportsWatermarkPushDown" to generate the watermark
>>>>>> >                 dynamically from the source record?
>>>>>> >
>>>>>> >                 --
>>>>>> >                 Best Regards,
>>>>>> >                 Yuval Itzchakov.
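The matching failure described in the original mail, and the transpose rule Shengkai suggests, can be illustrated with a toy model. This is a plain-Python sketch, not Calcite's `RelOptRule` API and not Flink's actual `PushFilterIntoTableSourceScanRule`; the `Node` class and both functions are hypothetical. It only models the structural point: an operand pattern Filter(TableScan) cannot match while a watermark assigner sits in between, and swapping the two nodes lets the pattern match.

```python
from dataclasses import dataclass, replace


@dataclass(frozen=True)
class Node:
    op: str              # operator name, e.g. "LogicalFilter"
    children: tuple = ()
    attrs: tuple = ()    # free-form payload such as the filter condition


def matches_filter_over_scan(node: Node) -> bool:
    """Toy version of a Filter(TableScan) operand pattern: the scan must be
    the *direct* child of the filter, so any node in between breaks the match."""
    return (
        node.op == "LogicalFilter"
        and len(node.children) == 1
        and node.children[0].op == "LogicalTableScan"
    )


def transpose_filter_and_watermark(node: Node) -> Node:
    """Hypothetical rewrite in the spirit of the proposed transpose rule:
    Filter(WatermarkAssigner(X)) -> WatermarkAssigner(Filter(X)).
    Any other shape is returned unchanged."""
    if (
        node.op == "LogicalFilter"
        and len(node.children) == 1
        and node.children[0].op == "LogicalWatermarkAssigner"
    ):
        wm = node.children[0]
        pushed_filter = replace(node, children=wm.children)
        return replace(wm, children=(pushed_filter,))
    return node


# The shape from the original mail, with operator arguments abbreviated.
scan = Node("LogicalTableScan", attrs=("default_catalog.default_database.foo",))
assigner = Node("LogicalWatermarkAssigner", (scan,), ("rowtime=bar",))
flt = Node("LogicalFilter", (assigner,), ("AND(=($4, 'bar'), =($34, 'baz'))",))

print(matches_filter_over_scan(flt))  # False: the assigner is in the way
rewritten = transpose_filter_and_watermark(flt)
print(rewritten.op)  # LogicalWatermarkAssigner
print(matches_filter_over_scan(rewritten.children[0]))  # True
```

Moving the filter below the assigner means the assigner only sees rows that pass the filter, which is the watermark-starvation concern Timo raises, so a real planner rule would have to take that trade-off into account.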