(Sorry, I pressed send too early) Thanks for the help @zhengyunhon...@gmail.com.
I agree on changing the API as little as possible, and on eventually simplifying projection pushdown with nested fields as well. As for the code itself, I am currently trying to leverage FieldReferenceExpression to also handle nested fields for filter push down. Where I am struggling to make progress is this: once the filters are pushed to the table source, PushFilterIntoSourceScanRuleBase#resolveFiltersAndCreateTableSourceTable converts the List<ResolvedExpression> (in this case FieldReferenceExpression) to a List<RexNode>. If you have some pointers for that, please let me know.

Thanks.

Regards
Venkata krishnan

On Sun, Aug 6, 2023 at 10:23 PM Venkatakrishnan Sowrirajan <vsowr...@asu.edu> wrote:

> Thanks @zhengyunhon...@gmail.com
> Regards
> Venkata krishnan
>
> On Sun, Aug 6, 2023 at 6:16 PM yh z <zhengyunhon...@gmail.com> wrote:
>
>> Hi, Venkatakrishnan,
>> I think this is a very useful feature. I have been focusing on the development
>> of the flink-table-planner module recently, so if you need some help, I can
>> assist you in completing the development of some sub-tasks or code review.
>>
>> Returning to the design itself, I think it's necessary to modify
>> FieldReferenceExpression or re-implement a NestedFieldReferenceExpression.
>> As for modifying the interface of SupportsProjectionPushDown, I think we need
>> to make some trade-offs. As a connector developer, the stability of the
>> interface is very important. If there are no unresolved bugs, I personally do
>> not recommend modifying the interface. However, when I first read the code of
>> SupportsProjectionPushDown, the design of int[][] was very confusing for me,
>> and it took me a long time to understand it by running specific unit tests.
>> Therefore, in terms of the design of this interface and the consistency
>> between different interfaces, there is indeed room for improvement.
>>
>> Thanks,
>> Yunhong Zheng (Swuferhong)
>>
>> On Thu, Aug 3, 2023 at 07:44, Becket Qin <becket....@gmail.com> wrote:
>>
>> > Hi Jark,
>> >
>> > If the FieldReferenceExpression contains an int[] to support a nested field
>> > reference, List<FieldReferenceExpression> (or FieldReferenceExpression[])
>> > and int[][] are actually equivalent. If we are designing this from scratch,
>> > personally I prefer using List<FieldReferenceExpression> for consistency,
>> > i.e. always resolving everything to expressions for users. Projection is a
>> > simpler case, but should not be a special case. This avoids doing the same
>> > thing in different ways, which is also confusing for users. To me, the
>> > int[][] format would become a kind of technical debt after we extend the
>> > FieldReferenceExpression. Although we don't have to address it right away
>> > in the same FLIP, this kind of debt accumulates over time and makes the
>> > project harder to learn and maintain. So, personally I prefer to address
>> > these technical debts as soon as possible.
>> >
>> > Thanks,
>> >
>> > Jiangjie (Becket) Qin
>> >
>> > On Wed, Aug 2, 2023 at 8:19 PM Jark Wu <imj...@gmail.com> wrote:
>> >
>> > > Hi,
>> > >
>> > > I agree with Becket that we may need to extend FieldReferenceExpression to
>> > > support nested field access (or maybe a new NestedFieldReferenceExpression).
>> > > But I have some concerns about evolving
>> > > SupportsProjectionPushDown.applyProjection.
>> > > A projection is much simpler than a filter expression: it only needs to
>> > > represent the field indexes.
>> > > If we evolve `applyProjection` to accept
>> > > `List<FieldReferenceExpression> projectedFields`,
>> > > users have to convert the `List<FieldReferenceExpression>` back to int[][],
>> > > which is an overhead for users.
>> > > Field indexes (int[][]) are required to project schemas with the
>> > > utility org.apache.flink.table.connector.Projection.
>> > >
>> > > Best,
>> > > Jark
>> > >
>> > > On Wed, 2 Aug 2023 at 07:40, Venkatakrishnan Sowrirajan <vsowr...@asu.edu> wrote:
>> > >
>> > > > Thanks Becket for the suggestion. That makes sense. Let me try it out and
>> > > > get back to you.
>> > > >
>> > > > Regards
>> > > > Venkata krishnan
>> > > >
>> > > > On Tue, Aug 1, 2023 at 9:04 AM Becket Qin <becket....@gmail.com> wrote:
>> > > >
>> > > > > This is a very useful feature in practice.
>> > > > >
>> > > > > It looks to me that the key issue here is that Flink ResolvedExpression
>> > > > > does not have the necessary abstraction for nested field access. So the
>> > > > > Calcite RexFieldAccess does not have a counterpart in the
>> > > > > ResolvedExpression. The FieldReferenceExpression only supports direct
>> > > > > access to the fields, not nested access.
>> > > > >
>> > > > > Theoretically speaking, this nested field reference is also required by
>> > > > > projection pushdown. However, we addressed that by using an int[][] in
>> > > > > the SupportsProjectionPushDown interface. Maybe we can do the following:
>> > > > >
>> > > > > 1. Extend the FieldReferenceExpression to include an int[] for nested
>> > > > > field access,
>> > > > > 2. By doing (1),
>> > > > > SupportsFilterPushDown#applyFilters(List<ResolvedExpression>) can
>> > > > > support nested field access.
>> > > > > 3. Evolve the SupportsProjectionPushDown.applyProjection(int[][]
>> > > > > projectedFields, DataType producedDataType) to
>> > > > > applyProjection(List<FieldReferenceExpression> projectedFields,
>> > > > > DataType producedDataType)
>> > > > >
>> > > > > This will need a FLIP.
>> > > > >
>> > > > > Thanks,
>> > > > >
>> > > > > Jiangjie (Becket) Qin
>> > > > >
>> > > > > On Tue, Aug 1, 2023 at 11:42 PM Venkatakrishnan Sowrirajan <vsowr...@asu.edu> wrote:
>> > > > >
>> > > > > > Thanks for the response. Looking forward to your pointers. In the
>> > > > > > meanwhile, let me figure out how we can implement it. Will keep you
>> > > > > > posted.
>> > > > > >
>> > > > > > On Mon, Jul 31, 2023, 11:43 PM liu ron <ron9....@gmail.com> wrote:
>> > > > > >
>> > > > > > > Hi, Venkata
>> > > > > > >
>> > > > > > > Thanks for reporting this issue. Currently, Flink doesn't support
>> > > > > > > nested filter pushdown. I also think that this optimization would be
>> > > > > > > useful, especially for jobs that need to read a lot of data from
>> > > > > > > Parquet or ORC files. We didn't move forward with this for priority
>> > > > > > > reasons.
>> > > > > > >
>> > > > > > > Regarding your three questions, I will respond to you later, after my
>> > > > > > > on-call is finished, because I need to dive into the source code.
>> > > > > > > About your commit, I don't think it's the right solution because
>> > > > > > > FieldReferenceExpression doesn't currently support nested field
>> > > > > > > filter pushdown; maybe we need to extend it.
>> > > > > > >
>> > > > > > > You can also look further into reasonable solutions, which we'll
>> > > > > > > discuss further later on.
>> > > > > > >
>> > > > > > > Best,
>> > > > > > > Ron
>> > > > > > >
>> > > > > > > On Sat, Jul 29, 2023 at 03:31, Venkatakrishnan Sowrirajan <vsowr...@asu.edu> wrote:
>> > > > > > >
>> > > > > > > > Hi all,
>> > > > > > > >
>> > > > > > > > Currently, I am working on adding support for nested fields filter
>> > > > > > > > push down. In our use case running Flink on Batch, we found nested
>> > > > > > > > fields filter push down is key - without it, jobs are significantly
>> > > > > > > > slower. Note: Spark SQL supports nested fields filter push down.
>> > > > > > > >
>> > > > > > > > While debugging the code using IcebergTableSource as the table
>> > > > > > > > source, I narrowed down the issue to missing support for
>> > > > > > > > RexNodeExtractor#RexNodeToExpressionConverter#visitFieldAccess.
>> > > > > > > > As part of fixing it, I made changes by returning an
>> > > > > > > > Option(FieldReferenceExpression) with appropriate reference to the
>> > > > > > > > parent index and the child index for the nested field with the data
>> > > > > > > > type info.
>> > > > > > > >
>> > > > > > > > But this new ResolvedExpression cannot be converted to RexNode, which
>> > > > > > > > happens in PushFilterIntoSourceScanRuleBase
>> > > > > > > > <https://urldefense.com/v3/__https://github.com/apache/flink/blob/3f63e03e83144e9857834f8db1895637d2aa218a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoSourceScanRuleBase.java*L104__;Iw!!IKRxdwAv5BmarQ!fNgxcul8ZGwkNE9ygOeVGlWlU6m_MLMXf4A3S3oQu9LBzYTPF90pZ7uXSGMr-5dFmzRn37-e9Q5cMnVs$>.
>> > > > > > > >
>> > > > > > > > A few questions:
>> > > > > > > >
>> > > > > > > > 1. Does FieldReferenceExpression support nested fields currently, or
>> > > > > > > > should it be extended to support nested fields? I couldn't figure
>> > > > > > > > this out from the PushProjectIntoTableScanRule that supports nested
>> > > > > > > > column projection push down.
>> > > > > > > > 2. ExpressionConverter
>> > > > > > > > <https://urldefense.com/v3/__https://github.com/apache/flink/blob/3f63e03e83144e9857834f8db1895637d2aa218a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/expressions/converter/ExpressionConverter.java*L197__;Iw!!IKRxdwAv5BmarQ!fNgxcul8ZGwkNE9ygOeVGlWlU6m_MLMXf4A3S3oQu9LBzYTPF90pZ7uXSGMr-5dFmzRn37-e9Z6jnkJm$>
>> > > > > > > > converts ResolvedExpression -> RexNode, but the new
>> > > > > > > > FieldReferenceExpression with the nested field cannot be converted
>> > > > > > > > to RexNode. This is why the answer to the 1st question is key.
>> > > > > > > > 3. Is there anything else that I'm missing here? Or is there an even
>> > > > > > > > easier way to add support for nested fields filter push down?
>> > > > > > > >
>> > > > > > > > Partially working changes - Commit
>> > > > > > > > <https://urldefense.com/v3/__https://github.com/venkata91/flink/commit/00cdf34ecf9be3ba669a97baaed4b69b85cd26f9__;!!IKRxdwAv5BmarQ!fNgxcul8ZGwkNE9ygOeVGlWlU6m_MLMXf4A3S3oQu9LBzYTPF90pZ7uXSGMr-5dFmzRn37-e9XeOjJ_a$>
>> > > > > > > > Please feel free to leave a comment directly in the commit.
>> > > > > > > >
>> > > > > > > > Any pointers here would be much appreciated! Thanks in advance.
>> > > > > > > >
>> > > > > > > > Disclaimer: Relatively new to Flink code base especially Table
>> > > > > > > > planner :-).
>> > > > > > > >
>> > > > > > > > Regards
>> > > > > > > > Venkata krishnan
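
To make the int[][] versus List<FieldReferenceExpression> point from the thread concrete, here is a minimal sketch in plain Java with no Flink dependencies. It only illustrates the idea that a reference carrying an int[] index path holds the same information as one row of int[][]. NestedFieldRef, Field, exampleSchema and the helper methods are hypothetical names made up for this illustration; they are not Flink classes and not a proposal for the eventual design.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class NestedFieldPaths {

    /** Hypothetical nested-aware field reference: a dotted name plus an index path. */
    public static final class NestedFieldRef {
        public final String name;
        public final int[] indexPath;

        public NestedFieldRef(String name, int[] indexPath) {
            this.name = name;
            this.indexPath = indexPath.clone();
        }
    }

    /** Hypothetical schema node: a field name plus child fields (empty for leaves). */
    public static final class Field {
        public final String name;
        public final List<Field> children;

        public Field(String name, Field... children) {
            this.name = name;
            this.children = Arrays.asList(children);
        }
    }

    /** Example schema: id, user ROW<name, address ROW<city, zip>>. */
    public static List<Field> exampleSchema() {
        Field address = new Field("address", new Field("city"), new Field("zip"));
        return Arrays.asList(new Field("id"), new Field("user", new Field("name"), address));
    }

    /** Walks one index path through the schema and builds the matching reference. */
    public static NestedFieldRef resolve(List<Field> topLevel, int[] indexPath) {
        if (indexPath.length == 0) {
            throw new IllegalArgumentException("index path must not be empty");
        }
        List<Field> level = topLevel;
        StringBuilder name = new StringBuilder();
        for (int i = 0; i < indexPath.length; i++) {
            Field field = level.get(indexPath[i]); // throws if the index is out of range
            if (i > 0) {
                name.append('.');
            }
            name.append(field.name);
            level = field.children;
        }
        return new NestedFieldRef(name.toString(), indexPath);
    }

    /** int[][] -> list of references: one reference per projected path. */
    public static List<NestedFieldRef> fromIndexPaths(List<Field> topLevel, int[][] paths) {
        List<NestedFieldRef> refs = new ArrayList<>();
        for (int[] path : paths) {
            refs.add(resolve(topLevel, path));
        }
        return refs;
    }

    /** List of references -> int[][]: the conversion a connector would need. */
    public static int[][] toIndexPaths(List<NestedFieldRef> refs) {
        int[][] paths = new int[refs.size()][];
        for (int i = 0; i < refs.size(); i++) {
            paths[i] = refs.get(i).indexPath.clone();
        }
        return paths;
    }

    public static void main(String[] args) {
        int[][] projected = {{0}, {1, 1, 0}}; // id and user.address.city
        for (NestedFieldRef ref : fromIndexPaths(exampleSchema(), projected)) {
            System.out.println(ref.name + " -> " + Arrays.toString(ref.indexPath));
        }
    }
}
```

In this sketch the round trip between the two forms loses nothing, which is the sense in which they are equivalent; the conversion back to int[][] is the per-connector overhead Jark mentions.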