[ https://issues.apache.org/jira/browse/FLINK-22113?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17355002#comment-17355002 ]
Xu Guangheng commented on FLINK-22113: -------------------------------------- Hello [~godfreyhe] and [~jark], This issue has been labeled as stale-assigned. Can anyone help to review the PR:P? > UniqueKey constraint is lost with multiple sources join in SQL > -------------------------------------------------------------- > > Key: FLINK-22113 > URL: https://issues.apache.org/jira/browse/FLINK-22113 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner > Affects Versions: 1.13.0 > Reporter: Fu Kai > Assignee: Xu Guangheng > Priority: Major > Labels: pull-request-available > Fix For: 1.14.0 > > > Hi team, > > We have a use case to join multiple data sources to generate a continuously > updated view. We defined primary key constraints on all the input sources, and > all the keys are subsets of the join condition. All joins are left joins. > > In our case, the first two inputs can produce *JoinKeyContainsUniqueKey* > input spec, which is good and performant. However, when it comes to the third > input source, it's joined with the intermediate output table of the first two > input tables, and the intermediate table does not carry key constraint > information (although the third source input table does), so it results in a > *NoUniqueKey* input spec. Given that NoUniqueKey inputs have dramatic performance > implications per the [Force Join Unique > Key|http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Force-Join-Unique-Key-td39521.html#a39651] > email thread, we want to know if there is any mitigation solution for this. 
> > Example: > Take the example from > [https://github.com/ververica/flink-sql-cookbook/blob/master/joins/05/05_star_schema.md] > {code:java} > CREATE TEMPORARY TABLE passengers ( > passenger_key STRING, > first_name STRING, > last_name STRING, > update_time TIMESTAMP(3), > PRIMARY KEY (passenger_key) NOT ENFORCED > ) WITH ( > 'connector' = 'upsert-kafka', > 'topic' = 'passengers', > 'properties.bootstrap.servers' = 'localhost:9092', > 'key.format' = 'raw', > 'value.format' = 'json' > ); > CREATE TEMPORARY TABLE stations ( > station_key STRING, > update_time TIMESTAMP(3), > city STRING, > PRIMARY KEY (station_key) NOT ENFORCED > ) WITH ( > 'connector' = 'upsert-kafka', > 'topic' = 'stations', > 'properties.bootstrap.servers' = 'localhost:9092', > 'key.format' = 'raw', > 'value.format' = 'json' > ); > CREATE TEMPORARY TABLE booking_channels ( > booking_channel_key STRING, > update_time TIMESTAMP(3), > channel STRING, > PRIMARY KEY (booking_channel_key) NOT ENFORCED > ) WITH ( > 'connector' = 'upsert-kafka', > 'topic' = 'booking_channels', > 'properties.bootstrap.servers' = 'localhost:9092', > 'key.format' = 'raw', > 'value.format' = 'json' > ); > CREATE TEMPORARY TABLE train_activities ( > scheduled_departure_time TIMESTAMP(3), > actual_departure_date TIMESTAMP(3), > passenger_key STRING, > origin_station_key STRING, > destination_station_key STRING, > booking_channel_key STRING, > PRIMARY KEY (booking_channel_key, origin_station_key, > destination_station_key) NOT ENFORCED > ) WITH ( > 'connector' = 'upsert-kafka', > 'topic' = 'train_activities', > 'properties.bootstrap.servers' = 'localhost:9092', > 'key.format' = 'json', > 'value.format' = 'json' > ); > SELECT > t.actual_departure_date, > p.first_name, > p.last_name, > b.channel, > os.city AS origin_station, > ds.city AS destination_station > FROM train_activities_1 t > LEFT JOIN booking_channels b > ON t.booking_channel_key = b.booking_channel_key > LEFT JOIN passengers p > ON t.passenger_key = p.passenger_key > 
LEFT JOIN stations os > ON t.origin_station_key = os.station_key > LEFT JOIN stations ds > ON t.destination_station_key = ds.station_key > {code} > > The query will generate execution plan of: > > {code:java} > Flink SQL> explain > > SELECT > > t.actual_departure_date, > > p.first_name, > > p.last_name, > > b.channel, > > os.city AS origin_station, > > ds.city AS destination_station > > FROM train_activities_1 t > > LEFT JOIN booking_channels b > > ON t.booking_channel_key = b.booking_channel_key > > LEFT JOIN passengers p > > ON t.passenger_key = p.passenger_key > > LEFT JOIN stations os > > ON t.origin_station_key = os.station_key > > LEFT JOIN stations ds > > ON t.destination_station_key = ds.station_key; > == Abstract Syntax Tree == > LogicalProject(actual_departure_date=[$1], first_name=[$10], last_name=[$11], > channel=[$8], origin_station=[$15], destination_station=[$18]) > +- LogicalJoin(condition=[=($4, $16)], joinType=[left]) > :- LogicalJoin(condition=[=($3, $13)], joinType=[left]) > : :- LogicalJoin(condition=[=($2, $9)], joinType=[left]) > : : :- LogicalJoin(condition=[=($5, $6)], joinType=[left]) > : : : :- LogicalTableScan(table=[[default_catalog, default_database, > train_activities_1]]) > : : : +- LogicalWatermarkAssigner(rowtime=[update_time], > watermark=[-($1, 10000:INTERVAL SECOND)]) > : : : +- LogicalTableScan(table=[[default_catalog, default_database, > booking_channels]]) > : : +- LogicalTableScan(table=[[default_catalog, default_database, > passengers]]) > : +- LogicalTableScan(table=[[default_catalog, default_database, > stations]]) > +- LogicalTableScan(table=[[default_catalog, default_database, stations]]) > == Optimized Physical Plan == > Calc(select=[actual_departure_date, first_name, last_name, channel, city AS > origin_station, city0 AS destination_station]) > +- Join(joinType=[LeftOuterJoin], where=[=(destination_station_key, > station_key)], select=[actual_departure_date, destination_station_key, > channel, first_name, last_name, 
city, station_key, city0], > leftInputSpec=[NoUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey]) > :- Exchange(distribution=[hash[destination_station_key]]) > : +- Calc(select=[actual_departure_date, destination_station_key, > channel, first_name, last_name, city]) > : +- Join(joinType=[LeftOuterJoin], where=[=(origin_station_key, > station_key)], select=[actual_departure_date, origin_station_key, > destination_station_key, channel, first_name, last_name, station_key, city], > leftInputSpec=[NoUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey]) > : :- Exchange(distribution=[hash[origin_station_key]]) > : : +- Calc(select=[actual_departure_date, origin_station_key, > destination_station_key, channel, first_name, last_name]) > : : +- Join(joinType=[LeftOuterJoin], where=[=(passenger_key, > passenger_key0)], select=[actual_departure_date, passenger_key, > origin_station_key, destination_station_key, channel, passenger_key0, > first_name, last_name], leftInputSpec=[NoUniqueKey], > rightInputSpec=[JoinKeyContainsUniqueKey]) > : : :- Exchange(distribution=[hash[passenger_key]]) > : : : +- Calc(select=[actual_departure_date, passenger_key, > origin_station_key, destination_station_key, channel]) > : : : +- Join(joinType=[LeftOuterJoin], > where=[=(booking_channel_key, booking_channel_key0)], > select=[actual_departure_date, passenger_key, origin_station_key, > destination_station_key, booking_channel_key, booking_channel_key0, channel], > leftInputSpec=[HasUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey]) > : : : :- > Exchange(distribution=[hash[booking_channel_key]]) > : : : : +- Calc(select=[actual_departure_date, > passenger_key, origin_station_key, destination_station_key, > booking_channel_key]) > : : : : +- > ChangelogNormalize(key=[booking_channel_key, origin_station_key, > destination_station_key]) > : : : : +- > Exchange(distribution=[hash[booking_channel_key, origin_station_key, > destination_station_key]]) > : : : : +- > 
TableSourceScan(table=[[default_catalog, default_database, > train_activities_1]], fields=[scheduled_departure_time, > actual_departure_date, passenger_key, origin_station_key, > destination_station_key, booking_channel_key]) > : : : +- > Exchange(distribution=[hash[booking_channel_key]]) > : : : +- Calc(select=[booking_channel_key, > channel]) > : : : +- > ChangelogNormalize(key=[booking_channel_key]) > : : : +- > Exchange(distribution=[hash[booking_channel_key]]) > : : : +- > TableSourceScan(table=[[default_catalog, default_database, booking_channels, > watermark=[-($1, 10000:INTERVAL SECOND)]]], fields=[booking_channel_key, > update_time, channel]) > : : +- Exchange(distribution=[hash[passenger_key]]) > : : +- Calc(select=[passenger_key, first_name, last_name]) > : : +- ChangelogNormalize(key=[passenger_key]) > : : +- Exchange(distribution=[hash[passenger_key]]) > : : +- TableSourceScan(table=[[default_catalog, > default_database, passengers]], fields=[passenger_key, first_name, last_name, > update_time]) > : +- Exchange(distribution=[hash[station_key]]) > : +- Calc(select=[station_key, city]) > : +- ChangelogNormalize(key=[station_key]) > : +- Exchange(distribution=[hash[station_key]]) > : +- TableSourceScan(table=[[default_catalog, > default_database, stations]], fields=[station_key, update_time, city]) > +- Exchange(distribution=[hash[station_key]]) > +- Calc(select=[station_key, city]) > +- ChangelogNormalize(key=[station_key]) > +- Exchange(distribution=[hash[station_key]]) > +- TableSourceScan(table=[[default_catalog, default_database, > stations]], fields=[station_key, update_time, city]) > == Optimized Execution Plan == > Calc(select=[actual_departure_date, first_name, last_name, channel, city AS > origin_station, city0 AS destination_station]) > +- Join(joinType=[LeftOuterJoin], where=[(destination_station_key = > station_key)], select=[actual_departure_date, destination_station_key, > channel, first_name, last_name, city, station_key, city0], > 
leftInputSpec=[NoUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey]) > :- Exchange(distribution=[hash[destination_station_key]]) > : +- Calc(select=[actual_departure_date, destination_station_key, > channel, first_name, last_name, city]) > : +- Join(joinType=[LeftOuterJoin], where=[(origin_station_key = > station_key)], select=[actual_departure_date, origin_station_key, > destination_station_key, channel, first_name, last_name, station_key, city], > leftInputSpec=[NoUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey]) > : :- Exchange(distribution=[hash[origin_station_key]]) > : : +- Calc(select=[actual_departure_date, origin_station_key, > destination_station_key, channel, first_name, last_name]) > : : +- Join(joinType=[LeftOuterJoin], where=[(passenger_key = > passenger_key0)], select=[actual_departure_date, passenger_key, > origin_station_key, destination_station_key, channel, passenger_key0, > first_name, last_name], leftInputSpec=[NoUniqueKey], > rightInputSpec=[JoinKeyContainsUniqueKey]) > : : :- Exchange(distribution=[hash[passenger_key]]) > : : : +- Calc(select=[actual_departure_date, passenger_key, > origin_station_key, destination_station_key, channel]) > : : : +- Join(joinType=[LeftOuterJoin], > where=[(booking_channel_key = booking_channel_key0)], > select=[actual_departure_date, passenger_key, origin_station_key, > destination_station_key, booking_channel_key, booking_channel_key0, channel], > leftInputSpec=[HasUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey]) > : : : :- > Exchange(distribution=[hash[booking_channel_key]]) > : : : : +- Calc(select=[actual_departure_date, > passenger_key, origin_station_key, destination_station_key, > booking_channel_key]) > : : : : +- > ChangelogNormalize(key=[booking_channel_key, origin_station_key, > destination_station_key]) > : : : : +- > Exchange(distribution=[hash[booking_channel_key, origin_station_key, > destination_station_key]]) > : : : : +- > TableSourceScan(table=[[default_catalog, 
default_database, > train_activities_1]], fields=[scheduled_departure_time, > actual_departure_date, passenger_key, origin_station_key, > destination_station_key, booking_channel_key]) > : : : +- > Exchange(distribution=[hash[booking_channel_key]]) > : : : +- Calc(select=[booking_channel_key, > channel]) > : : : +- > ChangelogNormalize(key=[booking_channel_key]) > : : : +- > Exchange(distribution=[hash[booking_channel_key]]) > : : : +- > TableSourceScan(table=[[default_catalog, default_database, booking_channels, > watermark=[-($1, 10000:INTERVAL SECOND)]]], fields=[booking_channel_key, > update_time, channel]) > : : +- Exchange(distribution=[hash[passenger_key]]) > : : +- Calc(select=[passenger_key, first_name, last_name]) > : : +- ChangelogNormalize(key=[passenger_key]) > : : +- Exchange(distribution=[hash[passenger_key]]) > : : +- TableSourceScan(table=[[default_catalog, > default_database, passengers]], fields=[passenger_key, first_name, last_name, > update_time]) > : +- Exchange(distribution=[hash[station_key]])(reuse_id=[1]) > : +- Calc(select=[station_key, city]) > : +- ChangelogNormalize(key=[station_key]) > : +- Exchange(distribution=[hash[station_key]]) > : +- TableSourceScan(table=[[default_catalog, > default_database, stations]], fields=[station_key, update_time, city]) > +- Reused(reference_id=[1]) > {code} > > -- This message was sent by Atlassian Jira (v8.3.4#803005)