[ https://issues.apache.org/jira/browse/FLINK-36626?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Eduardo Breijo updated FLINK-36626:
-----------------------------------
    Summary: Flink SQL temporal lookup JOINs behavior change from Flink 1.15 to Flink 1.20  (was: Flink SQL temporal lookup JOINs behavior change from Flink 1.15 to Flink 1.20+)

> Flink SQL temporal lookup JOINs behavior change from Flink 1.15 to Flink 1.20
> -----------------------------------------------------------------------------
>
>                 Key: FLINK-36626
>                 URL: https://issues.apache.org/jira/browse/FLINK-36626
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / API
>    Affects Versions: 1.18.1, 1.20.0
>         Environment: AWS Managed Apache Flink
>            Reporter: Eduardo Breijo
>            Priority: Critical
>         Attachments: Tasks Execution Plan.txt
>
>
> I found a behavior change in Flink SQL temporal lookup joins when migrating
> from Flink 1.15 to Flink 1.18+. I haven't been able to pinpoint its cause,
> and it makes the query below output different results.
>
> *Flink SQL Query:*
>
> WITH assets_setpoint AS (
>   SELECT
>     asset_id,
>     TUMBLE_START(`timestamp`, INTERVAL '1' MINUTES) AS start_timestamp,
>     TUMBLE_END(`timestamp`, INTERVAL '1' MINUTES) AS end_timestamp,
>     LAST_VALUE(`value`) AS `value`
>   FROM asset_readings
>   JOIN metric FOR SYSTEM_TIME AS OF `proctime`
>     ON metric.metric_id = asset_readings.metric_id
>   WHERE metric.alias IN ('Parameters.Main.SupplyAirTempSetpoint')
>   GROUP BY TUMBLE(`timestamp`, INTERVAL '1' MINUTES), asset_id
> ),
> assets_supply_air_temp AS (
>   -- CAST needed to perform regular joins instead of temporal joins in the outer query
>   SELECT
>     CAST(asset_readings.`timestamp` AS TIMESTAMP) AS `timestamp`,
>     asset_readings.asset_id,
>     asset_readings.`value` AS `value`
>   FROM asset_readings
>   -- Metrics temporal lookup inner join
>   JOIN metric FOR SYSTEM_TIME AS OF `proctime`
>     ON metric.metric_id = asset_readings.metric_id
>   -- Assets to ignore for this computed metric definition: temporal lookup left join
>   LEFT JOIN asset_to_ignore_per_computed_metric_definition FOR SYSTEM_TIME AS OF `proctime`
>     ON asset_to_ignore_per_computed_metric_definition.computed_metric_definition_id = :computedMetricDefinitionId
>     AND asset_to_ignore_per_computed_metric_definition.asset_id = asset_readings.asset_id
>   WHERE metric.alias IN ('Sensors.InsideAir.Supply.Temp.SupplyAirTemp')
>   -- Keep only assets that are not in the assets-to-ignore table for this computed metric definition
>   AND asset_to_ignore_per_computed_metric_definition.asset_id IS NULL
> )
> SELECT
>   assets_supply_air_temp.`timestamp`,
>   assets_supply_air_temp.asset_id,
>   assets_supply_air_temp.`value` - assets_setpoint.`value` AS `value`
> FROM assets_supply_air_temp
> INNER JOIN assets_setpoint
>   ON assets_setpoint.asset_id = assets_supply_air_temp.asset_id
>   AND assets_supply_air_temp.`timestamp` BETWEEN assets_setpoint.start_timestamp AND assets_setpoint.end_timestamp
>
> *Schema:*
>
> +-----------+-----------------------------+-------+-----+---------------+--------------------+
> | name      | type                        | null  | key | extras        | watermark          |
> +-----------+-----------------------------+-------+-----+---------------+--------------------+
> | timestamp | TIMESTAMP_LTZ(3) *ROWTIME*  | TRUE  |     |               | SOURCE_WATERMARK() |
> | asset_id  | BIGINT                      | TRUE  |     |               |                    |
> | metric_id | INT                         | TRUE  |     |               |                    |
> | value     | DOUBLE                      | TRUE  |     |               |                    |
> | metadata  | MAP<STRING, STRING>         | TRUE  |     |               |                    |
> | proctime  | TIMESTAMP_LTZ(3) *PROCTIME* | FALSE |     | AS PROCTIME() |                    |
> +-----------+-----------------------------+-------+-----+---------------+--------------------+
> 6 rows in set
>
> +------------------------------------------------+
> | table name                                     |
> +------------------------------------------------+
> | asset_readings                                 |
> | asset_relationship_parent_to_unit              |
> | asset_to_ignore_per_computed_metric_definition |
> | metric                                         |
> +------------------------------------------------+
>
> Results:
> * On Flink 1.15, the difference (assets_supply_air_temp.`value` -
> assets_setpoint.`value` AS `value`) between assets_supply_air_temp and
> assets_setpoint is computed correctly for every row of
> assets_supply_air_temp (note that this subquery does not perform any
> window-based grouping, so it returns raw data).
> * On Flink 1.18+, the same query always returns 0 for this difference.
> * On Flink 1.18+, changing the query to use a regular join against the
> metric lookup table (removing FOR SYSTEM_TIME AS OF `proctime`) makes the
> query output the correct values. However, this causes a performance hit,
> as the asset_readings table is built from a Kinesis data stream and the
> metric table can change over time.
> * Please see the attached "Tasks Execution Plan.txt" file for the
> difference in temporal joins between Flink 1.15 and Flink 1.20.
>
> I have tried rewriting the query in different ways using temporal joins,
> but I have not found a workaround and I don't know why this is happening.
> Attached you will find a file with the different SQL formats I have tried
> with no luck.
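The regular-join workaround mentioned in the results can be illustrated with a reduced pair of queries. This is a minimal sketch, not the reporter's actual query: it assumes the workaround amounts to dropping FOR SYSTEM_TIME AS OF `proctime` from the join against metric, reuses the table and column names from the report (asset_readings, metric, alias, metric_id), and adds the aliases r and m purely for brevity. The first statement keeps the temporal lookup join used in the original query; the second is the regular-join variant.

```sql
-- Sketch only: table and column names come from the report;
-- the aliases r and m are added for brevity.

-- Temporal lookup join, as used in the original query:
-- each asset_readings row is joined with metric as of its processing time.
SELECT r.asset_id, r.`value`
FROM asset_readings AS r
JOIN metric FOR SYSTEM_TIME AS OF r.`proctime` AS m
  ON m.metric_id = r.metric_id
WHERE m.alias IN ('Sensors.InsideAir.Supply.Temp.SupplyAirTemp');

-- Regular join variant (FOR SYSTEM_TIME AS OF removed), i.e. the kind of
-- change the report describes as producing correct results on Flink 1.18+.
SELECT r.asset_id, r.`value`
FROM asset_readings AS r
JOIN metric AS m
  ON m.metric_id = r.metric_id
WHERE m.alias IN ('Sensors.InsideAir.Supply.Temp.SupplyAirTemp');
```

A regular join in streaming mode is not a per-row lookup: Flink keeps both join inputs in state, which is one reason it can be costlier than a lookup join when one side is an unbounded stream such as asset_readings.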
> Any help would be appreciated.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)