Hi, Yuval.

I have opened a ticket for this [1], but I don't think we have a solution
yet.

Do you have time to help us solve this? I think it will take quite a lot
of time.

[1] https://issues.apache.org/jira/browse/FLINK-21675

On Mon, Mar 8, 2021 at 9:18 PM Yuval Itzchakov <yuva...@gmail.com> wrote:

> Thank you, Shengkai,
> That does explain what I'm seeing.
>
> Jark / Shengkai, is there any workaround to get Flink to work with both
> watermark push-down and predicate push-down until this is resolved?
>
> On Mon, Mar 8, 2021 at 4:54 AM Shengkai Fang <fskm...@gmail.com> wrote:
>
>> Hi, Yuval, Jark, Timo.
>>
>> Currently the watermark push-down happens in the logical rewrite phase,
>> but the filter push-down happens in the logical phase, which means the
>> planner first checks the filter push-down and only then checks the
>> watermark push-down.
>>
>> I think we need either a rule that transposes the filter and the watermark
>> assigner, or an extension of the filter push-down rule that also matches
>> the pattern where the watermark assigner is the parent of the table scan.
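The matching problem, and the first option above (transposing the filter and the watermark assigner), can be illustrated with a toy model. This is a minimal Python sketch under stated assumptions, not Flink or Calcite code: `Node`, `push_filter_into_scan`, `transpose_filter_watermark` and `optimize` are hypothetical names, plan nodes are plain objects, and "pushing" a filter only folds the predicate text into the scan. As Timo notes later in the thread, moving a filter below the watermark assigner changes which records the watermark is computed from, so a real rule would have to account for that.

```python
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class Node:
    """Hypothetical toy plan node: a kind, an optional payload, and inputs."""
    kind: str
    payload: Optional[str] = None
    inputs: List["Node"] = field(default_factory=list)


def push_filter_into_scan(node: Node) -> Optional[Node]:
    """Toy analogue of a push-down rule whose operand tree is
    Filter -> TableScan: it only fires if the scan is the DIRECT child."""
    if node.kind != "Filter" or len(node.inputs) != 1:
        return None
    child = node.inputs[0]
    if child.kind != "TableScan":
        return None  # e.g. a WatermarkAssigner sits in between: no match
    # Hypothetical "source accepted the filter": fold the predicate into the scan.
    return Node("TableScan", f"{child.payload} WHERE {node.payload}")


def transpose_filter_watermark(node: Node) -> Optional[Node]:
    """Hypothetical transpose rule:
    Filter -> WatermarkAssigner -> X  becomes  WatermarkAssigner -> Filter -> X."""
    if node.kind != "Filter" or len(node.inputs) != 1:
        return None
    child = node.inputs[0]
    if child.kind != "WatermarkAssigner":
        return None
    new_filter = Node("Filter", node.payload, child.inputs)
    return Node("WatermarkAssigner", child.payload, [new_filter])


def optimize(node: Node) -> Node:
    """Try the transpose first, then the push-down on the filter it produced."""
    transposed = transpose_filter_watermark(node)
    if transposed is None:
        pushed = push_filter_into_scan(node)
        return pushed if pushed is not None else node
    pushed = push_filter_into_scan(transposed.inputs[0])
    if pushed is not None:
        transposed.inputs = [pushed]
    return transposed


scan = Node("TableScan", "foo")
plan = Node("Filter", "$4 = 'bar'", [Node("WatermarkAssigner", "rowtime=bar", [scan])])

print(push_filter_into_scan(plan))  # None: the scan is only a grandchild
result = optimize(plan)
print(result.kind, "->", result.inputs[0].kind, result.inputs[0].payload)
# WatermarkAssigner -> TableScan foo WHERE $4 = 'bar'
```

The second option (extending the push-down rule) would correspond to letting `push_filter_into_scan` also accept a `WatermarkAssigner` child and look one level deeper, instead of rewriting the tree first.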
>>
>> Best,
>> Shengkai
>>
>> On Mon, Mar 8, 2021 at 12:13 AM Yuval Itzchakov <yuva...@gmail.com> wrote:
>>
>>> Hi Jark,
>>>
>>> Even after implementing both, I don't see the watermark being pushed into
>>> the table source in the logical plan, and the watermark assigner still
>>> prevents predicate pushdown from running.
>>>
>>> On Sun, Mar 7, 2021, 15:43 Jark Wu <imj...@gmail.com> wrote:
>>>
>>>> Hi Yuval,
>>>>
>>>> That's correct: you will always get a LogicalWatermarkAssigner if you
>>>> assign a watermark.
>>>> If you implement SupportsWatermarkPushDown, the LogicalWatermarkAssigner
>>>> will be pushed into the TableSource, and then the Filter can be pushed
>>>> into the source if the source implements SupportsFilterPushDown.
>>>>
>>>> Best,
>>>> Jark
>>>>
>>>> On Sat, 6 Mar 2021 at 01:16, Yuval Itzchakov <yuva...@gmail.com> wrote:
>>>>
>>>>> Hi Timo,
>>>>> After investigating this further, this is actually unrelated to
>>>>> implementing SupportsWatermarkPushDown.
>>>>>
>>>>> Once I create a TableSchema for my custom source's RowData and assign
>>>>> it a watermark (see my example in the original mail), the plan always
>>>>> includes a LogicalWatermarkAssigner. This assigner, which sits between
>>>>> the LogicalTableScan and the LogicalFilter, then prevents the HepPlanner
>>>>> from applying the optimization, since the rule requires the
>>>>> LogicalTableScan to be a direct child of the LogicalFilter. Since I have
>>>>> LogicalFilter -> LogicalWatermarkAssigner -> LogicalTableScan, this won't
>>>>> work.
>>>>>
>>>>> On Fri, Mar 5, 2021 at 5:59 PM Timo Walther <twal...@apache.org> wrote:
>>>>>
>>>>>> Hi Yuval,
>>>>>>
>>>>>> sorry that nobody replied earlier. Somehow your email fell through the
>>>>>> cracks.
>>>>>>
>>>>>> If I understand you correctly, you would like to implement a table
>>>>>> source that implements both `SupportsWatermarkPushDown` and
>>>>>> `SupportsFilterPushDown`?
>>>>>>
>>>>>> The current behavior might be on purpose. Filters and watermarks are not
>>>>>> very compatible: filtering also means that records (from which
>>>>>> watermarks could be generated) are skipped. If the filter is very
>>>>>> strict, we would not generate any new watermarks and the pipeline would
>>>>>> stop making progress in event time.
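The effect of a strict filter on watermark progress can be shown with a small simulation. This sketch assumes a bounded-out-of-orderness style watermark (maximum timestamp seen so far minus a fixed delay) that is computed only from records that pass the filter; the record shape and the function `watermarks_after_filter` are hypothetical and not a Flink API.

```python
from typing import Callable, Iterable, List, Optional, Tuple

Record = Tuple[int, str]  # hypothetical record shape: (event timestamp in ms, key)


def watermarks_after_filter(
    records: Iterable[Record],
    predicate: Callable[[Record], bool],
    max_out_of_orderness_ms: int,
) -> List[Optional[int]]:
    """Return the watermark observed after each input record, where the
    watermark is derived only from records that pass the filter
    (max timestamp seen so far minus the allowed out-of-orderness).
    None means no watermark has been generated yet."""
    max_ts: Optional[int] = None
    out: List[Optional[int]] = []
    for rec in records:
        if predicate(rec):
            ts = rec[0]
            max_ts = ts if max_ts is None else max(max_ts, ts)
        out.append(None if max_ts is None else max_ts - max_out_of_orderness_ms)
    return out


stream = [(1000, "bar"), (2000, "baz"), (3000, "baz"), (4000, "baz")]

# Without a filter, the watermark keeps advancing with event time.
print(watermarks_after_filter(stream, lambda r: True, 500))
# [500, 1500, 2500, 3500]

# A strict filter that only keeps "bar" stalls the watermark after the first record.
print(watermarks_after_filter(stream, lambda r: r[1] == "bar", 500))
# [500, 500, 500, 500]
```

A filter that rejects every record never produces a watermark at all, which corresponds to the pipeline making no event-time progress.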
>>>>>>
>>>>>> Watermark push-down is only necessary if per-partition watermarks are
>>>>>> required. Otherwise the watermarks are generated in a subsequent
>>>>>> operator after the source, so you can still use rowtime without
>>>>>> implementing `SupportsWatermarkPushDown` in your custom source.
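Why per-partition watermarks can matter is easiest to see with two partitions that progress at different speeds. This is a minimal sketch under stated assumptions: each generator emits the maximum timestamp it has seen minus a fixed delay, and a source with per-partition generators forwards the minimum across its partitions. The event shape and both function names are hypothetical, not Flink APIs.

```python
from typing import Dict, List, Tuple

Event = Tuple[str, int]  # hypothetical event shape: (partition id, event timestamp in ms)


def single_generator_watermark(events: List[Event], delay_ms: int) -> int:
    """One generator in an operator after the source: it sees the merged
    stream and tracks the max timestamp across all partitions."""
    return max(ts for _, ts in events) - delay_ms


def per_partition_watermark(events: List[Event], delay_ms: int) -> int:
    """One generator per partition inside the source: each partition tracks
    its own max timestamp, and the source emits the minimum of them."""
    max_per_partition: Dict[str, int] = {}
    for partition, ts in events:
        max_per_partition[partition] = max(ts, max_per_partition.get(partition, ts))
    return min(max_per_partition.values()) - delay_ms


# Partition p0 is far ahead of partition p1.
events = [("p0", 10_000), ("p1", 2_000), ("p0", 11_000), ("p1", 2_500)]
print(single_generator_watermark(events, 1_000))  # 10000
print(per_partition_watermark(events, 1_000))     # 1500
```

With the single generator, the watermark already reaches 10000 ms, so a later record from the lagging partition (for example at 3000 ms) would be treated as late; per-partition generation holds the watermark at 1500 ms.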
>>>>>>
>>>>>> I will loop in Shengkai, who worked on this topic recently.
>>>>>>
>>>>>> Regards,
>>>>>> Timo
>>>>>>
>>>>>>
>>>>>> On 04.03.21 18:52, Yuval Itzchakov wrote:
>>>>>> > Bumping this up again; I would appreciate any help if anyone is
>>>>>> > familiar with the Blink planner.
>>>>>> >
>>>>>> > Thanks,
>>>>>> > Yuval.
>>>>>> >
>>>>>> > On Fri, Feb 26, 2021, 18:53 Yuval Itzchakov <yuva...@gmail.com> wrote:
>>>>>> >
>>>>>> >     Hi Jark,
>>>>>> >     Would appreciate your help with this.
>>>>>> >
>>>>>> >     On Wed, Feb 24, 2021 at 12:09 PM Roman Khachatryan <ro...@apache.org> wrote:
>>>>>> >
>>>>>> >         Hi Yuval,
>>>>>> >
>>>>>> >         I'm not familiar with the Blink planner, but Jark can
>>>>>> >         probably help.
>>>>>> >
>>>>>> >         Regards,
>>>>>> >         Roman
>>>>>> >
>>>>>> >
>>>>>> >         On Sun, Feb 21, 2021 at 6:52 PM Yuval Itzchakov <yuva...@gmail.com> wrote:
>>>>>> >
>>>>>> >             Update: When I don't set the watermark explicitly on the
>>>>>> >             TableSchema, `applyWatermarkStrategy` never gets called on
>>>>>> >             my ScanTableSource, which does make sense. But now the
>>>>>> >             question is: what should be done? This feels a bit
>>>>>> >             unintuitive.
>>>>>> >
>>>>>> >             On Sun, Feb 21, 2021 at 7:09 PM Yuval Itzchakov <yuva...@gmail.com> wrote:
>>>>>> >
>>>>>> >                 Hi,
>>>>>> >                 Flink 1.12.1, Blink Planner, Scala 2.12
>>>>>> >
>>>>>> >                 I have the following logical plan:
>>>>>> >
>>>>>> >
>>>>>> >                 LogicalSink(table=[default_catalog.default_database.table], fields=[bar, baz, hello_world, a, b])
>>>>>> >                 +- LogicalProject(value=[$2], bar=[CAST(CAST($0):TIMESTAMP(3)):TIMESTAMP(6)], baz=[CAST(CAST($0):TIMESTAMP(3)):TIMESTAMP(6)], hello_world=[null:VARCHAR(2147483647) CHARACTER SET "UTF-16LE"], a=[null:VARCHAR(2147483647) CHARACTER SET "UTF-16LE"], b=[EMPTY_MAP()])
>>>>>> >                    +- LogicalFilter(condition=[AND(=($4, _UTF-16LE'bar'), =($34, _UTF-16LE'baz'))])
>>>>>> >                       +- LogicalWatermarkAssigner(rowtime=[bar], watermark=[$0])
>>>>>> >                          +- LogicalTableScan(table=[[default_catalog, default_database, foo]])
>>>>>> >
>>>>>> >                 I have a custom source which creates a TableSchema based
>>>>>> >                 on an external table. When I create the schema, I push
>>>>>> >                 the watermark definition to the schema:
>>>>>> >
>>>>>> >                 [image not preserved]
>>>>>> >
>>>>>> >                 When the HepPlanner starts the optimization phase and
>>>>>> >                 reaches the "PushFilterIntoTableSourceScanRule", it
>>>>>> >                 matches on the LogicalFilter in the definition. But
>>>>>> >                 then, since the RelOptRuleOperandChildPolicy is set to
>>>>>> >                 "SOME", it attempts to do a full match on the child
>>>>>> >                 nodes. The rule is defined as follows:
>>>>>> >
>>>>>> >                 [image not preserved]
>>>>>> >
>>>>>> >                 The match on the child operand fails, since the
>>>>>> >                 immediate child of the filter is a
>>>>>> >                 "LogicalWatermarkAssigner" and not the
>>>>>> >                 "LogicalTableScan", which is the grandchild:
>>>>>> >
>>>>>> >                 [image not preserved]
>>>>>> >
>>>>>> >                 Is this the desired behavior? Should I create the
>>>>>> >                 TableSchema without the rowtime attribute and use
>>>>>> >                 "SupportsWatermarkPushDown" to generate the watermark
>>>>>> >                 dynamically from the source record?
>>>>>> >
>>>>>> >                 --
>>>>>> >                 Best Regards,
>>>>>> >                 Yuval Itzchakov.
>>>>>> >
>>>>>> >
>>>>>> >
>>>>>> >             --
>>>>>> >             Best Regards,
>>>>>> >             Yuval Itzchakov.
>>>>>> >
>>>>>> >
>>>>>> >
>>>>>> >     --
>>>>>> >     Best Regards,
>>>>>> >     Yuval Itzchakov.
>>>>>> >
>>>>>>
>>>>>>
>>>>>
>>>>> --
>>>>> Best Regards,
>>>>> Yuval Itzchakov.
>>>>>
>>>>
>
> --
> Best Regards,
> Yuval Itzchakov.
>
