Sorry for the typo...

I mean it will not take too much time.

Best,
Shengkai

On Tue, Mar 9, 2021 at 10:25 AM Shengkai Fang <fskm...@gmail.com> wrote:

> Hi, Yuval.
>
> I have opened a ticket about this [1], but I don't think we have a
> solution for it yet.
>
> Do you have time to help us to solve this? I think it will take too much
> time.
>
> [1] https://issues.apache.org/jira/browse/FLINK-21675
>
> On Mon, Mar 8, 2021 at 9:18 PM Yuval Itzchakov <yuva...@gmail.com> wrote:
>
>> Thank you Shengkai,
>> That does explain what I'm seeing.
>>
>> Jark / Shengkai - Is there any workaround to get Flink to work with both
>> watermark pushdown and predicate pushdown until this is resolved?
>>
>> On Mon, Mar 8, 2021 at 4:54 AM Shengkai Fang <fskm...@gmail.com> wrote:
>>
>>> Hi, Yuval, Jark, Timo.
>>>
>>> Currently the watermark push down happens in the logical rewrite phase
>>> but the filter push down happens in the logical phase, which means the
>>> planner will first check the filter push down and then check the watermark
>>> push down.
>>>
>>> I think we need either a rule that transposes the filter and the watermark
>>> assigner, or an extension of the filter push down rule that matches the
>>> structure in which the watermark assigner is the parent of the table scan.
>>>
>>> Best,
>>> Shengkai
>>>
>>> On Mon, Mar 8, 2021 at 12:13 AM Yuval Itzchakov <yuva...@gmail.com> wrote:
>>>
>>>> Hi Jark,
>>>>
>>>> Even after implementing both, I don't see the watermark being pushed into
>>>> the table source in the logical plan, which prevents predicate pushdown
>>>> from running.
>>>>
>>>> On Sun, Mar 7, 2021, 15:43 Jark Wu <imj...@gmail.com> wrote:
>>>>
>>>>> Hi Yuval,
>>>>>
>>>>> That's correct: you will always get a LogicalWatermarkAssigner if you
>>>>> assign a watermark.
>>>>> If you implement SupportsWatermarkPushDown, the LogicalWatermarkAssigner
>>>>> will be pushed into the TableSource, and then you can push the Filter
>>>>> into the source if the source implements SupportsFilterPushDown.
>>>>>
>>>>> Best,
>>>>> Jark
>>>>>
>>>>> On Sat, 6 Mar 2021 at 01:16, Yuval Itzchakov <yuva...@gmail.com> wrote:
>>>>>
>>>>>> Hi Timo,
>>>>>> After investigating this further, this is actually not related to
>>>>>> implementing SupportsWatermarkPushDown.
>>>>>>
>>>>>> Once I create a TableSchema for my custom source's RowData and assign
>>>>>> it a watermark (see my example in the original mail), the plan will
>>>>>> always include a LogicalWatermarkAssigner. Because this assigner sits
>>>>>> between the LogicalTableScan and the LogicalFilter, the HepPlanner
>>>>>> cannot apply the optimization, since the rule requires LogicalTableScan
>>>>>> to be a direct child of LogicalFilter. Since I have
>>>>>> LogicalFilter -> LogicalWatermarkAssigner -> LogicalTableScan, this won't
>>>>>> work.
>>>>>>
>>>>>> On Fri, Mar 5, 2021 at 5:59 PM Timo Walther <twal...@apache.org> wrote:
>>>>>>
>>>>>>> Hi Yuval,
>>>>>>>
>>>>>>> sorry that nobody replied earlier. Somehow your email fell through the
>>>>>>> cracks.
>>>>>>>
>>>>>>> If I understand you correctly, you would like to implement a table
>>>>>>> source that implements both `SupportsWatermarkPushDown` and
>>>>>>> `SupportsFilterPushDown`?
>>>>>>>
>>>>>>> The current behavior might be on purpose. Filters and watermarks are not
>>>>>>> very compatible. Filtering would also mean that records (from which
>>>>>>> watermarks could be generated) are skipped. If the filter is very
>>>>>>> strict, we would not generate any new watermarks and the pipeline would
>>>>>>> stop making progress in time.
>>>>>>>
>>>>>>> Watermark push down is only necessary if per-partition watermarks are
>>>>>>> required. Otherwise the watermarks are generated in a subsequent
>>>>>>> operator after the source. So you can still use rowtime without
>>>>>>> implementing `SupportsWatermarkPushDown` in your custom source.
>>>>>>>
>>>>>>> I will loop in Shengkai who worked on this topic recently.
>>>>>>>
>>>>>>> Regards,
>>>>>>> Timo
>>>>>>>
>>>>>>>
>>>>>>> On 04.03.21 18:52, Yuval Itzchakov wrote:
>>>>>>> > Bumping this up again, would appreciate any help if anyone is familiar
>>>>>>> > with the Blink planner.
>>>>>>> >
>>>>>>> > Thanks,
>>>>>>> > Yuval.
>>>>>>> >
>>>>>>> > On Fri, Feb 26, 2021, 18:53 Yuval Itzchakov <yuva...@gmail.com> wrote:
>>>>>>> >
>>>>>>> >     Hi Jark,
>>>>>>> >     Would appreciate your help with this.
>>>>>>> >
>>>>>>> >     On Wed, Feb 24, 2021 at 12:09 PM Roman Khachatryan <ro...@apache.org> wrote:
>>>>>>> >
>>>>>>> >         Hi Yuval,
>>>>>>> >
>>>>>>> >         I'm not familiar with the Blink planner, but Jark can probably help.
>>>>>>> >
>>>>>>> >         Regards,
>>>>>>> >         Roman
>>>>>>> >
>>>>>>> >
>>>>>>> >         On Sun, Feb 21, 2021 at 6:52 PM Yuval Itzchakov <yuva...@gmail.com> wrote:
>>>>>>> >
>>>>>>> >             Update: When I don't set the watermark explicitly on the
>>>>>>> >             TableSchema, `applyWatermarkStrategy` never gets called on
>>>>>>> >             my ScanTableSource, which does make sense. But now the
>>>>>>> >             question is what should be done? This feels a bit
>>>>>>> >             unintuitive.
>>>>>>> >
>>>>>>> >             On Sun, Feb 21, 2021 at 7:09 PM Yuval Itzchakov <yuva...@gmail.com> wrote:
>>>>>>> >
>>>>>>> >                 Hi,
>>>>>>> >                 Flink 1.12.1, Blink Planner, Scala 2.12
>>>>>>> >
>>>>>>> >                 I have the following logical plan:
>>>>>>> >
>>>>>>> >
>>>>>>> >                 LogicalSink(table=[default_catalog.default_database.table], fields=[bar, baz, hello_world, a, b])
>>>>>>> >                 +- LogicalProject(value=[$2], bar=[CAST(CAST($0):TIMESTAMP(3)):TIMESTAMP(6)], baz=[CAST(CAST($0):TIMESTAMP(3)):TIMESTAMP(6)], hello_world=[null:VARCHAR(2147483647) CHARACTER SET "UTF-16LE"], a=[null:VARCHAR(2147483647) CHARACTER SET "UTF-16LE"], b=[EMPTY_MAP()])
>>>>>>> >                    +- LogicalFilter(condition=[AND(=($4, _UTF-16LE'bar'), =($34, _UTF-16LE'baz'))])
>>>>>>> >                       +- LogicalWatermarkAssigner(rowtime=[bar], watermark=[$0])
>>>>>>> >                          +- LogicalTableScan(table=[[default_catalog, default_database, foo]])
>>>>>>> >
>>>>>>> >                 I have a custom source which creates a TableSchema based
>>>>>>> >                 on an external table. When I create the schema, I push
>>>>>>> >                 the watermark definition to the schema:
>>>>>>> >
>>>>>>> >                 image.png
>>>>>>> >
>>>>>>> >                 When the HepPlanner starts the optimization phase and
>>>>>>> >                 reaches the "PushFilterIntoTableSourceScanRule", it
>>>>>>> >                 matches on the LogicalFilter in the definition. But
>>>>>>> >                 then, since the RelOptRuleOperandChildPolicy is set to
>>>>>>> >                 "SOME", it attempts to do a full match on the child
>>>>>>> >                 nodes. The rule is defined as follows:
>>>>>>> >
>>>>>>> >                 image.png
>>>>>>> >
>>>>>>> >                 The child match fails since the immediate child of the
>>>>>>> >                 filter is a "LogicalWatermarkAssigner", and not the
>>>>>>> >                 "LogicalTableScan", which is the grandchild:
>>>>>>> >
>>>>>>> >                 image.png
>>>>>>> >
>>>>>>> >                 Is this the desired behavior? Should I create the
>>>>>>> >                 TableSchema without the row time attribute and use
>>>>>>> >                 "SupportsWatermarkPushDown" to generate the watermark
>>>>>>> >                 dynamically from the source record?
>>>>>>> >
>>>>>>> >                 --
>>>>>>> >                 Best Regards,
>>>>>>> >                 Yuval Itzchakov.
>>>>>>> >
>>>>>>> >
>>>>>>> >
>>>>>>> >             --
>>>>>>> >             Best Regards,
>>>>>>> >             Yuval Itzchakov.
>>>>>>> >
>>>>>>> >
>>>>>>> >
>>>>>>> >     --
>>>>>>> >     Best Regards,
>>>>>>> >     Yuval Itzchakov.
>>>>>>> >
>>>>>>>
>>>>>>>
>>>>>>
>>>>>> --
>>>>>> Best Regards,
>>>>>> Yuval Itzchakov.
>>>>>>
>>>>>
>>
>> --
>> Best Regards,
>> Yuval Itzchakov.
>>
>
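
For anyone reading this thread later, the mismatch discussed above can be illustrated with a toy model. This is only a sketch in plain Java: it does not use Flink's or Calcite's classes, and `PushdownSketch`, `Node`, `filterRuleMatches`, and `transposeFilterAndWatermark` are hypothetical names made up for illustration. It shows why a rule that expects the scan as the direct child of the filter does not fire on Filter -> WatermarkAssigner -> Scan, and how a transpose of the kind Shengkai suggests would restore the match. It deliberately ignores the conditions a real planner rule would have to check, including the watermark-progress concern Timo raised.

```java
/** Toy model of a single-input logical plan; NOT Flink's or Calcite's real classes. */
public class PushdownSketch {

    /** A plan node with a kind (e.g. "Filter") and at most one input. */
    static final class Node {
        final String kind;
        final Node input; // null for a leaf such as the table scan

        Node(String kind, Node input) {
            this.kind = kind;
            this.input = input;
        }

        /** Renders the chain top-down, e.g. "Filter -> Scan". */
        @Override
        public String toString() {
            return input == null ? kind : kind + " -> " + input;
        }
    }

    /** Mimics a rule whose operand is a Filter with a Scan as its direct child. */
    static boolean filterRuleMatches(Node n) {
        return n.kind.equals("Filter")
                && n.input != null
                && n.input.kind.equals("Scan");
    }

    /** Hypothetical transpose: Filter(WatermarkAssigner(x)) becomes WatermarkAssigner(Filter(x)). */
    static Node transposeFilterAndWatermark(Node n) {
        if (n.kind.equals("Filter")
                && n.input != null
                && n.input.kind.equals("WatermarkAssigner")) {
            Node filter = new Node("Filter", n.input.input);
            return new Node("WatermarkAssigner", filter);
        }
        return n; // leave any other shape untouched
    }

    public static void main(String[] args) {
        // Same shape as the plan in the original mail.
        Node plan = new Node("Filter",
                new Node("WatermarkAssigner", new Node("Scan", null)));
        System.out.println(plan + " matches: " + filterRuleMatches(plan));

        // After the transpose, the filter sits directly on top of the scan.
        Node rewritten = transposeFilterAndWatermark(plan);
        System.out.println(rewritten + " inner filter matches: "
                + filterRuleMatches(rewritten.input));
    }
}
```

In this toy model the first check is false because the filter's direct child is the watermark assigner, and the second is true because the transposed filter has the scan as its direct child.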
