Another option would be to add an extra field like:

ARRAY_REMOVE(IFNULL(LAG(zoneIds, 1) OVER (PARTITION BY msisdn ORDER BY ts, row_number), ARRAY[-1]), -1) AS prev_zoneIds,

But that isn't possible either.

On Thu, Feb 20, 2025 at 11:48, Guillermo Ortiz Fernández (<guillermo.ortiz.f...@gmail.com>) wrote:

> I tried to create an additional field based on ROW_NUMBER, but it can't
> be used in the LAG function.
>
> The idea was:
>
> WITH ranked AS (
>     SELECT
>         msisdn,
>         eventTimestamp,
>         zoneIds,
>         ts,
>         TO_TIMESTAMP_LTZ(ROW_NUMBER() OVER (PARTITION BY msisdn ORDER BY ts), 3) AS ts_ranked
>     FROM example_2
> )
> SELECT
>     msisdn,
>     eventTimestamp,
>     ARRAY_REMOVE(IFNULL(zoneIds, ARRAY[-1]), -1) AS zoneIds,
>     ARRAY_REMOVE(IFNULL(LAG(zoneIds, 1) OVER (PARTITION BY msisdn ORDER BY ts_ranked), ARRAY[-1]), -1) AS prev_zoneIds,
>     ts
> FROM ranked;
>
> RANKED subselect:
>
> msisdn     eventTimestamp  zoneIds  ts                       row_num  ts_ranked
> 673944959  1739996380000   [1]      2025-02-19 21:19:40.000  1        1970-01-01 01:00:00.001
> 673944959  1739996380000   [1]      2025-02-19 21:19:40.000  2        1970-01-01 01:00:00.002
>
> But:
>
> [ERROR] Could not execute SQL statement. Reason:
> org.apache.flink.table.api.TableException: OVER windows' ordering in
> stream mode must be defined on a time attribute.
>
> I guess I can only use the "ts" field, since it is the time attribute?
>
> On Thu, Feb 20, 2025 at 10:41, Alessio Bernesco Làvore (<alessio.berne...@gmail.com>) wrote:
>
>> Hello,
>> I don't know if it's strictly related, but using a TIMESTAMP_LTZ vs.
>> TIMESTAMP ts, I ran into some behavior I couldn't explain.
>>
>> Changing ts to TIMESTAMP (not LTZ), the identical query works as
>> expected. So, first of all, we changed all the parsing to TIMESTAMP
>> to rule out this kind of problem.
>>
>> If anyone is interested, I can provide the exact query to reproduce
>> the problem we found.
>>
>> Greetings,
>> Alessio
>>
>> On Thu, Feb 20, 2025 at 10:31 AM Guillermo Ortiz Fernández <
>> guillermo.ortiz.f...@gmail.com> wrote:
>>
>>> I have created a table that reads from a Kafka topic.
>>> What I want to do is order the data by eventTime and add a new field
>>> that represents the previous value, using the LAG function.
>>>
>>> The problem arises when two records have exactly the same eventTime,
>>> which produces "strange" behavior.
>>>
>>> CREATE TABLE example (
>>>     eventTimestamp BIGINT NOT NULL,
>>>     msisdn INT NOT NULL,
>>>     zoneIds ARRAY<INT NOT NULL> NOT NULL,
>>>     ts AS TO_TIMESTAMP_LTZ(eventTimestamp, 3),
>>>     `kafka_offset` BIGINT METADATA FROM 'offset' VIRTUAL,
>>>     WATERMARK FOR ts AS ts
>>> ) WITH (
>>>     'connector' = 'kafka',
>>>     'topic' = 'example-offset',
>>>     'properties.bootstrap.servers' = 'xxxx',
>>>     'properties.auto.offset.reset' = 'latest',
>>>     'scan.startup.mode' = 'latest-offset',
>>>     'key.format' = 'raw',
>>>     'key.fields' = 'msisdn',
>>>     'value.format' = 'avro',
>>>     'value.fields-include' = 'ALL',
>>>     'scan.watermark.idle-timeout' = '1000'
>>> );
>>>
>>> INSERT INTO example (eventTimestamp, msisdn, zoneIds) VALUES
>>>     (1739996380000, 673944959, ARRAY[1]),
>>>     (1739996380000, 673944959, ARRAY[1]);
>>>
>>> SELECT
>>>     msisdn,
>>>     eventTimestamp,
>>>     ARRAY_REMOVE(IFNULL(zoneIds, ARRAY[-1]), -1) AS zoneIds,
>>>     ARRAY_REMOVE(IFNULL(LAG(zoneIds, 1) OVER (PARTITION BY msisdn ORDER BY ts, kafka_offset), ARRAY[-1]), -1) AS prev_zoneIds,
>>>     ts
>>> FROM example;
>>>
>>> Actual result:
>>>
>>> msisdn     eventTimestamp  zoneIds  prev_zoneIds  ts
>>> 673944959  1739996380000   [1]      [1]           2025-02-19 21:19:40.000
>>> 673944959  1739996380000   [1]      [1]           2025-02-19 21:19:40.000
>>>
>>> Expected result:
>>>
>>> msisdn     eventTimestamp  zoneIds  prev_zoneIds  ts
>>> 673944959  1739996380000   [1]      []            2025-02-19 21:19:40.000
>>> 673944959  1739996380000   [1]      [1]           2025-02-19 21:19:40.000
>>>
>>> Is this behavior normal?
>>>
>>> I am trying to achieve the expected behavior by including the offset
>>> metadata in the example table and adding it to the OVER clause of the
>>> LAG function. However, it seems that Flink does not allow ordering by
>>> more than one column:
>>>
>>> ARRAY_REMOVE(IFNULL(LAG(zoneIds, 1) OVER (PARTITION BY msisdn ORDER BY ts, kafka_offset), ARRAY[-1]), -1) AS prev_zoneIds,
>>>
>>> results in:
>>>
>>> [ERROR] Could not execute SQL statement. Reason:
>>> org.apache.flink.table.api.TableException: The window can only be ordered
>>> by a single time column.
>>>
>>> Would you happen to know how to achieve the expected result?
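
One direction that could be tried, sketched below as an untested idea rather than a confirmed fix: since the streaming OVER window accepts only the single time attribute, the ties could be broken before the window sees them, by folding the Kafka offset into the event time itself. This sketch assumes that a computed column may reference the `offset` metadata column (worth verifying against the Flink version in use), and it deliberately shifts the event time of tied records by up to a second, which may or may not be acceptable for the use case:

```sql
-- Untested sketch: derive a tie-free event time by adding a per-record
-- offset-based perturbation (0..999 ms) to the original millisecond
-- timestamp, so ts stays the single time attribute the OVER window
-- requires while no two records of the same key share the same value.
-- Assumption: computed columns may reference metadata columns.
CREATE TABLE example_unique_ts (
    eventTimestamp BIGINT NOT NULL,
    msisdn INT NOT NULL,
    zoneIds ARRAY<INT NOT NULL> NOT NULL,
    `kafka_offset` BIGINT METADATA FROM 'offset' VIRTUAL,
    ts AS TO_TIMESTAMP_LTZ(eventTimestamp + MOD(kafka_offset, 1000), 3),
    WATERMARK FOR ts AS ts
) WITH (
    'connector' = 'kafka',
    'topic' = 'example-offset',
    'properties.bootstrap.servers' = 'xxxx',
    'key.format' = 'raw',
    'key.fields' = 'msisdn',
    'value.format' = 'avro',
    'value.fields-include' = 'ALL'
);

-- LAG then orders by the single time attribute alone:
SELECT
    msisdn,
    eventTimestamp,
    ARRAY_REMOVE(IFNULL(zoneIds, ARRAY[-1]), -1) AS zoneIds,
    ARRAY_REMOVE(IFNULL(LAG(zoneIds, 1) OVER (PARTITION BY msisdn ORDER BY ts), ARRAY[-1]), -1) AS prev_zoneIds,
    ts
FROM example_unique_ts;
```

With `ts` unique per record, the two otherwise-identical rows become distinguishable to the OVER window, which should yield the expected `[]` / `[1]` pair; note, however, that two records whose offsets differ by a multiple of 1000 within the same millisecond would still collide, so this is only a sketch of the tie-breaking idea, not a general solution.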