matriv opened a new pull request #17962: URL: https://github.com/apache/flink/pull/17962
## What is the purpose of the change

Implement the missing column uniqueness checking for TableSourceTable to avoid losing uniqueness information during join operations.

For:

```
CREATE TEMPORARY TABLE passengers (
  passenger_key STRING,
  PRIMARY KEY (passenger_key) NOT ENFORCED
) WITH (
  'connector' = 'values'
);

CREATE TEMPORARY TABLE booking_channels (
  booking_channel_key STRING,
  PRIMARY KEY (booking_channel_key) NOT ENFORCED
) WITH (
  'connector' = 'values'
);

CREATE TEMPORARY TABLE train_activities (
  passenger_key STRING,
  booking_channel_key STRING,
  PRIMARY KEY (booking_channel_key) NOT ENFORCED
) WITH (
  'connector' = 'values'
);

SELECT
  t.booking_channel_key as booking_channel_key,
  t.passenger_key as passenger_key,
  b.booking_channel_key as booking_channel_key_0
FROM train_activities t
LEFT JOIN booking_channels b
  ON t.booking_channel_key = b.booking_channel_key
LEFT JOIN passengers p
  on t.passenger_key = p.passenger_key;
```

Previously:

```
== Optimized Physical Plan ==
Calc(select=[booking_channel_key, passenger_key])
+- Join(joinType=[LeftOuterJoin], where=[=(passenger_key, passenger_key0)], select=[passenger_key, booking_channel_key, passenger_key0], leftInputSpec=[NoUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey])
   :- Exchange(distribution=[hash[passenger_key]])
   :  +- Calc(select=[passenger_key, booking_channel_key])
   :     +- Join(joinType=[LeftOuterJoin], where=[=(booking_channel_key, booking_channel_key0)], select=[passenger_key, booking_channel_key, booking_channel_key0], leftInputSpec=[JoinKeyContainsUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey])
   :        :- Exchange(distribution=[hash[booking_channel_key]])
   :        :  +- TableSourceScan(table=[[default_catalog, default_database, train_activities]], fields=[passenger_key, booking_channel_key])
   :        +- Exchange(distribution=[hash[booking_channel_key]])
   :           +- TableSourceScan(table=[[default_catalog, default_database, booking_channels]], fields=[booking_channel_key])
   +- Exchange(distribution=[hash[passenger_key]])
      +- TableSourceScan(table=[[default_catalog, default_database, passengers]], fields=[passenger_key])
```

Now:

```
== Optimized Physical Plan ==
Calc(select=[booking_channel_key, passenger_key, booking_channel_key0 AS booking_channel_key_0])
+- Join(joinType=[LeftOuterJoin], where=[=(passenger_key, passenger_key0)], select=[passenger_key, booking_channel_key, booking_channel_key0, passenger_key0], leftInputSpec=[HasUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey])
   :- Exchange(distribution=[hash[passenger_key]])
   :  +- Join(joinType=[LeftOuterJoin], where=[=(booking_channel_key, booking_channel_key0)], select=[passenger_key, booking_channel_key, booking_channel_key0], leftInputSpec=[JoinKeyContainsUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey])
   :     :- Exchange(distribution=[hash[booking_channel_key]])
   :     :  +- TableSourceScan(table=[[default_catalog, default_database, train_activities]], fields=[passenger_key, booking_channel_key])
   :     +- Exchange(distribution=[hash[booking_channel_key]])
   :        +- TableSourceScan(table=[[default_catalog, default_database, booking_channels]], fields=[booking_channel_key])
   +- Exchange(distribution=[hash[passenger_key]])
      +- TableSourceScan(table=[[default_catalog, default_database, passengers]], fields=[passenger_key])
```

The difference is that the first case selects booking_channel_key from both the left and the right table, while the second case selects booking_channel_key only from the left table. As a result, one plan has a unique key and the other does not. `t.booking_channel_key, t.passenger_key` is the unique key of the first join's output, but previously this information was lost.

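
As a rough illustration of why the left input of the second join should keep a unique key, the reasoning can be modeled in a few lines of Python. This is a simplified sketch, not the planner's implementation: the function names `left_join_unique_keys` and `input_spec` are hypothetical, column renaming such as `booking_channel_key0` is ignored, and only the three input spec labels are taken from the plans above.

```python
from typing import FrozenSet, Set

Key = FrozenSet[str]


def left_join_unique_keys(
    left_keys: Set[Key], right_keys: Set[Key], right_join_cols: Set[str]
) -> Set[Key]:
    """Unique keys of `left LEFT JOIN right` in a simplified model (assumption).

    If the right-side join columns contain a unique key of the right input,
    each left row matches at most one right row, so every unique key of the
    left input stays unique in the join output. Otherwise this sketch
    conservatively reports no keys.
    """
    right_is_unique = any(key <= right_join_cols for key in right_keys)
    return set(left_keys) if right_is_unique else set()


def input_spec(unique_keys: Set[Key], join_cols: Set[str]) -> str:
    """Label a join input the way the plans above do (hypothetical helper)."""
    if not unique_keys:
        return "NoUniqueKey"
    if any(key <= join_cols for key in unique_keys):
        return "JoinKeyContainsUniqueKey"
    return "HasUniqueKey"


# Primary keys declared in the DDL of the example.
train_activities = {frozenset({"booking_channel_key"})}
booking_channels = {frozenset({"booking_channel_key"})}
passengers = {frozenset({"passenger_key"})}

# First join: train_activities LEFT JOIN booking_channels ON booking_channel_key.
first_join_keys = left_join_unique_keys(
    train_activities, booking_channels, {"booking_channel_key"}
)

# Left input of the second join, which is keyed on passenger_key.
print(input_spec(first_join_keys, {"passenger_key"}))
# If the scan reports no unique keys, the information is lost downstream.
print(input_spec(set(), {"passenger_key"}))
```

In this model the first call corresponds to the `leftInputSpec` in the "Now" plan and the second to the one in the "Previously" plan, under the assumption that the missing table source metadata surfaced as an empty key set.
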
## Brief change log

- Implement the column uniqueness checking for TableSourceTable
- Add some test cases of uniqueness checking and getUniqueKey for TableSourceTable

## Verifying this change

- Added tests in FlinkRelMdColumnUniquenessTest
- Added tests in FlinkRelMdUniqueKeysTest

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): no
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no
- The serializers: no
- The runtime per-record code paths (performance sensitive): no
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
- The S3 file system connector: no

## Documentation

- Does this pull request introduce a new feature? no
- If yes, how is the feature documented? not applicable

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at: us...@infra.apache.org