Thanks for the proposal, Jiabao. My two cents below: 1. If I understand correctly, the motivation of the FLIP is mainly to make predicate pushdown optional on SOME of the Sources. If so, intuitively the configuration should be Source specific instead of general. Otherwise, we will end up with general configurations that may not take effect for some of the Source implementations. This violates the basic rule of a configuration - it does what it says, regardless of the implementation. While configuration standardization is usually a good thing, it should not break the basic rules. If we really want to have this general configuration, for the sources this configuration does not apply, they should throw an exception to make it clear that this configuration is not supported. However, that seems ugly.
2. I think the actual motivation of this FLIP is about "how a source should implement predicate pushdown efficiently", not "whether predicate pushdown should be applied to the source." For example, if a source wants to avoid additional computing load in the external system, it can always read the entire record and apply the predicates by itself. However, from the Flink perspective, the predicate pushdown is applied, it is just implemented differently by the source. So the design principle here is that Flink only cares about whether a source supports predicate pushdown or not, it does not care about the implementation efficiency / side effect of the predicates pushdown. It is the Source implementation's responsibility to ensure the predicates pushdown is implemented efficiently and does not impose excessive pressure on the external system. And it is OK to have additional configurations to achieve this goal. Obviously, such configurations will be source specific in this case. 3. Regarding the existing configurations of *table.optimizer.source.predicate-pushdown-enabled. *I am not sure why we need it. Supposedly, if a source implements a SupportsXXXPushDown interface, the optimizer should push the corresponding predicates to the Source. I am not sure in which case this configuration would be used. Any ideas @Jark Wu <imj...@gmail.com>? Thanks, Jiangjie (Becket) Qin On Wed, Oct 25, 2023 at 11:55 PM Jiabao Sun <jiabao....@xtransfer.cn.invalid> wrote: > Thanks Jane for the detailed explanation. > > I think that for users, we should respect conventions over configurations. > Conventions can be default values explicitly specified in configurations, > or they can be behaviors that follow previous versions. > If the same code has different behaviors in different versions, it would > be a very bad thing. > > I agree that for regular users, it is not necessary to understand all the > configurations related to Flink. > By following conventions, they can have a good experience. > > Let's get back to the practical situation and consider it. > > Case 1: > The user is not familiar with the purpose of the > table.optimizer.source.predicate-pushdown-enabled configuration but follows > the convention of allowing predicate pushdown to the source by default. > Just understanding the source.predicate-pushdown-enabled configuration and > performing fine-grained toggle control will work well. > > Case 2: > The user understands the meaning of the > table.optimizer.source.predicate-pushdown-enabled configuration and has set > its value to false. > We have reason to believe that the user understands the meaning of the > predicate pushdown configuration and the intention is to disable predicate > pushdown (rather than whether or not to allow it). > The previous choice of globally disabling it is likely because it couldn't > be disabled on individual sources. > From this perspective, if we provide more fine-grained configuration > support and provide detailed explanations of the configuration behaviors in > the documentation, > users can clearly understand the differences between these two > configurations and use them correctly. > > Also, I don't agree that table.optimizer.source.predicate-pushdown-enabled > = true and source.predicate-pushdown-enabled = false means that the local > configuration overrides the global configuration. > On the contrary, both configurations are functioning correctly. > The optimizer allows predicate pushdown to all sources, but some sources > can reject the filters pushed down by the optimizer. > This is natural, just like different components at different levels are > responsible for different tasks. > > The more serious issue is that if "source.predicate-pushdown-enabled" does > not respect "table.optimizer.source.predicate-pushdown-enabled”, > the "table.optimizer.source.predicate-pushdown-enabled" configuration will > be invalidated. > This means that regardless of whether > "table.optimizer.source.predicate-pushdown-enabled" is set to true or > false, it will have no effect. > > Best, > Jiabao > > > > 2023年10月25日 22:24,Jane Chan <qingyue....@gmail.com> 写道: > > > > Hi Jiabao, > > > > Thanks for the in-depth clarification. Here are my cents > > > > However, "table.optimizer.source.predicate-pushdown-enabled" and > >> "scan.filter-push-down.enabled" are configurations for different > >> components(optimizer and source operator). > >> > > > > We cannot assume that every user would be interested in understanding the > > internal components of Flink, such as the optimizer or connectors, and > the > > specific configurations associated with each component. Instead, users > > might be more concerned about knowing which configuration enables or > > disables the filter push-down feature for all source connectors, and > which > > parameter provides the flexibility to override this behavior for a single > > source if needed. > > > > So, from this perspective, I am inclined to divide these two parameters > > based on the scope of their impact from the user's perspective (i.e. > > global-level or operator-level), rather than categorizing them based on > the > > component hierarchy from a developer's point of view. Therefore, based on > > this premise, it is intuitive and natural for users to > > understand fine-grained configuration options can override global > > configurations. > > > > Additionally, if "scan.filter-push-down.enabled" doesn't respect to > >> "table.optimizer.source.predicate-pushdown-enabled" and the default > value > >> of "scan.filter-push-down.enabled" is defined as true, > >> it means that just modifying > >> "table.optimizer.source.predicate-pushdown-enabled" as false will have > no > >> effect, and filter pushdown will still be performed. > >> > >> If we define the default value of "scan.filter-push-down.enabled" as > >> false, it would introduce a difference in behavior compared to the > previous > >> version. > >> > > > > <1>If I understand correctly, "scan.filter-push-down.enabled" is a > > connector option, which means the only way to configure it is to > explicitly > > specify it in DDL (no matter whether disable or enable), and the SET > > command is not applicable, so I think it's natural to still respect > user's > > specification here. Otherwise, users might be more confused about why the > > DDL does not work as expected, and the reason is just because some other > > "optimizer" configuration is set to a different value. > > > > <2> From the implementation side, I am inclined to keep the parameter's > > priority consistent for all conditions. > > > > Let "global" denote "table.optimizer.source.predicate-pushdown-enabled", > > and let "per-source" denote "scan.filter-push-down.enabled" for specific > > source T, the following Truth table (based on the current design) > > indicates the inconsistent behavior for "per-source override global". > > > > .------------.---------------.------------------- > > ----.-------------------------------------. > > | global | per-source | push-down for T | per-source override global | > > > :-----------+--------------+-----------------------+------------------------------------: > > | true | false | false | Y > > | > > > :-----------+--------------+-----------------------+------------------------------------: > > | false | true | false | N > > | > > > .------------.---------------.-----------------------.-------------------------------------. > > > > Best, > > Jane > > > > On Wed, Oct 25, 2023 at 6:22 PM Jiabao Sun <jiabao....@xtransfer.cn > .invalid> > > wrote: > > > >> Thanks Benchao for the feedback. > >> > >> I understand that the configuration of global parallelism and task > >> parallelism is at different granularities but with the same > configuration. > >> However, "table.optimizer.source.predicate-pushdown-enabled" and > >> "scan.filter-push-down.enabled" are configurations for different > >> components(optimizer and source operator). > >> > >> From a user's perspective, there are two scenarios: > >> > >> 1. Disabling all filter pushdown > >> In this case, setting > "table.optimizer.source.predicate-pushdown-enabled" > >> to false is sufficient to meet the requirement. > >> > >> 2. Disabling filter pushdown for specific sources > >> In this scenario, there is no need to adjust the value of > >> "table.optimizer.source.predicate-pushdown-enabled". > >> Instead, the focus should be on the configuration of > >> "scan.filter-push-down.enabled" to meet the requirement. > >> In this case, users do not need to set > >> "table.optimizer.source.predicate-pushdown-enabled" to false and > manually > >> enable filter pushdown for specific sources. > >> > >> Additionally, if "scan.filter-push-down.enabled" doesn't respect to > >> "table.optimizer.source.predicate-pushdown-enabled" and the default > value > >> of "scan.filter-push-down.enabled" is defined as true, > >> it means that just modifying > >> "table.optimizer.source.predicate-pushdown-enabled" as false will have > no > >> effect, and filter pushdown will still be performed. > >> > >> If we define the default value of "scan.filter-push-down.enabled" as > >> false, it would introduce a difference in behavior compared to the > previous > >> version. > >> The same SQL query that could successfully push down filters in the old > >> version but would no longer do so after the upgrade. > >> > >> Best, > >> Jiabao > >> > >> > >>> 2023年10月25日 17:10,Benchao Li <libenc...@apache.org> 写道: > >>> > >>> Thanks Jiabao for the detailed explanations, that helps a lot, I > >>> understand your rationale now. > >>> > >>> Correct me if I'm wrong. Your perspective is from "developer", which > >>> means there is an optimizer and connector component, and if we want to > >>> enable this feature (pushing filters down into connectors), you must > >>> enable it firstly in optimizer, and only then connector has the chance > >>> to decide to use it or not. > >>> > >>> My perspective is from "user" that (Why a user should care about the > >>> difference of optimizer/connector) , this is a feature, and has two > >>> way to control it, one way is to config it job-level, the other one is > >>> in table properties. What a user expects is that they can control a > >>> feature in a tiered way, that setting it per job, and then > >>> fine-grained tune it per table. > >>> > >>> This is some kind of similar to other concepts, such as parallelism, > >>> users can set a job level default parallelism, and then fine-grained > >>> tune it per operator. There may be more such debate in the future > >>> e.g., we can have a job level config about adding key-by before lookup > >>> join, and also a hint/table property way to fine-grained control it > >>> per lookup operator. Hence we'd better find a unified way for all > >>> those similar kind of features. > >>> > >>> Jiabao Sun <jiabao....@xtransfer.cn.invalid> 于2023年10月25日周三 15:27写道: > >>>> > >>>> Thanks Jane for further explanation. > >>>> > >>>> These two configurations correspond to different levels. > >> "scan.filter-push-down.enabled" does not make > >> "table.optimizer.source.predicate" invalid. > >>>> The planner will still push down predicates to all sources. > >>>> Whether filter pushdown is allowed or not is determined by the > specific > >> source's "scan.filter-push-down.enabled" configuration. > >>>> > >>>> However, "table.optimizer.source.predicate" does directly affect > >> "scan.filter-push-down.enabled”. > >>>> When the planner disables predicate pushdown, the source-level filter > >> pushdown will also not be executed, even if the source allows filter > >> pushdown. > >>>> > >>>> Whatever, in point 1 and 2, our expectation is consistent. > >>>> For the 3rd point, I still think that the planner-level configuration > >> takes precedence over the source-level configuration. > >>>> It may seem counterintuitive when we globally disable predicate > >> pushdown but allow filter pushdown at the source level. > >>>> > >>>> Best, > >>>> Jiabao > >>>> > >>>> > >>>> > >>>>> 2023年10月25日 14:35,Jane Chan <qingyue....@gmail.com> 写道: > >>>>> > >>>>> Hi Jiabao, > >>>>> > >>>>> Thanks for clarifying this. While by "scan.filter-push-down.enabled > >> takes a > >>>>> higher priority" I meant that this value should be respected whenever > >> it is > >>>>> set explicitly. > >>>>> > >>>>> The conclusion that > >>>>> > >>>>> 2. "table.optimizer.source.predicate" = "true" and > >>>>>> "scan.filter-push-down.enabled" = "false" > >>>>>> Allow the planner to perform predicate pushdown, but individual > >> sources do > >>>>>> not enable filter pushdown. > >>>>>> > >>>>> > >>>>> This indicates that the option "scan.filter-push-down.enabled = > false" > >> for > >>>>> an individual source connector does indeed override the global-level > >>>>> planner settings to make a difference. And thus "has a higher > >> priority". > >>>>> > >>>>> While for > >>>>> > >>>>> 3. "table.optimizer.source.predicate" = "false" > >>>>>> Predicate pushdown is not allowed for the planner. > >>>>>> Regardless of the value of the "scan.filter-push-down.enabled" > >>>>>> configuration, filter pushdown is disabled. > >>>>>> In this scenario, the behavior remains consistent with the old > >> version as > >>>>>> well. > >>>>>> > >>>>> > >>>>> I still think "scan.filter-push-down.enabled" should also be > respected > >> if > >>>>> it is enabled for individual connectors. WDYT? > >>>>> > >>>>> Best, > >>>>> Jane > >>>>> > >>>>> On Wed, Oct 25, 2023 at 1:27 PM Jiabao Sun <jiabao....@xtransfer.cn > >> .invalid> > >>>>> wrote: > >>>>> > >>>>>> Thanks Benchao for the feedback. > >>>>>> > >>>>>> For the current proposal, we recommend keeping the default value of > >>>>>> "table.optimizer.source.predicate" as true, > >>>>>> and setting the the default value of newly introduced option > >>>>>> "scan.filter-push-down.enabled" to true as well. > >>>>>> > >>>>>> The main purpose of doing this is to maintain consistency with > >> previous > >>>>>> versions, as whether to perform > >>>>>> filter pushdown in the old version solely depends on the > >>>>>> "table.optimizer.source.predicate" option. > >>>>>> That means by default, as long as a TableSource implements the > >>>>>> SupportsFilterPushDown interface, filter pushdown is allowed. > >>>>>> And it seems that we don't have much benefit in changing the default > >> value > >>>>>> of "table.optimizer.source.predicate" to false. > >>>>>> > >>>>>> Regarding the priority of these two configurations, I believe that > >>>>>> "table.optimizer.source.predicate" > >>>>>> takes precedence over "scan.filter-push-down.enabled" and it > exhibits > >> the > >>>>>> following behavior. > >>>>>> > >>>>>> 1. "table.optimizer.source.predicate" = "true" and > >>>>>> "scan.filter-push-down.enabled" = "true" > >>>>>> This is the default behavior, allowing filter pushdown for sources. > >>>>>> > >>>>>> 2. "table.optimizer.source.predicate" = "true" and > >>>>>> "scan.filter-push-down.enabled" = "false" > >>>>>> Allow the planner to perform predicate pushdown, but individual > >> sources do > >>>>>> not enable filter pushdown. > >>>>>> > >>>>>> 3. "table.optimizer.source.predicate" = "false" > >>>>>> Predicate pushdown is not allowed for the planner. > >>>>>> Regardless of the value of the "scan.filter-push-down.enabled" > >>>>>> configuration, filter pushdown is disabled. > >>>>>> In this scenario, the behavior remains consistent with the old > >> version as > >>>>>> well. > >>>>>> > >>>>>> > >>>>>> From an implementation perspective, setting the priority of > >>>>>> "scan.filter-push-down.enabled" higher than > >>>>>> "table.optimizer.source.predicate" is difficult to achieve now. > >>>>>> Because the PushFilterIntoSourceScanRuleBase at the planner level > >> takes > >>>>>> precedence over the source-level FilterPushDownSpec. > >>>>>> Only when the PushFilterIntoSourceScanRuleBase is enabled, will the > >>>>>> Source-level filter pushdown be performed. > >>>>>> > >>>>>> Additionally, in my opinion, there doesn't seem to be much benefit > in > >>>>>> setting a higher priority for "scan.filter-push-down.enabled". > >>>>>> It may instead affect compatibility and increase implementation > >> complexity. > >>>>>> > >>>>>> WDYT? > >>>>>> > >>>>>> Best, > >>>>>> Jiabao > >>>>>> > >>>>>> > >>>>>>> 2023年10月25日 11:56,Benchao Li <libenc...@apache.org> 写道: > >>>>>>> > >>>>>>> I agree with Jane that fine-grained configurations should have > higher > >>>>>>> priority than job level configurations. > >>>>>>> > >>>>>>> For current proposal, we can achieve that: > >>>>>>> - Set "table.optimizer.source.predicate" = "true" to enable by > >>>>>>> default, and set ""scan.filter-push-down.enabled" = "false" to > >> disable > >>>>>>> it per table source > >>>>>>> - Set "table.optimizer.source.predicate" = "false" to disable by > >>>>>>> default, and set ""scan.filter-push-down.enabled" = "true" to > enable > >>>>>>> it per table source > >>>>>>> > >>>>>>> Jane Chan <qingyue....@gmail.com> 于2023年10月24日周二 23:55写道: > >>>>>>>> > >>>>>>>>> > >>>>>>>>> I believe that the configuration > "table.optimizer.source.predicate" > >>>>>> has a > >>>>>>>>> higher priority at the planner level than the configuration at > the > >>>>>> source > >>>>>>>>> level, > >>>>>>>>> and it seems easy to implement now. > >>>>>>>>> > >>>>>>>> > >>>>>>>> Correct me if I'm wrong, but I think the fine-grained > configuration > >>>>>>>> "scan.filter-push-down.enabled" should have a higher priority > >> because > >>>>>> the > >>>>>>>> default value of "table.optimizer.source.predicate" is true. As a > >>>>>> result, > >>>>>>>> turning off filter push-down for a specific source will not take > >> effect > >>>>>>>> unless the default value of "table.optimizer.source.predicate" is > >>>>>> changed > >>>>>>>> to false, or, alternatively, let users manually set > >>>>>>>> "table.optimizer.source.predicate" to false first and then > >> selectively > >>>>>>>> enable filter push-down for the desired sources, which is less > >>>>>> intuitive. > >>>>>>>> WDYT? > >>>>>>>> > >>>>>>>> Best, > >>>>>>>> Jane > >>>>>>>> > >>>>>>>> On Tue, Oct 24, 2023 at 6:05 PM Jiabao Sun < > jiabao....@xtransfer.cn > >>>>>> .invalid> > >>>>>>>> wrote: > >>>>>>>> > >>>>>>>>> Thanks Jane, > >>>>>>>>> > >>>>>>>>> I believe that the configuration > "table.optimizer.source.predicate" > >>>>>> has a > >>>>>>>>> higher priority at the planner level than the configuration at > the > >>>>>> source > >>>>>>>>> level, > >>>>>>>>> and it seems easy to implement now. > >>>>>>>>> > >>>>>>>>> Best, > >>>>>>>>> Jiabao > >>>>>>>>> > >>>>>>>>> > >>>>>>>>>> 2023年10月24日 17:36,Jane Chan <qingyue....@gmail.com> 写道: > >>>>>>>>>> > >>>>>>>>>> Hi Jiabao, > >>>>>>>>>> > >>>>>>>>>> Thanks for driving this discussion. I have a small question that > >> will > >>>>>>>>>> "scan.filter-push-down.enabled" take precedence over > >>>>>>>>>> "table.optimizer.source.predicate" when the two parameters might > >>>>>> conflict > >>>>>>>>>> each other? > >>>>>>>>>> > >>>>>>>>>> Best, > >>>>>>>>>> Jane > >>>>>>>>>> > >>>>>>>>>> On Tue, Oct 24, 2023 at 5:05 PM Jiabao Sun < > >> jiabao....@xtransfer.cn > >>>>>>>>> .invalid> > >>>>>>>>>> wrote: > >>>>>>>>>> > >>>>>>>>>>> Thanks Jark, > >>>>>>>>>>> > >>>>>>>>>>> If we only add configuration without adding the > >> enableFilterPushDown > >>>>>>>>>>> method in the SupportsFilterPushDown interface, > >>>>>>>>>>> each connector would have to handle the same logic in the > >>>>>> applyFilters > >>>>>>>>>>> method to determine whether filter pushdown is needed. > >>>>>>>>>>> This would increase complexity and violate the original > behavior > >> of > >>>>>> the > >>>>>>>>>>> applyFilters method. > >>>>>>>>>>> > >>>>>>>>>>> On the contrary, we only need to pass the configuration > >> parameter in > >>>>>> the > >>>>>>>>>>> newly added enableFilterPushDown method > >>>>>>>>>>> to decide whether to perform predicate pushdown. > >>>>>>>>>>> > >>>>>>>>>>> I think this approach would be clearer and simpler. > >>>>>>>>>>> WDYT? > >>>>>>>>>>> > >>>>>>>>>>> Best, > >>>>>>>>>>> Jiabao > >>>>>>>>>>> > >>>>>>>>>>> > >>>>>>>>>>>> 2023年10月24日 16:58,Jark Wu <imj...@gmail.com> 写道: > >>>>>>>>>>>> > >>>>>>>>>>>> Hi JIabao, > >>>>>>>>>>>> > >>>>>>>>>>>> I think the current interface can already satisfy your > >> requirements. > >>>>>>>>>>>> The connector can reject all the filters by returning the > input > >>>>>> filters > >>>>>>>>>>>> as `Result#remainingFilters`. > >>>>>>>>>>>> > >>>>>>>>>>>> So maybe we don't need to introduce a new method to disable > >>>>>>>>>>>> pushdown, but just introduce an option for the specific > >> connector. > >>>>>>>>>>>> > >>>>>>>>>>>> Best, > >>>>>>>>>>>> Jark > >>>>>>>>>>>> > >>>>>>>>>>>> On Tue, 24 Oct 2023 at 16:38, Leonard Xu <xbjt...@gmail.com> > >> wrote: > >>>>>>>>>>>> > >>>>>>>>>>>>> Thanks @Jiabao for kicking off this discussion. > >>>>>>>>>>>>> > >>>>>>>>>>>>> Could you add a section to explain the difference between > >> proposed > >>>>>>>>>>>>> connector level config `scan.filter-push-down.enabled` and > >> existing > >>>>>>>>>>> query > >>>>>>>>>>>>> level config > >> `table.optimizer.source.predicate-pushdown-enabled` ? > >>>>>>>>>>>>> > >>>>>>>>>>>>> Best, > >>>>>>>>>>>>> Leonard > >>>>>>>>>>>>> > >>>>>>>>>>>>>> 2023年10月24日 下午4:18,Jiabao Sun <jiabao....@xtransfer.cn > >> .INVALID> > >>>>>> 写道: > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> Hi Devs, > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> I would like to start a discussion on FLIP-377: support > >>>>>> configuration > >>>>>>>>>>> to > >>>>>>>>>>>>> disable filter pushdown for Table/SQL Sources[1]. > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> Currently, Flink Table/SQL does not expose fine-grained > >> control > >>>>>> for > >>>>>>>>>>>>> users to enable or disable filter pushdown. > >>>>>>>>>>>>>> However, filter pushdown has some side effects, such as > >> additional > >>>>>>>>>>>>> computational pressure on external systems. > >>>>>>>>>>>>>> Moreover, Improper queries can lead to issues such as full > >> table > >>>>>>>>> scans, > >>>>>>>>>>>>> which in turn can impact the stability of external systems. > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> Suppose we have an SQL query with two sources: Kafka and a > >>>>>> database. > >>>>>>>>>>>>>> The database is sensitive to pressure, and we want to > >> configure > >>>>>> it to > >>>>>>>>>>>>> not perform filter pushdown to the database source. > >>>>>>>>>>>>>> However, we still want to perform filter pushdown to the > Kafka > >>>>>> source > >>>>>>>>>>> to > >>>>>>>>>>>>> decrease network IO. > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> I propose to support configuration to disable filter push > >> down for > >>>>>>>>>>>>> Table/SQL sources to let user decide whether to perform > filter > >>>>>>>>> pushdown. > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> Looking forward to your feedback. > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> [1] > >>>>>>>>>>>>> > >>>>>>>>>>> > >>>>>>>>> > >>>>>> > >> > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=276105768 > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> Best, > >>>>>>>>>>>>>> Jiabao > >>>>>>>>>>>>> > >>>>>>>>>>>>> > >>>>>>>>>>> > >>>>>>>>>>> > >>>>>>>>> > >>>>>>>>> > >>>>>>> > >>>>>>> > >>>>>>> > >>>>>>> -- > >>>>>>> > >>>>>>> Best, > >>>>>>> Benchao Li > >>>>>> > >>>>>> > >>>> > >>> > >>> > >>> -- > >>> > >>> Best, > >>> Benchao Li > >> > >> > >