Hi, Yuval, Jark, Timo.

Currently the watermark push down happens in the logical rewrite phase, but
the filter push down happens in the logical phase, which means the planner
will first try the filter push down and only then the watermark push
down.

I think we need either a rule that transposes the filter and the watermark
assigner, or an extension of the filter push down rule so that it also
matches the structure in which the watermark assigner is the parent of the
table scan.
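
To make the structure concrete, here is a toy sketch in plain Python. It is
not Calcite or Flink planner code: `Node`, `push_filter_into_scan`,
`transpose_filter_watermark` and `apply_to_child` are hypothetical names used
only for illustration, and the sketch ignores the question of whether moving
the filter below the watermark assigner is always semantically safe.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class Node:
    """Toy plan node; NOT a Calcite RelNode."""
    kind: str                      # "Filter", "WatermarkAssigner" or "TableScan"
    child: Optional["Node"] = None
    pushed_filter: bool = False    # only meaningful on a TableScan


def push_filter_into_scan(node: Node) -> Node:
    """Mimics a rule that only matches Filter -> TableScan (direct child)."""
    if node.kind == "Filter" and node.child is not None and node.child.kind == "TableScan":
        return Node("TableScan", pushed_filter=True)
    return node  # no match, plan is unchanged


def transpose_filter_watermark(node: Node) -> Node:
    """Hypothetical rule: Filter -> WatermarkAssigner -> X
    becomes WatermarkAssigner -> Filter -> X."""
    if node.kind == "Filter" and node.child is not None and node.child.kind == "WatermarkAssigner":
        return Node("WatermarkAssigner", Node("Filter", node.child.child))
    return node


def apply_to_child(node: Node, rule) -> Node:
    """Applies `rule` to the child of `node`, keeping `node` itself on top."""
    return Node(node.kind, rule(node.child), node.pushed_filter)


plan = Node("Filter", Node("WatermarkAssigner", Node("TableScan")))

# Without the transpose rule nothing happens: the scan is only a grandchild.
unchanged = push_filter_into_scan(plan)

# With the transpose rule the filter becomes the scan's parent and can be pushed.
transposed = transpose_filter_watermark(plan)
optimized = apply_to_child(transposed, push_filter_into_scan)
```

The alternative, extending the filter push down rule itself, would correspond
to letting `push_filter_into_scan` also accept a watermark assigner between
the filter and the scan.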

Best,
Shengkai

On Mon, Mar 8, 2021 at 12:13 AM Yuval Itzchakov <yuva...@gmail.com> wrote:

> Hi Jark,
>
> Even after implementing both, I don't see the watermark being pushed to
> the table source in the logical plan, and this prevents predicate pushdown
> from running.
>
> On Sun, Mar 7, 2021, 15:43 Jark Wu <imj...@gmail.com> wrote:
>
>> Hi Yuval,
>>
>> That's correct: you will always get a LogicalWatermarkAssigner if you
>> assigned a watermark.
>> If you implement SupportsWatermarkPushDown, the LogicalWatermarkAssigner
>> will be pushed into the TableSource, and then you can push the Filter into
>> the source if the source implements SupportsFilterPushDown.
>>
>> Best,
>> Jark
>>
>> On Sat, 6 Mar 2021 at 01:16, Yuval Itzchakov <yuva...@gmail.com> wrote:
>>
>>> Hi Timo,
>>> After investigating this further, this is actually not related to
>>> implementing SupportsWatermarkPushDown.
>>>
>>> Once I create a TableSchema for my custom source's RowData, and assign
>>> it a watermark (see my example in the original mail), the plan will always
>>> include a LogicalWatermarkAssigner. This assigner, which sits between the
>>> LogicalTableScan and the LogicalFilter, then prevents the HepPlanner from
>>> applying the optimization, since the rule requires LogicalTableScan to be
>>> a direct child of LogicalFilter. Since I have
>>> LogicalFilter -> LogicalWatermarkAssigner -> LogicalTableScan, this won't
>>> work.
>>>
>>> On Fri, Mar 5, 2021 at 5:59 PM Timo Walther <twal...@apache.org> wrote:
>>>
>>>> Hi Yuval,
>>>>
>>>> sorry that nobody replied earlier. Somehow your email fell through the
>>>> cracks.
>>>>
>>>> If I understand you correctly, you would like to implement a table
>>>> source that implements both `SupportsWatermarkPushDown` and
>>>> `SupportsFilterPushDown`?
>>>>
>>>> The current behavior might be on purpose. Filters and watermarks are not
>>>> very compatible. Filtering would also mean that records (from which
>>>> watermarks could be generated) are skipped. If the filter is very
>>>> strict, we would not generate any new watermarks and the pipeline would
>>>> stop making progress in time.
>>>>
>>>> Watermark push down is only necessary if per-partition watermarks are
>>>> required. Otherwise the watermarks are generated in a subsequent
>>>> operator after the source. So you can still use rowtime without
>>>> implementing `SupportsWatermarkPushDown` in your custom source.
>>>>
>>>> I will loop in Shengkai, who worked on this topic recently.
>>>>
>>>> Regards,
>>>> Timo
>>>>
>>>>
>>>> On 04.03.21 18:52, Yuval Itzchakov wrote:
>>>> > Bumping this up again, would appreciate any help if anyone is familiar
>>>> > with the blink planner.
>>>> >
>>>> > Thanks,
>>>> > Yuval.
>>>> >
>>>> > On Fri, Feb 26, 2021, 18:53 Yuval Itzchakov <yuva...@gmail.com> wrote:
>>>> >
>>>> >     Hi Jark,
>>>> >     Would appreciate your help with this.
>>>> >
>>>> >     On Wed, Feb 24, 2021 at 12:09 PM Roman Khachatryan
>>>> >     <ro...@apache.org> wrote:
>>>> >
>>>> >         Hi Yuval,
>>>> >
>>>> >         I'm not familiar with the Blink planner, but Jark can probably
>>>> >         help.
>>>> >
>>>> >         Regards,
>>>> >         Roman
>>>> >
>>>> >
>>>> >         On Sun, Feb 21, 2021 at 6:52 PM Yuval Itzchakov
>>>> >         <yuva...@gmail.com> wrote:
>>>> >
>>>> >             Update: When I don't set the watermark explicitly on the
>>>> >             TableSchema, `applyWatermarkStrategy` never gets called on
>>>> >             my ScanTableSource, which does make sense. But now the
>>>> >             question is what should be done? This feels a bit
>>>> >             unintuitive.
>>>> >
>>>> >             On Sun, Feb 21, 2021 at 7:09 PM Yuval Itzchakov
>>>> >             <yuva...@gmail.com> wrote:
>>>> >
>>>> >                 Hi,
>>>> >                 Flink 1.12.1, Blink Planner, Scala 2.12
>>>> >
>>>> >                 I have the following logical plan:
>>>> >
>>>> >
>>>> >                 LogicalSink(table=[default_catalog.default_database.table], fields=[bar, baz, hello_world, a, b])
>>>> >                 +- LogicalProject(value=[$2], bar=[CAST(CAST($0):TIMESTAMP(3)):TIMESTAMP(6)], baz=[CAST(CAST($0):TIMESTAMP(3)):TIMESTAMP(6)], hello_world=[null:VARCHAR(2147483647) CHARACTER SET "UTF-16LE"], a=[null:VARCHAR(2147483647) CHARACTER SET "UTF-16LE"], b=[EMPTY_MAP()])
>>>> >                    +- LogicalFilter(condition=[AND(=($4, _UTF-16LE'bar'), =($34, _UTF-16LE'baz'))])
>>>> >                       +- LogicalWatermarkAssigner(rowtime=[bar], watermark=[$0])
>>>> >                          +- LogicalTableScan(table=[[default_catalog, default_database, foo]])
>>>> >
>>>> >                 I have a custom source which creates a TableSchema
>>>> >                 based on an external table. When I create the schema,
>>>> >                 I push the watermark definition to the schema:
>>>> >
>>>> >                 image.png
>>>> >
>>>> >                 When the HepPlanner starts the optimization phase and
>>>> >                 reaches the "PushFilterIntoTableSourceScanRule", it
>>>> >                 matches on the LogicalFilter in the definition. But
>>>> >                 then, since the RelOptRuleOperandChildPolicy is set to
>>>> >                 "SOME", it attempts to do a full match on the child
>>>> >                 nodes. The rule is defined like so:
>>>> >
>>>> >                 image.png
>>>> >
>>>> >                 The child match fails, since the immediate child of
>>>> >                 the filter is a "LogicalWatermarkAssigner" and not the
>>>> >                 "LogicalTableScan", which is the grandchild:
>>>> >
>>>> >                 image.png
>>>> >
>>>> >                 Is this the desired behavior? Should I create the
>>>> >                 TableSchema without the row time attribute and use
>>>> >                 "SupportsWatermarkPushDown" to generate the watermark
>>>> >                 dynamically from the source record?
>>>> >
>>>> >                 --
>>>> >                 Best Regards,
>>>> >                 Yuval Itzchakov.
>>>> >
>>>> >
>>>> >
>>>> >             --
>>>> >             Best Regards,
>>>> >             Yuval Itzchakov.
>>>> >
>>>> >
>>>> >
>>>> >     --
>>>> >     Best Regards,
>>>> >     Yuval Itzchakov.
>>>> >
>>>>
>>>>
>>>
>>> --
>>> Best Regards,
>>> Yuval Itzchakov.
>>>
>>
