Hello, I don't know if it's strictly related, but using TIMESTAMP_LTZ vs. TIMESTAMP for the ts column I ran into some hard-to-explain behaviors.
Changing ts to TIMESTAMP (not LTZ), the identical query works as expected. So, first of all, we changed all the parsing to TIMESTAMP to rule out this kind of problem. If anyone is interested, I can provide the exact query to reproduce the problem we found.

Greetings,
Alessio

On Thu, Feb 20, 2025 at 10:31 AM Guillermo Ortiz Fernández <
guillermo.ortiz.f...@gmail.com> wrote:

> I have created a table that reads from a Kafka topic. What I want to do
> is order the data by event time and add a new field that holds the
> previous value, using the LAG function.
>
> The problem arises when two records have exactly the same eventTime,
> which produces "strange" behavior.
>
> CREATE TABLE example (
>   eventTimestamp BIGINT NOT NULL,
>   msisdn INT NOT NULL,
>   zoneIds ARRAY<INT NOT NULL> NOT NULL,
>   ts AS TO_TIMESTAMP_LTZ(eventTimestamp, 3),
>   `kafka_offset` BIGINT METADATA FROM 'offset' VIRTUAL,
>   WATERMARK FOR ts AS ts
> ) WITH (
>   'connector' = 'kafka',
>   'topic' = 'example-offset',
>   'properties.bootstrap.servers' = 'xxxx',
>   'properties.auto.offset.reset' = 'latest',
>   'scan.startup.mode' = 'latest-offset',
>   'key.format' = 'raw',
>   'key.fields' = 'msisdn',
>   'value.format' = 'avro',
>   'value.fields-include' = 'ALL',
>   'scan.watermark.idle-timeout' = '1000'
> );
>
> INSERT INTO example (eventTimestamp, msisdn, zoneIds) VALUES
>   (1739996380000, 673944959, ARRAY[1]),
>   (1739996380000, 673944959, ARRAY[1]);
>
> SELECT
>   msisdn,
>   eventTimestamp,
>   ARRAY_REMOVE(IFNULL(zoneIds, ARRAY[-1]), -1) AS zoneIds,
>   ARRAY_REMOVE(IFNULL(LAG(zoneIds, 1) OVER (PARTITION BY msisdn
>     ORDER BY ts), ARRAY[-1]), -1) AS prev_zoneIds,
>   ts
> FROM example;
>
> Actual result:
>
> msisdn     eventTimestamp  zoneIds  prev_zoneIds  ts
> 673944959  1739996380000   [1]      [1]           2025-02-19 21:19:40.000
> 673944959  1739996380000   [1]      [1]           2025-02-19 21:19:40.000
>
> Expected result:
>
> msisdn     eventTimestamp  zoneIds  prev_zoneIds  ts
> 673944959  1739996380000   [1]      []            2025-02-19 21:19:40.000
> 673944959  1739996380000   [1]      [1]           2025-02-19 21:19:40.000
>
> ------------------------------
> Is this behavior normal?
>
> I am trying to achieve the expected behavior by including the offset
> metadata in the example table and adding it to the OVER clause of the
> LAG function. However, it seems that Flink does not allow ordering by
> more than one column:
>
>   ARRAY_REMOVE(IFNULL(LAG(zoneIds, 1) OVER (PARTITION BY msisdn
>     ORDER BY ts, kafka_offset), ARRAY[-1]), -1) AS prev_zoneIds,
>
> This results in:
>
> [ERROR] Could not execute SQL statement. Reason:
> org.apache.flink.table.api.TableException: The window can only be
> ordered by a single time column.
>
> ------------------------------
>
> Would you happen to know how to achieve the expected result?
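For reference, the TIMESTAMP (non-LTZ) parsing that Alessio describes could look roughly like the sketch below. This is an assumption about their change, not their actual DDL: the FROM_UNIXTIME/TO_TIMESTAMP combination is one common way in Flink SQL to derive a TIMESTAMP(3) event-time column from an epoch value, and the table/column names are reused from Guillermo's example for illustration only.

```sql
-- Hedged sketch (not the poster's exact DDL): event-time column declared
-- as TIMESTAMP instead of TIMESTAMP_LTZ. Note that FROM_UNIXTIME expects
-- seconds, so the millisecond epoch is divided by 1000; this particular
-- conversion drops sub-second precision.
CREATE TABLE example_ts (
  eventTimestamp BIGINT NOT NULL,
  msisdn INT NOT NULL,
  zoneIds ARRAY<INT NOT NULL> NOT NULL,
  ts AS TO_TIMESTAMP(FROM_UNIXTIME(eventTimestamp / 1000)),
  WATERMARK FOR ts AS ts
) WITH (
  'connector' = 'kafka',
  'topic' = 'example-offset',
  -- remaining connector options as in the original table
  'value.format' = 'avro'
);
```

Whether this sidesteps the LAG tie behavior, as it did for Alessio, may also depend on the session's local time zone configuration, since TIMESTAMP_LTZ values are interpreted against `table.local-time-zone`.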