Another option I tried:

WITH ranked AS (
    SELECT *
    FROM (
        SELECT
            msisdn,
            eventTimestamp,
            zoneIds,
            ts,
            ROW_NUMBER() OVER (PARTITION BY msisdn, ts ORDER BY ts) AS rownum
        FROM example
    )
    WHERE rownum = 1
)
SELECT
    msisdn,
    eventTimestamp,
    ARRAY_REMOVE(IFNULL(zoneIds, ARRAY[-1]), -1) AS zoneIds,
    ARRAY_REMOVE(IFNULL(LAG(zoneIds, 1) OVER (PARTITION BY msisdn ORDER BY ts), ARRAY[-1]), -1) AS prev_zoneIds,
    ts
FROM ranked;
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.TableException: StreamPhysicalOverAggregate doesn't support consuming update and delete changes which is produced by node Deduplicate(keep=[FirstRow], key=[msisdn, ts], order=[ROWTIME])

On Thu, Feb 20, 2025 at 12:00, Guillermo Ortiz Fernández <guillermo.ortiz.f...@gmail.com> wrote:

> Another option would be to add an extra field, like:
>
>     ARRAY_REMOVE(IFNULL(LAG(zoneIds, 1) OVER (PARTITION BY msisdn ORDER BY ts, row_number), ARRAY[-1]), -1) AS prev_zoneIds,
>
> But it isn't possible either.
>
> On Thu, Feb 20, 2025 at 11:48, Guillermo Ortiz Fernández <guillermo.ortiz.f...@gmail.com> wrote:
>
>> I tried to create an additional field based on ROW_NUMBER, but it can't be used in the LAG function.
>>
>> The idea was:
>>
>> WITH ranked AS (
>>     SELECT
>>         msisdn,
>>         eventTimestamp,
>>         zoneIds,
>>         ts,
>>         TO_TIMESTAMP_LTZ(ROW_NUMBER() OVER (PARTITION BY msisdn ORDER BY ts), 3) AS ts_ranked
>>     FROM example_2
>> )
>> SELECT
>>     msisdn,
>>     eventTimestamp,
>>     ARRAY_REMOVE(IFNULL(zoneIds, ARRAY[-1]), -1) AS zoneIds,
>>     ARRAY_REMOVE(IFNULL(LAG(zoneIds, 1) OVER (PARTITION BY msisdn ORDER BY ts_ranked), ARRAY[-1]), -1) AS prev_zoneIds,
>>     ts
>> FROM ranked;
>>
>> RANKED subselect:
>>
>> msisdn     eventTimestamp  zoneIds  ts                       row_num  ts_ranked
>> 673944959  1739996380000   [1]      2025-02-19 21:19:40.000  1        1970-01-01 01:00:00.001
>> 673944959  1739996380000   [1]      2025-02-19 21:19:40.000  2        1970-01-01 01:00:00.002
>>
>> But:
>>
>> [ERROR] Could not execute SQL statement. Reason:
>> org.apache.flink.table.api.TableException: OVER windows' ordering in stream mode must be defined on a time attribute.
>>
>> I guess I can only use the "ts" field, because it is the time attribute?
>>
>> On Thu, Feb 20, 2025 at 10:41, Alessio Bernesco Làvore <alessio.berne...@gmail.com> wrote:
>>
>>> Hello,
>>> don't know if it's strictly related, but using a TIMESTAMP_LTZ vs.
>>> TIMESTAMP ts I had some behavior I couldn't explain.
>>>
>>> Changing ts to TIMESTAMP (not LTZ), the identical query works as expected.
>>> So, first of all, we changed all the parsing to TIMESTAMP to rule out this kind of problem.
>>>
>>> If someone is interested, I can provide the exact query to reproduce the problem we figured out.
>>>
>>> Greetings,
>>> Alessio
>>>
>>> On Thu, Feb 20, 2025 at 10:31 AM Guillermo Ortiz Fernández <guillermo.ortiz.f...@gmail.com> wrote:
>>>
>>>> I have created a table that reads from a Kafka topic. What I want to do is order the data by event time and add a new field that represents the previous value using the LAG function.
>>>>
>>>> The problem arises when two records have exactly the same eventTime, which produces "strange" behavior.
>>>>
>>>> CREATE TABLE example (
>>>>     eventTimestamp BIGINT NOT NULL,
>>>>     msisdn INT NOT NULL,
>>>>     zoneIds ARRAY<INT NOT NULL> NOT NULL,
>>>>     ts AS TO_TIMESTAMP_LTZ(eventTimestamp, 3),
>>>>     `kafka_offset` BIGINT METADATA FROM 'offset' VIRTUAL,
>>>>     WATERMARK FOR ts AS ts
>>>> ) WITH (
>>>>     'connector' = 'kafka',
>>>>     'topic' = 'example-offset',
>>>>     'properties.bootstrap.servers' = 'xxxx',
>>>>     'properties.auto.offset.reset' = 'latest',
>>>>     'scan.startup.mode' = 'latest-offset',
>>>>     'key.format' = 'raw',
>>>>     'key.fields' = 'msisdn',
>>>>     'value.format' = 'avro',
>>>>     'value.fields-include' = 'ALL',
>>>>     'scan.watermark.idle-timeout' = '1000'
>>>> );
>>>>
>>>> INSERT INTO example (eventTimestamp, msisdn, zoneIds) VALUES
>>>>     (1739996380000, 673944959, ARRAY[1]),
>>>>     (1739996380000, 673944959, ARRAY[1]);
>>>>
>>>> SELECT
>>>>     msisdn,
>>>>     eventTimestamp,
>>>>     ARRAY_REMOVE(IFNULL(zoneIds, ARRAY[-1]), -1) AS zoneIds,
>>>>     ARRAY_REMOVE(IFNULL(LAG(zoneIds, 1) OVER (PARTITION BY msisdn ORDER BY ts), ARRAY[-1]), -1) AS prev_zoneIds,
>>>>     ts
>>>> FROM example;
>>>>
>>>> Actual Result:
>>>>
>>>> msisdn     eventTimestamp  zoneIds  prev_zoneIds  ts
>>>> 673944959  1739996380000   [1]      [1]           2025-02-19 21:19:40.000
>>>> 673944959  1739996380000   [1]      [1]           2025-02-19 21:19:40.000
>>>>
>>>> Expected Result:
>>>>
>>>> msisdn     eventTimestamp  zoneIds  prev_zoneIds  ts
>>>> 673944959  1739996380000   [1]      []            2025-02-19 21:19:40.000
>>>> 673944959  1739996380000   [1]      [1]           2025-02-19 21:19:40.000
>>>>
>>>> Is this behavior normal?
>>>>
>>>> I am trying to achieve the expected behavior by including the offset metadata in the example table and adding it to the OVER clause in the LAG function. However, it seems that Flink does not allow ordering by more than one column:
>>>>
>>>> ARRAY_REMOVE(IFNULL(LAG(zoneIds, 1) OVER (PARTITION BY msisdn ORDER BY ts, kafka_offset), ARRAY[-1]), -1) AS prev_zoneIds,
>>>>
>>>> Results in:
>>>>
>>>> [ERROR] Could not execute SQL statement. Reason:
>>>> org.apache.flink.table.api.TableException: The window can only be ordered by a single time column.
>>>>
>>>> Would you happen to know how to achieve the expected result?
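[Editorial note] Every workaround in this thread fails for the same underlying reason: in stream mode the OVER window must be ordered by exactly one time attribute, and any ranking or deduplication done beforehand either loses the time-attribute property or produces a changelog the OVER aggregate cannot consume. One direction that sidesteps both errors is to make `ts` itself unique already in the DDL, by folding part of the Kafka offset into the computed column. This is an untested sketch, not a confirmed solution: the table name `example_unique_ts` is invented for illustration, it assumes the planner accepts a computed column that references a metadata column, and it shifts event time by up to ~1 second, which may or may not be acceptable for downstream windowing:

```sql
-- Untested sketch: perturb the event time with the Kafka offset so that two
-- records with identical eventTimestamp still get distinct time attributes.
CREATE TABLE example_unique_ts (
    eventTimestamp BIGINT NOT NULL,
    msisdn INT NOT NULL,
    zoneIds ARRAY<INT NOT NULL> NOT NULL,
    `kafka_offset` BIGINT METADATA FROM 'offset' VIRTUAL,
    -- Assumption: a computed column may reference a metadata column.
    -- MOD keeps the shift below one second; note that ties whose offsets
    -- straddle a multiple of 1000 could still order the "wrong" way.
    ts AS TO_TIMESTAMP_LTZ(eventTimestamp + MOD(`kafka_offset`, 1000), 3),
    WATERMARK FOR ts AS ts
) WITH (
    -- same Kafka connector options as the original 'example' table
    'connector' = 'kafka',
    'topic' = 'example-offset',
    'properties.bootstrap.servers' = 'xxxx',
    'key.format' = 'raw',
    'key.fields' = 'msisdn',
    'value.format' = 'avro',
    'value.fields-include' = 'ALL'
);
```

With `ts` unique per record within a partition, the original LAG query can keep `ORDER BY ts` alone, so neither the "single time column" restriction nor the Deduplicate changelog error applies, and ties on `eventTimestamp` are broken deterministically by arrival order.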