Eduardo Breijo created FLINK-36626:
--------------------------------------
Summary: Flink SQL JOINs behavior change from Flink 1.15 to Flink 1.18+
Key: FLINK-36626
URL: https://issues.apache.org/jira/browse/FLINK-36626
Project: Flink
Issue Type: Bug
Components: Table SQL / API
Affects Versions: 1.20.0, 1.18.1
Environment: AWS Managed Apache Flink
Reporter: Eduardo Breijo
Attachments: Flink-SQL-query.txt
I found a behavior change in Flink SQL joins when migrating from Flink 1.15 to
Flink 1.18+. I have not been able to pinpoint the cause, and it makes the query
produce different results.
Flink SQL Query:
WITH assets_setpoint AS (
  SELECT
    asset_id,
    TUMBLE_START(`timestamp`, INTERVAL '1' MINUTES) AS start_timestamp,
    TUMBLE_END(`timestamp`, INTERVAL '1' MINUTES) AS end_timestamp,
    LAST_VALUE(`value`) AS `value`
  FROM asset_readings
  JOIN metric FOR SYSTEM_TIME AS OF `proctime`
    ON metric.metric_id = asset_readings.metric_id
  WHERE metric.alias IN ('Parameters.Main.SupplyAirTempSetpoint')
  GROUP BY TUMBLE(`timestamp`, INTERVAL '1' MINUTES), asset_id
)
SELECT
  assets_supply_air_temp.`timestamp`,
  assets_supply_air_temp.asset_id,
  assets_supply_air_temp.`value` - assets_setpoint.`value` AS `value`
FROM (
  SELECT asset_readings.`timestamp`,
    asset_readings.asset_id,
    asset_readings.`value` AS `value`
  FROM asset_readings
  -- Metrics temporal lookup inner join
  JOIN metric FOR SYSTEM_TIME AS OF `proctime`
    ON metric.metric_id = asset_readings.metric_id
  -- Assets to ignore for this computed metric definition temporal lookup left join
  LEFT JOIN asset_to_ignore_per_computed_metric_definition FOR SYSTEM_TIME AS OF `proctime`
    ON asset_to_ignore_per_computed_metric_definition.computed_metric_definition_id = :computedMetricDefinitionId
    AND asset_to_ignore_per_computed_metric_definition.asset_id = asset_readings.asset_id
  WHERE metric.alias IN ('Sensors.InsideAir.Supply.Temp.SupplyAirTemp')
    -- Filter assets not present in the asset to ignore for this computed metric definition table
    AND asset_to_ignore_per_computed_metric_definition.asset_id IS NULL
) AS assets_supply_air_temp
INNER JOIN assets_setpoint
  ON assets_setpoint.asset_id = assets_supply_air_temp.asset_id
WHERE assets_supply_air_temp.`timestamp` BETWEEN assets_setpoint.start_timestamp AND assets_setpoint.end_timestamp
Results:
* On Flink 1.15, the difference (assets_supply_air_temp.`value` -
assets_setpoint.`value` AS `value`) between assets_supply_air_temp and
assets_setpoint is computed correctly for every row of
assets_supply_air_temp. Note that this subquery does not perform any
window-based grouping, so it is just raw data.
* On Flink 1.18+, this difference is always 0.
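The semantics expected from the query can be sketched outside Flink as follows. This is a minimal plain-Python illustration, not Flink code: the sample readings, the function names, and the use of arrival order to stand in for LAST_VALUE are all assumptions made for the example. It leaves out the metric lookup and the ignore-list filter.

```python
WINDOW_MS = 60_000  # one-minute tumbling window, as in the query


def window_bounds(ts_ms):
    """Return (start, end) of the tumbling window containing ts_ms."""
    start = ts_ms - ts_ms % WINDOW_MS
    return start, start + WINDOW_MS


def last_setpoint_per_window(setpoints):
    """Mimic LAST_VALUE grouped by (TUMBLE window, asset_id).

    setpoints: iterable of (asset_id, ts_ms, value) in arrival order
    (assumption: the last row to arrive in a window wins).
    """
    latest = {}
    for asset_id, ts, value in setpoints:
        start, end = window_bounds(ts)
        latest[(asset_id, start, end)] = value
    return latest


def expected_differences(supply_readings, setpoints):
    """Join raw supply readings to setpoint windows and subtract."""
    windows = last_setpoint_per_window(setpoints)
    out = []
    for asset_id, ts, value in supply_readings:
        for (w_asset, start, end), setpoint in windows.items():
            # BETWEEN is inclusive on both ends, as in the WHERE clause.
            if w_asset == asset_id and start <= ts <= end:
                out.append((ts, asset_id, value - setpoint))
    return out


# Hypothetical sample data: (asset_id, timestamp in ms, value)
setpoints = [("a1", 5_000, 20.0), ("a1", 30_000, 21.0)]
supply = [("a1", 10_000, 23.5), ("a1", 40_000, 19.0)]

print(expected_differences(supply, setpoints))
# [(10000, 'a1', 2.5), (40000, 'a1', -2.0)]
```

With this sample data, the result matching the Flink 1.15 behavior is a non-zero difference for each raw reading. The Flink 1.18+ behavior described above would instead give 0 for every row.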
I have tried rewriting the query in several different forms, but I have not
found a workaround and I don't know why this is happening. The attached file
contains the SQL variants I have tried, none of which fixed the problem.
Any help would be appreciated.