I tried to create an additional field based on ROW_NUMBER, but it can't be
used in the LAG function.

The idea was:
WITH ranked AS (
    SELECT
        msisdn,
        eventTimestamp,
        zoneIds,
        ts,
        TO_TIMESTAMP_LTZ(ROW_NUMBER() OVER (PARTITION BY msisdn ORDER BY ts), 3) AS ts_ranked
    FROM example_2
)
SELECT
    msisdn,
    eventTimestamp,
    ARRAY_REMOVE(IFNULL(zoneIds, ARRAY[-1]), -1) AS zoneIds,
    ARRAY_REMOVE(IFNULL(LAG(zoneIds, 1) OVER (PARTITION BY msisdn ORDER BY ts_ranked), ARRAY[-1]), -1) AS prev_zoneIds,
    ts
FROM ranked;

RANKED subselect:
msisdn     eventTimestamp  zoneIds  ts                       row_num  ts_ranked
673944959  1739996380000   [1]      2025-02-19 21:19:40.000  1        1970-01-01 01:00:00.001
673944959  1739996380000   [1]      2025-02-19 21:19:40.000  2        1970-01-01 01:00:00.002

But the query fails with:

[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.TableException: OVER windows' ordering in stream mode must be defined on a time attribute.

I guess I can only use the "ts" field, because it is the time attribute?
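
One direction that might be worth trying, given that the OVER clause only accepts a single time attribute, is to order by a processing-time attribute, which follows arrival order. This is an untested sketch, not a confirmed fix: the table name example_proc and the column proc_time are made up for illustration, and the connector options are copied from the original table definition.

```sql
-- Hypothetical variant of the source table: same physical columns as
-- `example`, plus a processing-time attribute. `example_proc` and
-- `proc_time` are illustrative names, not part of the original setup.
CREATE TABLE example_proc (
    eventTimestamp BIGINT NOT NULL,
    msisdn INT NOT NULL,
    zoneIds ARRAY<INT NOT NULL> NOT NULL,
    proc_time AS PROCTIME()
) WITH (
    'connector' = 'kafka',
    'topic' = 'example-offset',
    'properties.bootstrap.servers' = 'xxxx',
    'scan.startup.mode' = 'latest-offset',
    'key.format' = 'raw',
    'key.fields' = 'msisdn',
    'value.format' = 'avro',
    'value.fields-include' = 'ALL'
);

-- Order the OVER window by the single processing-time attribute, so that
-- records sharing the same eventTimestamp are still seen one after another.
SELECT
    msisdn,
    eventTimestamp,
    zoneIds,
    LAG(zoneIds, 1) OVER (PARTITION BY msisdn ORDER BY proc_time) AS prev_zoneIds
FROM example_proc;
```

Note that processing time reflects the order in which records reach the operator, not event time, so late or out-of-order events are not reordered and the output may differ if the topic is replayed.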


On Thu, 20 Feb 2025 at 10:41, Alessio Bernesco Làvore (<
alessio.berne...@gmail.com>) wrote:

> Hello,
> I don't know if it's strictly related, but when using a TIMESTAMP_LTZ ts
> instead of a TIMESTAMP one, I saw some behavior I could not explain.
>
> After changing ts to TIMESTAMP (not LTZ), the identical query works as
> expected.
> So, first of all, we changed all the parsing to TIMESTAMP to rule out this
> kind of problem.
>
> If anyone is interested, I can provide the exact query to reproduce the
> problem we found.
>
> Greetings,
> Alessio
>
> On Thu, Feb 20, 2025 at 10:31 AM Guillermo Ortiz Fernández <
> guillermo.ortiz.f...@gmail.com> wrote:
>
>> I have created a table that reads from a Kafka topic. What I want to do
>> is order the data by eventTime and add a new field that represents the
>> previous value using the LAG function.
>>
>> The problem arises when two records have exactly the same eventTime,
>> which produces a "strange" behavior.
>>
>>
>> CREATE TABLE example (
>>     eventTimestamp BIGINT NOT NULL,
>>     msisdn INT NOT NULL,
>>     zoneIds ARRAY<INT NOT NULL> NOT NULL,
>>     ts AS TO_TIMESTAMP_LTZ(eventTimestamp, 3),
>>     `kafka_offset` BIGINT METADATA FROM 'offset' VIRTUAL,
>>     WATERMARK FOR ts AS ts
>> ) WITH (
>>     'connector' = 'kafka',
>>     'topic' = 'example-offset',
>>     'properties.bootstrap.servers' = 'xxxx',
>>     'properties.auto.offset.reset' = 'latest',
>>     'scan.startup.mode' = 'latest-offset',
>>     'key.format' = 'raw',
>>     'key.fields' = 'msisdn',
>>     'value.format' = 'avro',
>>     'value.fields-include' = 'ALL',
>>     'scan.watermark.idle-timeout' = '1000'
>> );
>>
>>
>> INSERT INTO example (eventTimestamp, msisdn, zoneIds) VALUES
>> (1739996380000, 673944959, ARRAY[1]),
>> (1739996380000, 673944959, ARRAY[1]);
>>
>>
>> SELECT
>>     msisdn,
>>     eventTimestamp,
>>     ARRAY_REMOVE(IFNULL(zoneIds, ARRAY[-1]), -1) AS zoneIds,
>>     ARRAY_REMOVE(IFNULL(LAG(zoneIds, 1) OVER (PARTITION BY msisdn ORDER BY ts, kafka_offset), ARRAY[-1]), -1) AS prev_zoneIds,
>>     ts
>> FROM example;
>>
>> *Actual Result:*
>> msisdn     eventTimestamp  zoneIds  prev_zoneIds  ts
>> 673944959  1739996380000   [1]      [1]           2025-02-19 21:19:40.000
>> 673944959  1739996380000   [1]      [1]           2025-02-19 21:19:40.000
>>
>> *Expected Result:*
>> msisdn     eventTimestamp  zoneIds  prev_zoneIds  ts
>> 673944959  1739996380000   [1]      [ ]           2025-02-19 21:19:40.000
>> 673944959  1739996380000   [1]      [1]           2025-02-19 21:19:40.000
>> ------------------------------
>> *Is this behavior normal?*
>>
>> I am trying to achieve the expected behavior by including the metadata of
>> the offset in the example table and adding it to the OVER clause in the
>> LAG function. However, it seems that Flink does not allow ordering by
>> more than one column:
>>
>>
>> ARRAY_REMOVE(IFNULL(LAG(zoneIds, 1) OVER (PARTITION BY msisdn ORDER BY ts, kafka_offset), ARRAY[-1]), -1) AS prev_zoneIds,
>>
>> Results in:
>>
>> [ERROR] Could not execute SQL statement. Reason:
>> org.apache.flink.table.api.TableException: The window can only be ordered by a single time column.
>>
>> ------------------------------
>>
>> Would you happen to know how to achieve the expected result?
>>
>
