I tried to create an additional field based on ROW_NUMBER, but it can't be used in the LAG function.
The idea was:

WITH ranked AS (
    SELECT
        msisdn,
        eventTimestamp,
        zoneIds,
        ts,
        TO_TIMESTAMP_LTZ(ROW_NUMBER() OVER (PARTITION BY msisdn ORDER BY ts), 3) AS ts_ranked
    FROM example_2
)
SELECT
    msisdn,
    eventTimestamp,
    ARRAY_REMOVE(IFNULL(zoneIds, ARRAY[-1]), -1) AS zoneIds,
    ARRAY_REMOVE(IFNULL(LAG(zoneIds, 1) OVER (PARTITION BY msisdn ORDER BY ts_ranked),
        ARRAY[-1]), -1) AS prev_zoneIds,
    ts
FROM ranked;

The ranked subselect returns:

msisdn     eventTimestamp  zoneIds  ts                       row_num  ts_ranked
673944959  1739996380000   [1]      2025-02-19 21:19:40.000  1        1970-01-01 01:00:00.001
673944959  1739996380000   [1]      2025-02-19 21:19:40.000  2        1970-01-01 01:00:00.002

But the outer query fails with:

[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.TableException: OVER windows' ordering in stream mode must be defined on a time attribute.

I guess I can only use the ts field, since it is the table's time attribute?
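If a bounded backfill were ever acceptable, one direction could be to run the same query in batch mode, where the stream-mode restriction on OVER ordering does not apply and kafka_offset can act as the tie-breaker. This is only a sketch under that assumption (a bounded re-read of the topic rather than the continuous job), not something verified against this exact setup:

-- sketch only: assumes the topic can be read as a bounded source for a backfill
SET 'execution.runtime-mode' = 'batch';

SELECT
    msisdn,
    eventTimestamp,
    ARRAY_REMOVE(IFNULL(zoneIds, ARRAY[-1]), -1) AS zoneIds,
    ARRAY_REMOVE(IFNULL(LAG(zoneIds, 1) OVER (PARTITION BY msisdn
        ORDER BY ts, kafka_offset), ARRAY[-1]), -1) AS prev_zoneIds,
    ts
FROM example;

In batch mode the planner accepts ordering by more than one column, so the offset breaks the tie deterministically; it does not help the continuous streaming job, of course.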
On Thu, Feb 20, 2025 at 10:41, Alessio Bernesco Làvore (<
alessio.berne...@gmail.com>) wrote:

> Hello,
> I don't know if it's strictly related, but using a TIMESTAMP_LTZ vs. a
> TIMESTAMP ts column I ran into some behavior I couldn't explain.
>
> Changing ts to TIMESTAMP (not LTZ), the identical query works as
> expected. So, first of all, we changed all the parsing to TIMESTAMP to
> rule out this kind of problem.
>
> If anyone is interested, I can provide the exact query to reproduce the
> problem we found.
>
> Greetings,
> Alessio
>
> On Thu, Feb 20, 2025 at 10:31 AM Guillermo Ortiz Fernández <
> guillermo.ortiz.f...@gmail.com> wrote:
>
>> I have created a table that reads from a Kafka topic. What I want to do
>> is order the data by event time and add a new field that holds the
>> previous value, using the LAG function.
>>
>> The problem arises when two records have exactly the same event time,
>> which produces "strange" behavior.
>>
>> CREATE TABLE example (
>>     eventTimestamp BIGINT NOT NULL,
>>     msisdn INT NOT NULL,
>>     zoneIds ARRAY<INT NOT NULL> NOT NULL,
>>     ts AS TO_TIMESTAMP_LTZ(eventTimestamp, 3),
>>     `kafka_offset` BIGINT METADATA FROM 'offset' VIRTUAL,
>>     WATERMARK FOR ts AS ts
>> ) WITH (
>>     'connector' = 'kafka',
>>     'topic' = 'example-offset',
>>     'properties.bootstrap.servers' = 'xxxx',
>>     'properties.auto.offset.reset' = 'latest',
>>     'scan.startup.mode' = 'latest-offset',
>>     'key.format' = 'raw',
>>     'key.fields' = 'msisdn',
>>     'value.format' = 'avro',
>>     'value.fields-include' = 'ALL',
>>     'scan.watermark.idle-timeout' = '1000'
>> );
>>
>> INSERT INTO example (eventTimestamp, msisdn, zoneIds) VALUES
>>     (1739996380000, 673944959, ARRAY[1]),
>>     (1739996380000, 673944959, ARRAY[1]);
>>
>> SELECT
>>     msisdn,
>>     eventTimestamp,
>>     ARRAY_REMOVE(IFNULL(zoneIds, ARRAY[-1]), -1) AS zoneIds,
>>     ARRAY_REMOVE(IFNULL(LAG(zoneIds, 1) OVER (PARTITION BY msisdn ORDER BY ts),
>>         ARRAY[-1]), -1) AS prev_zoneIds,
>>     ts
>> FROM example;
>>
>> Actual result:
>>
>> msisdn     eventTimestamp  zoneIds  prev_zoneIds  ts
>> 673944959  1739996380000   [1]      [1]           2025-02-19 21:19:40.000
>> 673944959  1739996380000   [1]      [1]           2025-02-19 21:19:40.000
>>
>> Expected result:
>>
>> msisdn     eventTimestamp  zoneIds  prev_zoneIds  ts
>> 673944959  1739996380000   [1]      []            2025-02-19 21:19:40.000
>> 673944959  1739996380000   [1]      [1]           2025-02-19 21:19:40.000
>>
>> Is this behavior normal?
>>
>> I am trying to achieve the expected behavior by including the offset
>> metadata in the example table and adding it to the OVER clause of the
>> LAG function. However, it seems that Flink does not allow ordering by
>> more than one column:
>>
>> ARRAY_REMOVE(IFNULL(LAG(zoneIds, 1) OVER (PARTITION BY msisdn
>>     ORDER BY ts, kafka_offset), ARRAY[-1]), -1) AS prev_zoneIds,
>>
>> results in:
>>
>> [ERROR] Could not execute SQL statement. Reason:
>> org.apache.flink.table.api.TableException: The window can only be ordered by
>> a single time column.
>>
>> Would you happen to know how to achieve the expected result?
>>
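As a side note, if the producer could guarantee distinct eventTimestamp values per msisdn (just an assumption here, not something we have today), ordering by ts alone would already be deterministic. A minimal sketch with the second record shifted by one millisecond:

INSERT INTO example (eventTimestamp, msisdn, zoneIds) VALUES
    (1739996380000, 673944959, ARRAY[1]),
    (1739996380001, 673944959, ARRAY[1]);  -- 1 ms later, so the ts values no longer tie

SELECT
    msisdn,
    ARRAY_REMOVE(IFNULL(zoneIds, ARRAY[-1]), -1) AS zoneIds,
    ARRAY_REMOVE(IFNULL(LAG(zoneIds, 1) OVER (PARTITION BY msisdn ORDER BY ts),
        ARRAY[-1]), -1) AS prev_zoneIds,
    ts
FROM example;

-- first row:  prev_zoneIds = []   (LAG returns NULL, mapped to the empty array)
-- second row: prev_zoneIds = [1]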