[ 
https://issues.apache.org/jira/browse/FLINK-36626?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Eduardo Breijo updated FLINK-36626:
-----------------------------------
    Summary: Flink SQL temporal lookup JOINs behavior change from Flink 1.15 to 
Flink 1.20  (was: Flink SQL temporal lookup JOINs behavior change from Flink 
1.15 to Flink 1.20+)

> Flink SQL temporal lookup JOINs behavior change from Flink 1.15 to Flink 1.20
> -----------------------------------------------------------------------------
>
>                 Key: FLINK-36626
>                 URL: https://issues.apache.org/jira/browse/FLINK-36626
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / API
>    Affects Versions: 1.18.1, 1.20.0
>         Environment: AWS Managed Apache Flink 
>            Reporter: Eduardo Breijo
>            Priority: Critical
>         Attachments: Tasks Execution Plan.txt
>
>
> I found a behavior change in Flink SQL temporal lookup joins when migrating 
> from Flink 1.15 to Flink 1.18+. I haven't been able to pinpoint the cause, 
> and it makes the query below output different results.
> *Flink SQL Query:*
> WITH assets_setpoint AS (
>   SELECT
>     asset_id,
>     TUMBLE_START(`timestamp`, INTERVAL '1' MINUTES) AS start_timestamp,
>     TUMBLE_END(`timestamp`, INTERVAL '1' MINUTES) AS end_timestamp,
>     LAST_VALUE(`value`) AS `value`
>   FROM asset_readings
>   JOIN metric FOR SYSTEM_TIME AS OF `proctime`
>   ON metric.metric_id = asset_readings.metric_id
>   WHERE metric.alias IN ('Parameters.Main.SupplyAirTempSetpoint')
>   GROUP BY TUMBLE(`timestamp`, INTERVAL '1' MINUTES), asset_id
> ),
> assets_supply_air_temp AS (
>   -- CAST needed to perform regular joins instead of temporal joins in the outer query
>   SELECT CAST(asset_readings.`timestamp` AS TIMESTAMP) AS `timestamp`,
>       asset_readings.asset_id,
>       asset_readings.`value` AS `value`
>   FROM asset_readings
>   -- Metrics temporal lookup inner join
>   JOIN metric FOR SYSTEM_TIME AS OF `proctime`
>   ON metric.metric_id = asset_readings.metric_id
>   -- Assets to ignore for this computed metric definition temporal lookup left join
>   LEFT JOIN asset_to_ignore_per_computed_metric_definition FOR SYSTEM_TIME AS OF `proctime`
>   ON asset_to_ignore_per_computed_metric_definition.computed_metric_definition_id = :computedMetricDefinitionId
>   AND asset_to_ignore_per_computed_metric_definition.asset_id = asset_readings.asset_id
>   WHERE metric.alias IN ('Sensors.InsideAir.Supply.Temp.SupplyAirTemp')
>   -- Filter assets not present in the asset to ignore for this computed metric definition table
>   AND asset_to_ignore_per_computed_metric_definition.asset_id IS NULL
> )
> SELECT
> assets_supply_air_temp.`timestamp`,
> assets_supply_air_temp.asset_id,
> assets_supply_air_temp.`value` - assets_setpoint.`value` AS `value`
> FROM assets_supply_air_temp
> INNER JOIN assets_setpoint
> ON assets_setpoint.asset_id = assets_supply_air_temp.asset_id
> AND assets_supply_air_temp.`timestamp` BETWEEN assets_setpoint.start_timestamp AND assets_setpoint.end_timestamp
>  
> *Schema:*
> +-----------+-----------------------------+-------+-----+---------------+--------------------+
> |      name |                        type |  null | key |        extras |          watermark |
> +-----------+-----------------------------+-------+-----+---------------+--------------------+
> | timestamp |  TIMESTAMP_LTZ(3) *ROWTIME* |  TRUE |     |               | SOURCE_WATERMARK() |
> |  asset_id |                      BIGINT |  TRUE |     |               |                    |
> | metric_id |                         INT |  TRUE |     |               |                    |
> |     value |                      DOUBLE |  TRUE |     |               |                    |
> |  metadata |         MAP<STRING, STRING> |  TRUE |     |               |                    |
> |  proctime | TIMESTAMP_LTZ(3) *PROCTIME* | FALSE |     | AS PROCTIME() |                    |
> +-----------+-----------------------------+-------+-----+---------------+--------------------+
> 6 rows in set
> +------------------------------------------------+
> |                                     table name |
> +------------------------------------------------+
> |                                 asset_readings |
> |              asset_relationship_parent_to_unit |
> | asset_to_ignore_per_computed_metric_definition |
> |                                         metric |
> +------------------------------------------------+
> Results:
>  * On Flink 1.15, the difference (assets_supply_air_temp.`value` - 
> assets_setpoint.`value` AS `value`) between assets_supply_air_temp and 
> assets_setpoint is computed correctly for every value of 
> assets_supply_air_temp (note that this subquery does not perform any 
> window-based grouping, so it is just raw data).
>  * On Flink 1.18+, the same query always produces a difference of 0.
>  * On Flink 1.18+, changing the query to use a regular join against the 
> metric lookup table (removing FOR SYSTEM_TIME AS OF `proctime`) makes the 
> query output the correct value. However, this causes a performance hit, 
> because the asset_readings table is built from a Kinesis data stream and 
> the metric table can change over time.
>  * Please see the attached "Tasks Execution Plan.txt" file for the 
> difference in temporal joins between Flink 1.15 and Flink 1.20.
>  
> I have tried rewriting the query with different forms of temporal join, 
> but I have not found a workaround, and I don't know why this is happening. 
> Attached you will find a file with the different SQL variants I have tried, 
> with no luck.
> Any help would be appreciated.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
