matriv opened a new pull request #17962:
URL: https://github.com/apache/flink/pull/17962


   ## What is the purpose of the change
   
   Implement the missing column uniqueness checking for `TableSourceTable` to avoid losing uniqueness information during join operations.
   
   For the following tables and query:
   ```sql
   CREATE TEMPORARY TABLE passengers (
     passenger_key STRING,
     PRIMARY KEY (passenger_key) NOT ENFORCED
   ) WITH (
     'connector' = 'values'
   );
   
   CREATE TEMPORARY TABLE booking_channels (
     booking_channel_key STRING,
     PRIMARY KEY (booking_channel_key) NOT ENFORCED
   ) WITH (
     'connector' = 'values'
   );
   
   CREATE TEMPORARY TABLE train_activities (
     passenger_key STRING,
     booking_channel_key STRING,
     PRIMARY KEY (booking_channel_key) NOT ENFORCED
   ) WITH (
     'connector' = 'values'
   );
   
   SELECT t.booking_channel_key AS booking_channel_key,
          t.passenger_key AS passenger_key,
          b.booking_channel_key AS booking_channel_key_0
   FROM train_activities t
   LEFT JOIN booking_channels b
   ON t.booking_channel_key = b.booking_channel_key
   LEFT JOIN passengers p
   ON t.passenger_key = p.passenger_key;
   ```
   
   Previously:
   ```
   == Optimized Physical Plan ==
   Calc(select=[booking_channel_key, passenger_key])
   +- Join(joinType=[LeftOuterJoin], where=[=(passenger_key, passenger_key0)], select=[passenger_key, booking_channel_key, passenger_key0], leftInputSpec=[NoUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey])
      :- Exchange(distribution=[hash[passenger_key]])
      :  +- Calc(select=[passenger_key, booking_channel_key])
      :     +- Join(joinType=[LeftOuterJoin], where=[=(booking_channel_key, booking_channel_key0)], select=[passenger_key, booking_channel_key, booking_channel_key0], leftInputSpec=[JoinKeyContainsUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey])
      :        :- Exchange(distribution=[hash[booking_channel_key]])
      :        :  +- TableSourceScan(table=[[default_catalog, default_database, train_activities]], fields=[passenger_key, booking_channel_key])
      :        +- Exchange(distribution=[hash[booking_channel_key]])
      :           +- TableSourceScan(table=[[default_catalog, default_database, booking_channels]], fields=[booking_channel_key])
      +- Exchange(distribution=[hash[passenger_key]])
         +- TableSourceScan(table=[[default_catalog, default_database, passengers]], fields=[passenger_key])
   ```
   
   Now:
   ```
   == Optimized Physical Plan ==
   Calc(select=[booking_channel_key, passenger_key, booking_channel_key0 AS booking_channel_key_0])
   +- Join(joinType=[LeftOuterJoin], where=[=(passenger_key, passenger_key0)], select=[passenger_key, booking_channel_key, booking_channel_key0, passenger_key0], leftInputSpec=[HasUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey])
      :- Exchange(distribution=[hash[passenger_key]])
      :  +- Join(joinType=[LeftOuterJoin], where=[=(booking_channel_key, booking_channel_key0)], select=[passenger_key, booking_channel_key, booking_channel_key0], leftInputSpec=[JoinKeyContainsUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey])
      :     :- Exchange(distribution=[hash[booking_channel_key]])
      :     :  +- TableSourceScan(table=[[default_catalog, default_database, train_activities]], fields=[passenger_key, booking_channel_key])
      :     +- Exchange(distribution=[hash[booking_channel_key]])
      :        +- TableSourceScan(table=[[default_catalog, default_database, booking_channels]], fields=[booking_channel_key])
      +- Exchange(distribution=[hash[passenger_key]])
         +- TableSourceScan(table=[[default_catalog, default_database, passengers]], fields=[passenger_key])
   ```
   
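   The difference lies in the left input of the second join. In the previous plan, a `Calc` between the two joins selects `booking_channel_key` only from the left table, and the second join reports `leftInputSpec=[NoUniqueKey]`. In the new plan, `booking_channel_key` is selected from both the left and the right table (`booking_channel_key0`), and the input is reported as `HasUniqueKey`. Yet `(t.booking_channel_key, t.passenger_key)` is a unique key of the first join's output; previously this information was lost.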
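   The reasoning above can be modeled with a small, hypothetical Java sketch. The class, method, and enum names are illustrative assumptions, not Flink's planner code. It assumes two rules: a LEFT OUTER JOIN whose right-side join key contains a right-side unique key emits exactly one row per left row, so the left side's unique keys survive; and a projection keeps every key whose columns it still selects. Starting from the primary key of `train_activities`, the `Calc` in the previous plan still selects `booking_channel_key`, so the second join's left input would be classified as having a unique key.

   ```java
   import java.util.ArrayList;
   import java.util.List;
   import java.util.Set;

   // Hypothetical, simplified model of unique-key propagation; not Flink's planner code.
   // A unique key is modeled as a set of column names.
   public class JoinUniqueKeySketch {

       // Labels modeled on the leftInputSpec/rightInputSpec values printed in the plans.
       enum InputSpec { JOIN_KEY_CONTAINS_UNIQUE_KEY, HAS_UNIQUE_KEY, NO_UNIQUE_KEY }

       // LEFT OUTER JOIN: if the right join key contains a right-side unique key, each
       // left row matches at most one right row, so the output has one row per left row
       // and the left side's unique keys stay unique. Otherwise, conservatively report none.
       static List<Set<String>> leftJoinUniqueKeys(
               List<Set<String>> leftKeys, List<Set<String>> rightKeys, Set<String> rightJoinKey) {
           boolean atMostOneMatch = rightKeys.stream().anyMatch(rightJoinKey::containsAll);
           return atMostOneMatch ? leftKeys : List.of();
       }

       // A projection keeps every unique key whose columns are all still selected.
       static List<Set<String>> projectUniqueKeys(List<Set<String>> keys, Set<String> selected) {
           List<Set<String>> kept = new ArrayList<>();
           for (Set<String> key : keys) {
               if (selected.containsAll(key)) {
                   kept.add(key);
               }
           }
           return kept;
       }

       // Classify a join input from its unique keys and its join key.
       static InputSpec classify(List<Set<String>> keys, Set<String> joinKey) {
           if (keys.stream().anyMatch(joinKey::containsAll)) {
               return InputSpec.JOIN_KEY_CONTAINS_UNIQUE_KEY;
           }
           return keys.isEmpty() ? InputSpec.NO_UNIQUE_KEY : InputSpec.HAS_UNIQUE_KEY;
       }

       public static void main(String[] args) {
           // train_activities: PRIMARY KEY (booking_channel_key)
           List<Set<String>> left = List.of(Set.of("booking_channel_key"));
           // booking_channels: its key column appears as booking_channel_key0 in the join output
           List<Set<String>> right = List.of(Set.of("booking_channel_key0"));

           List<Set<String>> firstJoin =
                   leftJoinUniqueKeys(left, right, Set.of("booking_channel_key0"));
           // Same columns as the Calc between the two joins in the previous plan
           List<Set<String>> afterCalc =
                   projectUniqueKeys(firstJoin, Set.of("passenger_key", "booking_channel_key"));

           // Left input of the second join, which joins on passenger_key
           System.out.println(classify(afterCalc, Set.of("passenger_key"))); // HAS_UNIQUE_KEY
       }
   }
   ```

   A key on `booking_channel_key` alone also makes the pair `(booking_channel_key, passenger_key)` unique, since any superset of a unique key is itself unique.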
   
   ## Brief change log
   
    - Implement the column uniqueness checking for `TableSourceTable`
    - Add test cases for column uniqueness checking and `getUniqueKey` for `TableSourceTable`
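   As a hypothetical illustration of what column uniqueness checking means for a scanned table with a declared primary key: a set of columns is unique if it contains every column of at least one unique key. The sketch below uses `java.util.BitSet` over column ordinals in place of Calcite's `ImmutableBitSet`, and its class and method names are illustrative assumptions rather than Flink's implementation. It also assumes the planner trusts a `NOT ENFORCED` primary key as a unique key.

   ```java
   import java.util.BitSet;
   import java.util.List;

   // Hypothetical sketch of column-uniqueness checking for a scanned table; not Flink's code.
   public class ColumnUniquenessSketch {

       // Returns true if `columns` contains every column of at least one unique key.
       // Columns are identified by their ordinal in the table schema; with no keys, returns false.
       static boolean areColumnsUnique(List<BitSet> uniqueKeys, BitSet columns) {
           for (BitSet key : uniqueKeys) {
               BitSet missing = (BitSet) key.clone();
               missing.andNot(columns); // key columns that are not in `columns`
               if (missing.isEmpty()) {
                   return true;
               }
           }
           return false;
       }

       // Helper that builds a column set from ordinals.
       static BitSet cols(int... ordinals) {
           BitSet bits = new BitSet();
           for (int i : ordinals) {
               bits.set(i);
           }
           return bits;
       }

       public static void main(String[] args) {
           // train_activities(passenger_key #0, booking_channel_key #1),
           // PRIMARY KEY (booking_channel_key) NOT ENFORCED
           List<BitSet> keys = List.of(cols(1));
           System.out.println(areColumnsUnique(keys, cols(0, 1))); // true: superset of the key
           System.out.println(areColumnsUnique(keys, cols(0)));    // false: passenger_key alone
       }
   }
   ```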
   
   
   ## Verifying this change
   
    - Added tests in FlinkRelMdColumnUniquenessTest
    - Added tests in FlinkRelMdUniqueKeysTest
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): no
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no
     - The serializers: no
     - The runtime per-record code paths (performance sensitive): no
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
     - The S3 file system connector: no
   
   ## Documentation
   
     - Does this pull request introduce a new feature? no
     - If yes, how is the feature documented? not applicable


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.