ahshahid opened a new pull request, #49117:
URL: https://github.com/apache/spark/pull/49117

   … rule
   ### What changes were proposed in this pull request?
   This PR proposes a new algorithm to create and store constraints.
   It tracks aliases in projections, which eliminates the need to 
pessimistically generate all permutations of a given constraint. It is also 
more effective at correctly identifying filters that can be pruned, and it uses 
less memory than the current code. This PR also adds changes to push compound 
filters when the join condition is on multiple attributes and the constraint 
comprises more than one attribute of the join condition.
   
   Presently I have kept the code which retains the old logic of constraint 
management along with the new logic. It is controlled by the SQL conf property 
spark.sql.optimizer.optimizedConstraintPropagation.enabled, which is true by 
default. Once the PR is approved, it would make sense to remove the old code, 
merge the code of ConstraintSet into ExpressionSet, and remove some if-else 
blocks in the Optimizer along with the functions Optimizer.getAllConstraints 
and LogicalPlan.getAllValidConstraints.
   
   The new logic is as follows:
   In the class ConstraintSet, which extends ExpressionSet, we track the 
aliases along with the base constraint.
   Any constraint added to the ConstraintSet is stored in its most 
canonicalized form (i.e. consisting only of those attributes which are part of 
the output set and NOT the Alias's attribute).
   
   For example, consider a hypothetical plan:
   >            Filter( z > 10 && a1 + b2 > 10)
                                    |
   >         Projection1 ( a, a as a1, a as a2, b, b as b1, b as b2, c, a + b as z)
                                    |
   >                 Filter ( a + b > 10)
                                    |
   >              base relation (a, b, c, d)
   
   At the lower Filter node, the constraint set will just have the constraint 
a + b > 10.
   At the Projection1 node, the constraint set will have the
   constraint a + b > 10
   and maintain the following buffers:
   buff1 -> a, a1.attribute, a2.attribute
   buff2 -> b, b1.attribute, b2.attribute
   buff3 -> a + b, z.attribute
   
   The constraint a + b > 10 is already canonicalized in terms of output 
attributes.
   
   Now there are two filters on top of Projection1:
   Filter( z > 10) and Filter ( a1 + b2 > 10)
   
   To prune these two filters, we canonicalize z as a + b (from the data 
maintained in the ConstraintSet) and check whether the underlying set contains 
a + b > 10; since it does, the filter can be pruned.
   For Filter a1 + b2 > 10, we identify the buffers to which a1 and b2 belong 
and replace each with the 0th element of its buffer, which yields a + b > 10, 
so this filter can be pruned as well.
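
   The pruning check described above can be sketched with a toy model. This 
is not the PR's ConstraintSet code: expressions are modelled as nested Python 
tuples, and the names canonicalize and is_redundant are hypothetical helpers 
invented for this illustration.

```python
# Toy expressions: attribute names are strings, literals are ints, and
# operators are tuples such as (">", ("+", "a", "b"), 10).

def canonicalize(expr, buffers):
    """Replace every alias by element 0 of the buffer it belongs to."""
    for buf in buffers:
        if expr in buf[1:]:
            expr = buf[0]
            break
    if isinstance(expr, tuple):
        # Keep the operator, canonicalize the operands.
        return (expr[0],) + tuple(canonicalize(e, buffers) for e in expr[1:])
    return expr

def is_redundant(condition, constraints, buffers):
    """A filter is prunable if its canonical form is already a constraint."""
    return canonicalize(condition, buffers) in constraints

# State at Projection1 in the example plan.
buffers = [
    ["a", "a1", "a2"],
    ["b", "b1", "b2"],
    [("+", "a", "b"), "z"],
]
constraints = {(">", ("+", "a", "b"), 10)}

print(is_redundant((">", "z", 10), constraints, buffers))                # True
print(is_redundant((">", ("+", "a1", "b2"), 10), constraints, buffers))  # True
print(is_redundant((">", "c", 10), constraints, buffers))                # False
```

   Only one constraint is stored, yet both z > 10 and a1 + b2 > 10 are 
recognized as redundant through the alias buffers.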
   
   Now suppose there is another Projection2 ( a1, a2, b1, b2, c, z),
   i.e. attributes a & b are no longer part of the OutputSet,
   
   such that the plan looks like:
   >         Projection2 ( a1,  a2,  b1,  b2, c,  z)
                                    |
   >            Filter( z > 10 && a1 + b2 > 10)
                                    |
   >         Projection1 ( a, a as a1, a as a2, b, b as b1, b as b2, c, a + b as z)
                                    |
   >                 Filter ( a + b > 10)
                                    |
   >              base relation (a, b, c, d)
   
   **The idea is to make a constraint survive as far as possible.**
   
   So in Projection2, the attributes a & b are being eliminated.
   We have a constraint a + b > 10 which depends on them,
   so in the ConstraintSet of Projection2 we update it such that
   constraint a + b > 10 becomes ----> a1 + b1 > 10
   **buff1   ->  a, a1, a2   will become --> a1, a2
   buff2   ->  b, b1, b2   will become --> b1, b2
   buff3   ->  a + b, z   will become --> a1 + b1, z**
   
   This way, by tracking aliases and storing only the canonicalized base 
constraints, we can eliminate the need to pessimistically generate all 
combinations of constraints.
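
   The Projection2 update can be sketched in the same toy tuple model. The 
helper names (attributes, substitute, project) are hypothetical, and the sketch 
covers only the case in the example, where each eliminated attribute still has 
a surviving alias; it is not the PR's implementation.

```python
def attributes(expr):
    """Collect the attribute names (strings) referenced by a toy expression."""
    if isinstance(expr, tuple):
        return set().union(*(attributes(e) for e in expr[1:]))
    return {expr} if isinstance(expr, str) else set()

def substitute(expr, mapping):
    """Rename attributes according to mapping, leaving operators untouched."""
    if isinstance(expr, tuple):
        return (expr[0],) + tuple(substitute(e, mapping) for e in expr[1:])
    return mapping.get(expr, expr)

def project(constraints, buffers, output):
    output = set(output)
    mapping, new_buffers, expr_buffers = {}, [], []
    # Pass 1: attribute buffers. Keep surviving names; the first survivor
    # becomes the new canonical (0th) element.
    for buf in buffers:
        if isinstance(buf[0], tuple):
            expr_buffers.append(buf)
            continue
        kept = [name for name in buf if name in output]
        if kept:
            new_buffers.append(kept)
            mapping[buf[0]] = kept[0]
    # Pass 2: expression buffers, rewritten in terms of the new canonical names.
    for buf in expr_buffers:
        head = substitute(buf[0], mapping)
        members = [head] if attributes(head) <= output else []
        members += [name for name in buf[1:] if name in output]
        if members:
            new_buffers.append(members)
    # Pass 3: rewrite constraints; drop those that still need a lost attribute.
    rewritten = [substitute(c, mapping) for c in constraints]
    return {c for c in rewritten if attributes(c) <= output}, new_buffers

buffers = [["a", "a1", "a2"], ["b", "b1", "b2"], [("+", "a", "b"), "z"]]
constraints = {(">", ("+", "a", "b"), 10)}
new_constraints, new_buffers = project(
    constraints, buffers, ["a1", "a2", "b1", "b2", "c", "z"])
print(new_constraints)  # {('>', ('+', 'a1', 'b1'), 10)}
print(new_buffers)      # [['a1', 'a2'], ['b1', 'b2'], [('+', 'a1', 'b1'), 'z']]
```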
   
   **This PR also eliminates the need for EqualNullSafe constraints for 
aliases.
   It is also able to handle literal boolean constraints.**
   
   _**For inferring new Filter from constraints**_
   we use the following logic:
   New Filter = Filter.constraints -- ( Filter.child.constraints ++ 
Filter.constraints.convertToCanonicalizedIfRequired(Filter.conditions) )
   The idea is that new filter conditions without redundancy can be obtained 
by taking the current node's constraints and removing both the child node's 
constraints and the condition itself, properly canonicalized in terms of the 
base attributes that are part of the output set of the filter node.
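
   As a rough illustration of this set difference, the sketch below uses the 
same toy tuple model. The constraint sets are written out by hand (nothing is 
inferred automatically here), and canonicalize, split_conjuncts and 
infer_new_filters are hypothetical names, not functions from the PR.

```python
def canonicalize(expr, alias_of):
    """Replace each alias attribute by the base attribute it stands for."""
    if isinstance(expr, tuple):
        return (expr[0],) + tuple(canonicalize(e, alias_of) for e in expr[1:])
    return alias_of.get(expr, expr)

def split_conjuncts(cond):
    """Flatten nested ("and", x, y) conditions into a list of conjuncts."""
    if isinstance(cond, tuple) and cond[0] == "and":
        return [c for part in cond[1:] for c in split_conjuncts(part)]
    return [cond]

def infer_new_filters(filter_constraints, child_constraints, condition, alias_of):
    canonical_condition = {
        canonicalize(c, alias_of) for c in split_conjuncts(condition)
    }
    # New Filter = Filter.constraints -- (child.constraints ++ condition)
    return filter_constraints - (child_constraints | canonical_condition)

alias_of = {"a1": "a"}                      # assumed alias: a as a1
child_constraints = {(">", "b", 3), ("isnotnull", "b")}
condition = (">", "a1", 5)                  # the Filter's own condition
# Constraints of the Filter node, listed by hand for this example.
filter_constraints = child_constraints | {(">", "a", 5), ("isnotnull", "a")}

new_filters = infer_new_filters(
    filter_constraints, child_constraints, condition, alias_of)
print(new_filters)  # {('isnotnull', 'a')}
```

   Only the constraint that is neither inherited from the child nor already 
stated by the condition remains as a candidate new filter.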
   
   _**For inferring new filters for Join push down,**_
    we identify all the equality conditions, and the attributes are then 
segregated into the LHS and RHS of the join. To identify filters for push 
down on the RHS side, we get all equality attributes of the LHS side and ask 
the ConstraintSet to return all the constraints whose attributes are a subset 
of the passed LHS attributes. The LHS attributes are appropriately 
canonicalized and the matching constraints identified.
   Once the constraints are known, we can replace their attributes with the 
corresponding RHS attributes. This helps in identifying compound filters 
for push down and not just single attribute filters.
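
   A minimal sketch of this step, again in the toy tuple model: the join 
condition is assumed to be l.a = r.x AND l.b = r.y, and the attribute names, 
the sample constraints and the helper filters_for_other_side are all made up 
for illustration.

```python
def attributes(expr):
    """Collect the attribute names (strings) referenced by a toy expression."""
    if isinstance(expr, tuple):
        return set().union(*(attributes(e) for e in expr[1:]))
    return {expr} if isinstance(expr, str) else set()

def substitute(expr, mapping):
    """Rename attributes according to mapping, leaving operators untouched."""
    if isinstance(expr, tuple):
        return (expr[0],) + tuple(substitute(e, mapping) for e in expr[1:])
    return mapping.get(expr, expr)

def filters_for_other_side(constraints, join_keys):
    """join_keys maps each LHS join attribute to the RHS attribute it equals.

    Returns the LHS constraints that use only join attributes, rewritten in
    terms of the RHS attributes.
    """
    lhs = set(join_keys)
    return {
        substitute(c, join_keys)
        for c in constraints
        if attributes(c) and attributes(c) <= lhs
    }

lhs_constraints = {
    (">", ("+", "a", "b"), 10),   # compound: uses both join attributes
    (">", "a", 1),                # single attribute
    ("<", "c", 5),                # c is not a join attribute, so not pushed
}
join_keys = {"a": "x", "b": "y"}

pushed = filters_for_other_side(lhs_constraints, join_keys)
print(sorted(pushed, key=repr))
```

   Here the compound constraint a + b > 10 becomes x + y > 10 on the RHS, 
alongside the single-attribute filter x > 1.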
   
   _**Below is a description of the changes proposed.**_
   
   ConstraintSet: This is the class which tracks the aliases, stores the 
constraints in canonicalized form, and updates the constraints using available 
aliases if any attribute comprising a constraint is being eliminated. The 
contains method of this class is used for filter pruning. It also identifies 
those constraints which can generate new filters for push down in join nodes.
   All the remaining changes just integrate the new logic while retaining the 
old constraints logic.
   Please note that, for TPCDS plan stability, I had to add new golden files 
for q75. The change as such is trivial:
   previously the pushed filter was generated as
   PushedFilters: [IsNotNull(cr_order_number), IsNotNull(cr_item_sk)]
   and with the change it is
   PushedFilters: [IsNotNull(cr_item_sk), IsNotNull(cr_order_number)]
   
   ### Why are the changes needed?
   
   1. This issue, if not fixed, can cause OutOfMemory errors or unacceptable 
query compilation times.
   Added a test **"plan equivalence with case statements and performance 
comparison with benefit of more than 10x conservatively"  in  
org.apache.spark.sql.catalyst.plans.OptimizedConstraintPropagationSuite**. With 
this PR the compilation _**time is 247 ms vs 13958 ms without the change**_.
   2. It is more effective in filter pruning, as is evident in some of the 
tests in org.apache.spark.sql.catalyst.plans.OptimizedConstraintPropagationSuite 
where the current code is not able to identify the redundant filter.
   3. It is able to generate a better optimized plan for join queries, as it 
can push compound predicates.
   4. The current logic can miss many possible cases of removing redundant 
predicates, as it fails to take into account whether the same attribute or its 
aliases are repeated multiple times in a complex expression.
   5. There are cases where some of the optimizer rules involving removal of 
redundant predicates fail to remove them on the basis of constraint data. In 
some cases the rule works only because previous rules cover for the inaccuracy. 
That the ConstraintPropagation rule, with its removal of redundant filters and 
addition of new inferred filters, depends on how other unrelated earlier 
optimizer rules behave is indicative of issues.
   6. It does away with all the EqualNullSafe constraints, as this logic does 
not need those constraints to be created.
   7. There is at least one test in the existing **ConstraintPropagationSuite** 
which is missing an IsNotNull constraint because the existing constraints code 
incorrectly generated an EqualNullSafe constraint instead of an EqualTo 
constraint. With these changes, the test correctly creates an EqualTo 
constraint, resulting in an inferred IsNotNull constraint.
   8. It does away with the current combinatorial logic of evaluating all the 
constraints, which can cause compilation to run for hours or cause OOM. The 
number of constraints stored is exactly the same as the number of filters 
encountered.
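
   To give a feel for the combinatorial growth mentioned in points 1 and 8, 
the following back-of-the-envelope sketch counts how many equivalent spellings 
of one constraint exist when every attribute can be replaced by any of its 
aliases. It is a counting exercise under that assumption, not a measurement of 
the existing Spark code; all_spellings is a hypothetical helper.

```python
from itertools import product

def all_spellings(names_per_attribute):
    """Every way to pick one name (base or alias) for each attribute."""
    return list(product(*names_per_attribute))

# a + b > 10 with a in {a, a1, a2} and b in {b, b1, b2}.
example = all_spellings([["a", "a1", "a2"], ["b", "b1", "b2"]])
print(len(example))  # 9 spellings of the single constraint a + b > 10

# A constraint over 5 attributes, each with 3 aliases (4 names apiece).
wide = all_spellings([[f"c{i}_{j}" for j in range(4)] for i in range(5)])
print(len(wide))     # 1024

# With alias tracking, one constraint is stored per filter, plus buffers
# whose total size grows linearly: here 5 buffers of 4 names each.
print(5 * 4)         # 20
```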
   
   
   Added a test suite **CompareNewAndOldConstraintsSuite** which, when run on 
current master, will fail, highlighting the issues with current master (in 
terms of functionality) and also showing the perf problem.
   
   ### Does this PR introduce any user-facing change?
   No
   
   ### How was this patch tested?
   Many new tests are added. All existing tests pass cleanly.
   The code has been running in the Workday env. for 1.5 years without any 
issue.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

