Thanks Martijn,

Indeed, adding the check to the applyFilters method is enough to disable 
filter pushdown. 
My concern is that the same check would have to be implemented in each 
source:

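public Result applyFilters(List<ResolvedExpression> filters) {
    if (supportsFilterPushDown) {
        // Pushdown enabled: let the connector decide which filters to accept.
        return applyFiltersInternal(filters);
    } else {
        // Pushdown disabled: accept nothing and return every filter as remaining.
        return Result.of(Collections.emptyList(), filters);
    }
}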


If the options are generic enough, we can also pass them uniformly through 
an abstract source superclass and provide a default implementation that 
decides whether to allow filter pushdown based on those options:

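public abstract class FilterableDynamicTableSource
        implements DynamicTableSource, SupportsFilterPushDown {

    // Options of this source.
    private final Configuration sourceConfig;

    protected FilterableDynamicTableSource(Configuration sourceConfig) {
        this.sourceConfig = sourceConfig;
    }

    // enableFilterPushDown() is the method proposed for SupportsFilterPushDown;
    // ENABLE_FILTER_PUSH_DOWN stands for the proposed boolean option.
    @Override
    public boolean enableFilterPushDown() {
        return sourceConfig.get(ENABLE_FILTER_PUSH_DOWN);
    }
}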
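
To make the comparison concrete, here is a rough, self-contained sketch of how 
a shared base class could carry the check once, so that individual sources only 
implement their own pushdown logic. It does not use the Flink classes: 
PushDownResult, GatedFilterSource and AcceptAllSource are hypothetical names 
made up for this example, filters are plain strings instead of 
ResolvedExpression, and the boolean flag stands in for the proposed option.

```java
import java.util.Collections;
import java.util.List;

// Simplified stand-in for SupportsFilterPushDown.Result (hypothetical, not the Flink class).
final class PushDownResult {
    final List<String> acceptedFilters;
    final List<String> remainingFilters;

    private PushDownResult(List<String> acceptedFilters, List<String> remainingFilters) {
        this.acceptedFilters = acceptedFilters;
        this.remainingFilters = remainingFilters;
    }

    static PushDownResult of(List<String> acceptedFilters, List<String> remainingFilters) {
        return new PushDownResult(acceptedFilters, remainingFilters);
    }
}

// Hypothetical base class: the enable/disable check lives here exactly once.
abstract class GatedFilterSource {
    private final boolean filterPushDownEnabled;

    protected GatedFilterSource(boolean filterPushDownEnabled) {
        this.filterPushDownEnabled = filterPushDownEnabled;
    }

    // Final, so subclasses cannot bypass the check.
    public final PushDownResult applyFilters(List<String> filters) {
        if (!filterPushDownEnabled) {
            // Reject everything: all filters stay with the planner.
            return PushDownResult.of(Collections.emptyList(), filters);
        }
        return applyFiltersInternal(filters);
    }

    // Connector-specific pushdown logic, only called when pushdown is enabled.
    protected abstract PushDownResult applyFiltersInternal(List<String> filters);
}

// Example source that accepts every filter when pushdown is enabled.
final class AcceptAllSource extends GatedFilterSource {
    AcceptAllSource(boolean filterPushDownEnabled) {
        super(filterPushDownEnabled);
    }

    @Override
    protected PushDownResult applyFiltersInternal(List<String> filters) {
        return PushDownResult.of(filters, Collections.emptyList());
    }
}
```

With the flag off, new AcceptAllSource(false).applyFilters(filters) returns all 
filters as remaining; with the flag on, the source's own logic runs. The 
trade-off is that sources must extend the base class for the check to apply.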


Best,
Jiabao


> On Oct 24, 2023, at 17:59, Martijn Visser <martijnvis...@apache.org> wrote:
> 
> Hi Jiabao,
> 
> I'm in favour of Jark's approach: while I can see the need for a
> generic flag, I can also foresee the situation where users actually
> want to be able to control it per connector. So why not go directly
> for that approach?
> 
> Best regards,
> 
> Martijn
> 
> On Tue, Oct 24, 2023 at 11:37 AM Jane Chan <qingyue....@gmail.com> wrote:
>> 
>> Hi Jiabao,
>> 
>> Thanks for driving this discussion. I have a small question: will
>> "scan.filter-push-down.enabled" take precedence over
>> "table.optimizer.source.predicate" when the two parameters conflict
>> with each other?
>> 
>> Best,
>> Jane
>> 
>> On Tue, Oct 24, 2023 at 5:05 PM Jiabao Sun <jiabao....@xtransfer.cn.invalid>
>> wrote:
>> 
>>> Thanks Jark,
>>> 
>>> If we only add configuration without adding the enableFilterPushDown
>>> method in the SupportsFilterPushDown interface,
>>> each connector would have to handle the same logic in the applyFilters
>>> method to determine whether filter pushdown is needed.
>>> This would increase complexity and violate the original behavior of the
>>> applyFilters method.
>>> 
>>> In contrast, with the newly added enableFilterPushDown method we only
>>> need to pass the configuration parameter
>>> to decide whether to perform predicate pushdown.
>>> 
>>> I think this approach would be clearer and simpler.
>>> WDYT?
>>> 
>>> Best,
>>> Jiabao
>>> 
>>> 
>>>> On Oct 24, 2023, at 16:58, Jark Wu <imj...@gmail.com> wrote:
>>>> 
>>>> Hi Jiabao,
>>>> 
>>>> I think the current interface can already satisfy your requirements.
>>>> The connector can reject all the filters by returning the input filters
>>>> as `Result#remainingFilters`.
>>>> 
>>>> So maybe we don't need to introduce a new method to disable
>>>> pushdown, but just introduce an option for the specific connector.
>>>> 
>>>> Best,
>>>> Jark
>>>> 
>>>> On Tue, 24 Oct 2023 at 16:38, Leonard Xu <xbjt...@gmail.com> wrote:
>>>> 
>>>>> Thanks @Jiabao for kicking off this discussion.
>>>>> 
>>>>> Could you add a section to explain the difference between the proposed
>>>>> connector-level config `scan.filter-push-down.enabled` and the existing
>>>>> query-level config `table.optimizer.source.predicate-pushdown-enabled`?
>>>>> 
>>>>> Best,
>>>>> Leonard
>>>>> 
>>>>>> On Oct 24, 2023, at 4:18 PM, Jiabao Sun <jiabao....@xtransfer.cn.INVALID> wrote:
>>>>>> 
>>>>>> Hi Devs,
>>>>>> 
>>>>>> I would like to start a discussion on FLIP-377: support configuration
>>>>>> to disable filter pushdown for Table/SQL Sources[1].
>>>>>> 
>>>>>> Currently, Flink Table/SQL does not expose fine-grained control for
>>>>>> users to enable or disable filter pushdown.
>>>>>> However, filter pushdown has some side effects, such as additional
>>>>>> computational pressure on external systems.
>>>>>> Moreover, improper queries can lead to issues such as full table scans,
>>>>>> which in turn can impact the stability of external systems.
>>>>>> 
>>>>>> Suppose we have an SQL query with two sources: Kafka and a database.
>>>>>> The database is sensitive to pressure, and we want to configure it to
>>>>>> not perform filter pushdown to the database source.
>>>>>> However, we still want to perform filter pushdown to the Kafka source
>>>>>> to decrease network IO.
>>>>>> 
>>>>>> I propose to support configuration to disable filter pushdown for
>>>>>> Table/SQL sources to let users decide whether to perform filter pushdown.
>>>>>> 
>>>>>> Looking forward to your feedback.
>>>>>> 
>>>>>> [1] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=276105768
>>>>>> 
>>>>>> Best,
>>>>>> Jiabao
>>>>> 
>>>>> 
>>> 
