[ https://issues.apache.org/jira/browse/FLINK-36626?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Eduardo Breijo updated FLINK-36626:
-----------------------------------
    Description:
There is a behavior change I found when migrating from Flink 1.15 to Flink 1.18+ with regard to Flink SQL temporal joins. I haven't been able to pinpoint it, and it causes the query below to output different results.

*Flink SQL Query:*

    WITH assets_setpoint AS (
      SELECT
        asset_id,
        TUMBLE_START(`timestamp`, INTERVAL '1' MINUTES) AS start_timestamp,
        TUMBLE_END(`timestamp`, INTERVAL '1' MINUTES) AS end_timestamp,
        LAST_VALUE(`value`) AS `value`
      FROM asset_readings
      JOIN metric FOR SYSTEM_TIME AS OF `proctime`
        ON metric.metric_id = asset_readings.metric_id
      WHERE metric.alias IN ('Parameters.Main.SupplyAirTempSetpoint')
      GROUP BY TUMBLE(`timestamp`, INTERVAL '1' MINUTES), asset_id
    )
    SELECT
      assets_supply_air_temp.`timestamp`,
      assets_supply_air_temp.asset_id,
      assets_supply_air_temp.`value` - assets_setpoint.`value` AS `value`
    FROM (
      SELECT asset_readings.`timestamp`,
        asset_readings.asset_id,
        asset_readings.`value` AS `value`
      FROM asset_readings
      -- Metrics temporal lookup inner join
      JOIN metric FOR SYSTEM_TIME AS OF `proctime`
        ON metric.metric_id = asset_readings.metric_id
      -- Assets to ignore for this computed metric definition temporal lookup left join
      LEFT JOIN asset_to_ignore_per_computed_metric_definition FOR SYSTEM_TIME AS OF `proctime`
        ON asset_to_ignore_per_computed_metric_definition.computed_metric_definition_id = :computedMetricDefinitionId
        AND asset_to_ignore_per_computed_metric_definition.asset_id = asset_readings.asset_id
      WHERE metric.alias IN ('Sensors.InsideAir.Supply.Temp.SupplyAirTemp')
        -- Filter assets not present in the asset to ignore for this computed metric definition table
        AND asset_to_ignore_per_computed_metric_definition.asset_id IS NULL
    ) AS assets_supply_air_temp
    INNER JOIN assets_setpoint
      ON assets_setpoint.asset_id = assets_supply_air_temp.asset_id
    WHERE assets_supply_air_temp.`timestamp` BETWEEN assets_setpoint.start_timestamp AND assets_setpoint.end_timestamp

*Schema:*

    +-----------+-----------------------------+-------+-----+---------------+--------------------+
    | name      | type                        | null  | key | extras        | watermark          |
    +-----------+-----------------------------+-------+-----+---------------+--------------------+
    | timestamp | TIMESTAMP_LTZ(3) *ROWTIME*  | TRUE  |     |               | SOURCE_WATERMARK() |
    | asset_id  | BIGINT                      | TRUE  |     |               |                    |
    | metric_id | INT                         | TRUE  |     |               |                    |
    | value     | DOUBLE                      | TRUE  |     |               |                    |
    | metadata  | MAP<STRING, STRING>         | TRUE  |     |               |                    |
    | proctime  | TIMESTAMP_LTZ(3) *PROCTIME* | FALSE |     | AS PROCTIME() |                    |
    +-----------+-----------------------------+-------+-----+---------------+--------------------+
    6 rows in set

    +------------------------------------------------+
    | table name                                     |
    +------------------------------------------------+
    | asset_readings                                 |
    | asset_relationship_parent_to_unit              |
    | asset_to_ignore_per_computed_metric_definition |
    | metric                                         |
    +------------------------------------------------+

Results:
 * On Flink 1.15, the difference (assets_supply_air_temp.`value` - assets_setpoint.`value` AS `value`) between assets_supply_air_temp and assets_setpoint is computed correctly for every value of assets_supply_air_temp (note that this subquery does not perform any window-based grouping, so it is just raw data).
 * On Flink 1.18+, for the same query, this difference always results in 0.
 * On Flink 1.18+, updating the query to use a regular join against the metric lookup table (removing FOR SYSTEM_TIME AS OF `proctime`) makes the query output the correct value, but I don't think a regular join is what I need in this case.

I have tried rewriting the query in different forms that still use temporal joins, but I have not found a workaround and I don't know why this is happening. Attached you will find a file with the different SQL variants I have tried, with no luck.

Any help would be appreciated.

> Flink SQL temporal lookup JOINs behavior change from Flink 1.15 to Flink 1.18+
> ------------------------------------------------------------------------------
>
>                 Key: FLINK-36626
>                 URL: https://issues.apache.org/jira/browse/FLINK-36626
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / API
>   Affects Versions: 1.18.1, 1.20.0
>         Environment: AWS Managed Apache Flink
>            Reporter: Eduardo Breijo
>            Priority: Critical
>         Attachments: Flink-SQL-query.txt

--
This message was sent by Atlassian Jira (v8.20.10#820010)