Sorry for the typo... I mean it will not take too much time.
Best,
Shengkai

Shengkai Fang <fskm...@gmail.com> wrote on Tue, Mar 9, 2021 at 10:25 AM:

> Hi, Yuval.
>
> I have opened a ticket about this [1]. But I don't think we have any
> solution for it.
>
> Do you have time to help us solve this? I think it will take too much
> time.
>
> [1] https://issues.apache.org/jira/browse/FLINK-21675
>
> Yuval Itzchakov <yuva...@gmail.com> wrote on Mon, Mar 8, 2021 at 9:18 PM:
>
>> Thank you Shengkai,
>> That does explain what I'm seeing.
>>
>> Jark / Shengkai - Is there any workaround to get Flink to work with push
>> watermarks and predicate pushdown until this is resolved?
>>
>> On Mon, Mar 8, 2021 at 4:54 AM Shengkai Fang <fskm...@gmail.com> wrote:
>>
>>> Hi, Yuval, Jark, Timo.
>>>
>>> Currently the watermark push down happens in the logical rewrite phase
>>> but the filter push down happens in the local phase, which means the
>>> planner will first check the Filter push down and then check the watermark
>>> push down.
>>>
>>> I think we need a rule to transpose between the filter and watermark
>>> assigner or extend the filter push down rule to capture the structure that
>>> the watermark assigner is the parent of the table scan.
>>>
>>> Best,
>>> Shengkai
>>>
>>> Yuval Itzchakov <yuva...@gmail.com> wrote on Mon, Mar 8, 2021 at 12:13 AM:
>>>
>>>> Hi Jark,
>>>>
>>>> Even after implementing both, I don't see the watermark being pushed to
>>>> the table source in the logical plan, and this prevents predicate pushdown
>>>> from running.
>>>>
>>>> On Sun, Mar 7, 2021, 15:43 Jark Wu <imj...@gmail.com> wrote:
>>>>
>>>>> Hi Yuval,
>>>>>
>>>>> That's correct, you will always get a LogicalWatermarkAssigner if you
>>>>> assigned a watermark.
>>>>> If you implement SupportsWatermarkPushDown,
>>>>> the LogicalWatermarkAssigner will be pushed
>>>>> into TableSource, and then you can push Filter into source if source
>>>>> implements SupportsFilterPushDown.
>>>>>
>>>>> Best,
>>>>> Jark
>>>>>
>>>>> On Sat, 6 Mar 2021 at 01:16, Yuval Itzchakov <yuva...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi Timo,
>>>>>> After investigating this further, this is actually unrelated to
>>>>>> implementing SupportsWatermarkPushDown.
>>>>>>
>>>>>> Once I create a TableSchema for my custom source's RowData, and
>>>>>> assign it a watermark (see my example in the original mail), the plan
>>>>>> will always include a LogicalWatermarkAssigner. This assigner, which sits
>>>>>> between the LogicalTableScan and the LogicalFilter, then prevents the
>>>>>> HepPlanner from invoking the optimization, since the rule requires
>>>>>> LogicalTableScan to be a direct child of LogicalFilter. Since I have
>>>>>> LogicalFilter -> LogicalWatermarkAssigner -> LogicalTableScan, this won't
>>>>>> work.
>>>>>>
>>>>>> On Fri, Mar 5, 2021 at 5:59 PM Timo Walther <twal...@apache.org>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi Yuval,
>>>>>>>
>>>>>>> sorry that nobody replied earlier. Somehow your email fell through
>>>>>>> the cracks.
>>>>>>>
>>>>>>> If I understand you correctly, you would like to implement a table
>>>>>>> source that implements both `SupportsWatermarkPushDown` and
>>>>>>> `SupportsFilterPushDown`?
>>>>>>>
>>>>>>> The current behavior might be on purpose. Filters and Watermarks are
>>>>>>> not very compatible. Filtering would also mean that records (from which
>>>>>>> watermarks could be generated) are skipped. If the filter is very
>>>>>>> strict, we would not generate any new watermarks and the pipeline
>>>>>>> would stop making progress in time.
>>>>>>>
>>>>>>> Watermark push down is only necessary if per-partition watermarks
>>>>>>> are required. Otherwise the watermarks are generated in a subsequent
>>>>>>> operator after the source. So you can still use rowtime without
>>>>>>> implementing `SupportsWatermarkPushDown` in your custom source.
>>>>>>>
>>>>>>> I will loop in Shengkai who worked on this topic recently.
>>>>>>>
>>>>>>> Regards,
>>>>>>> Timo
>>>>>>>
>>>>>>>
>>>>>>> On 04.03.21 18:52, Yuval Itzchakov wrote:
>>>>>>> > Bumping this up again, would appreciate any help if anyone is
>>>>>>> > familiar with the blink planner.
>>>>>>> >
>>>>>>> > Thanks,
>>>>>>> > Yuval.
>>>>>>> >
>>>>>>> > On Fri, Feb 26, 2021, 18:53 Yuval Itzchakov <yuva...@gmail.com>
>>>>>>> > wrote:
>>>>>>> >
>>>>>>> >     Hi Jark,
>>>>>>> >     Would appreciate your help with this.
>>>>>>> >
>>>>>>> >     On Wed, Feb 24, 2021 at 12:09 PM Roman Khachatryan
>>>>>>> >     <ro...@apache.org> wrote:
>>>>>>> >
>>>>>>> >         Hi Yuval,
>>>>>>> >
>>>>>>> >         I'm not familiar with the Blink planner but probably Jark
>>>>>>> >         can help.
>>>>>>> >
>>>>>>> >         Regards,
>>>>>>> >         Roman
>>>>>>> >
>>>>>>> >
>>>>>>> >         On Sun, Feb 21, 2021 at 6:52 PM Yuval Itzchakov
>>>>>>> >         <yuva...@gmail.com> wrote:
>>>>>>> >
>>>>>>> >             Update: When I don't set the watermark explicitly on the
>>>>>>> >             TableSchema, `applyWatermarkStrategy` never gets called on
>>>>>>> >             my ScanTableSource, which does make sense. But now the
>>>>>>> >             question is what should be done? This feels a bit
>>>>>>> >             unintuitive.
>>>>>>> >
>>>>>>> >             On Sun, Feb 21, 2021 at 7:09 PM Yuval Itzchakov
>>>>>>> >             <yuva...@gmail.com> wrote:
>>>>>>> >
>>>>>>> >                 Hi,
>>>>>>> >                 Flink 1.12.1, Blink Planner, Scala 2.12
>>>>>>> >
>>>>>>> >                 I have the following logical plan:
>>>>>>> >
>>>>>>> >                 LogicalSink(table=[default_catalog.default_database.table], fields=[bar, baz, hello_world, a, b])
>>>>>>> >                 +- LogicalProject(value=[$2], bar=[CAST(CAST($0):TIMESTAMP(3)):TIMESTAMP(6)], baz=[CAST(CAST($0):TIMESTAMP(3)):TIMESTAMP(6)], hello_world=[null:VARCHAR(2147483647) CHARACTER SET "UTF-16LE"], a=[null:VARCHAR(2147483647) CHARACTER SET "UTF-16LE"], b=[EMPTY_MAP()])
>>>>>>> >                    +- LogicalFilter(condition=[AND(=($4, _UTF-16LE'bar'), =($34, _UTF-16LE'baz'))])
>>>>>>> >                       +- LogicalWatermarkAssigner(rowtime=[bar], watermark=[$0])
>>>>>>> >                          +- LogicalTableScan(table=[[default_catalog, default_database, foo]])
>>>>>>> >
>>>>>>> >                 I have a custom source which creates a TableSchema based
>>>>>>> >                 on an external table. When I create the schema, I push
>>>>>>> >                 the watermark definition to the schema:
>>>>>>> >
>>>>>>> >                 image.png
>>>>>>> >
>>>>>>> >                 When the HepPlanner starts the optimization phase and
>>>>>>> >                 reaches the "PushFilterIntoTableSourceScanRule", it
>>>>>>> >                 matches on the LogicalFilter in the definition. But
>>>>>>> >                 then, since the RelOptRuleOperandChildPolicy is set to
>>>>>>> >                 "SOME", it attempts to do a full match on the child
>>>>>>> >                 nodes. The rule is defined like so:
>>>>>>> >
>>>>>>> >                 image.png
>>>>>>> >
>>>>>>> >                 The child match fails since the immediate child of the
>>>>>>> >                 filter is a "LogicalWatermarkAssigner", and not the
>>>>>>> >                 "LogicalTableScan", which is the grandchild:
>>>>>>> >
>>>>>>> >                 image.png
>>>>>>> >
>>>>>>> >                 Is this the desired behavior?
>>>>>>> >                 Should I create the TableSchema without the row time
>>>>>>> >                 attribute and use "SupportsWatermarkPushDown" to
>>>>>>> >                 generate the watermark dynamically from the source
>>>>>>> >                 record?
>>>>>>> >
>>>>>>> >                 --
>>>>>>> >                 Best Regards,
>>>>>>> >                 Yuval Itzchakov.
>>>>>>> >
>>>>>>> >
>>>>>>> >
>>>>>>> >             --
>>>>>>> >             Best Regards,
>>>>>>> >             Yuval Itzchakov.
>>>>>>> >
>>>>>>> >
>>>>>>> >
>>>>>>> >     --
>>>>>>> >     Best Regards,
>>>>>>> >     Yuval Itzchakov.
>>>>>>> >
>>>>>>>
>>>>>>>
>>>>>>
>>>>>> --
>>>>>> Best Regards,
>>>>>> Yuval Itzchakov.
>>>>>>
>>>>>
>>
>> --
>> Best Regards,
>> Yuval Itzchakov.
>>
>
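
For anyone reading this thread later, the mismatch discussed above can be illustrated with a small toy model. This is plain Python, not Flink or Calcite code: the helpers `chain`, `filter_rule_matches` and `push_watermark_into_scan` are hypothetical names made up for this sketch, and only the node names are taken from the plan in the original mail. It assumes, as described in the thread, that the filter rule needs the table scan to be the direct child of the filter.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class Node:
    """One operator in a linear toy plan; `child` is its single input."""
    kind: str
    child: Optional["Node"] = None


def chain(*kinds: str) -> Optional[Node]:
    """Build a parent -> child chain from the given operator names."""
    node = None
    for kind in reversed(kinds):
        node = Node(kind, node)
    return node


def filter_rule_matches(node: Node) -> bool:
    """Toy operand pattern: a filter whose direct child is a table scan."""
    return (
        node.kind == "LogicalFilter"
        and node.child is not None
        and node.child.kind == "LogicalTableScan"
    )


def push_watermark_into_scan(node: Optional[Node]) -> Optional[Node]:
    """Toy watermark push-down: fold an assigner sitting directly on a scan."""
    if node is None:
        return None
    if (
        node.kind == "LogicalWatermarkAssigner"
        and node.child is not None
        and node.child.kind == "LogicalTableScan"
    ):
        return node.child  # the scan now "owns" the watermark (assumption)
    return Node(node.kind, push_watermark_into_scan(node.child))


plan = chain("LogicalFilter", "LogicalWatermarkAssigner", "LogicalTableScan")

# Filter rule checked first: the assigner is in the way, so no match.
before = filter_rule_matches(plan)

# Filter rule checked after the assigner has been folded into the scan.
after = filter_rule_matches(push_watermark_into_scan(plan))

print(before, after)
```

In this sketch the rule does not fire while the assigner sits between the filter and the scan, and it does fire once the assigner has been folded into the scan first. That is only a picture of the ordering problem Shengkai describes; the actual fix is tracked in FLINK-21675.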