Another option would be add an extra field like:

    ARRAY_REMOVE(IFNULL(LAG(zoneIds, 1) OVER (PARTITION BY msisdn ORDER BY *ts,
row_number*), ARRAY[-1]), -1) AS prev_zoneIds,

But it isn't possible either.

El jue, 20 feb 2025 a las 11:48, Guillermo Ortiz Fernández (<
guillermo.ortiz.f...@gmail.com>) escribió:

> I tried to create a additional field based on ROW_NUMBER but it can't be
> used in LAG function.
>
> The idea was:
> WITH ranked AS (
>     SELECT
>         msisdn,
>         eventTimestamp,
>         zoneIds,
>         ts,
>         TO_TIMESTAMP_LTZ(ROW_NUMBER() OVER (PARTITION BY msisdn ORDER BY
> ts), 3) AS ts_ranked
>     FROM example_2
> )
> SELECT
>     msisdn,
>     eventTimestamp,
>     ARRAY_REMOVE(IFNULL(zoneIds, ARRAY[-1]), -1) AS zoneIds,
>     ARRAY_REMOVE(IFNULL(LAG(zoneIds, 1) OVER (PARTITION BY msisdn ORDER BY
> ts_ranked), ARRAY[-1]), -1) AS prev_zoneIds,
>     ts
> FROM ranked;
>
> RANKED subselect:
>       msisdn       eventTimestamp                        zoneIds
>            ts              row_num               ts_ranked
>    673944959        1739996380000                            [1]
> 2025-02-19 21:19:40.000                    1 1970-01-01 01:00:00.001
>    673944959        1739996380000                            [1]
> 2025-02-19 21:19:40.000                    2 1970-01-01 01:00:00.002
>
> *But [ERROR] Could not execute SQL statement. Reason:*
> *org.apache.flink.table.api.TableException: OVER windows' ordering in
> stream mode must be defined on a time attribute. *
>
> I guess just could use "ts" field because if it's the temporal field?
>
>
> El jue, 20 feb 2025 a las 10:41, Alessio Bernesco Làvore (<
> alessio.berne...@gmail.com>) escribió:
>
>> Hello,
>> don't know if it's strictly related, but using a TIMESTAMP_LTZ vs.
>> TIMESTAMP ts i had some not understandable behaviors.
>>
>> Changing the ts to TIMESTAMP (not LTZ) the identical query works as
>> expected.
>> So, first all, we changed all the parsing to TIMESTAMP to exclude this
>> kind of problems.
>>
>> If someone is interested i can provide the exact query to reproduce the
>> problem we figured out.
>>
>> Greetings,
>> Alessio
>>
>> On Thu, Feb 20, 2025 at 10:31 AM Guillermo Ortiz Fernández <
>> guillermo.ortiz.f...@gmail.com> wrote:
>>
>>> I have created a table that reads from a Kafka topic. What I want to do
>>> is order the data by eventTime and add a new field that represents the
>>> previous value using the LAG function.
>>>
>>> The problem arises when two records have exactly the same eventTime,
>>> which produces a "strange" behavior.
>>>
>>>
>>> CREATE TABLE example (
>>>     eventTimestamp BIGINT NOT NULL,
>>>     msisdn INT NOT NULL,
>>>     zoneIds ARRAY<INT NOT NULL> NOT NULL,
>>>     ts AS TO_TIMESTAMP_LTZ(eventTimestamp, 3),
>>>     `kafka_offset` BIGINT METADATA FROM 'offset' VIRTUAL,
>>>     WATERMARK FOR ts AS ts
>>> ) WITH (
>>>     'connector' = 'kafka',
>>>     'topic' = 'example-offset',
>>>     'properties.bootstrap.servers' = 'xxxx',
>>>     'properties.auto.offset.reset' = 'latest',
>>>     'scan.startup.mode' = 'latest-offset',
>>>     'key.format' = 'raw',
>>>     'key.fields' = 'msisdn',
>>>     'value.format' = 'avro',
>>>     'value.fields-include' = 'ALL',
>>>     'scan.watermark.idle-timeout' = '1000',
>>>
>>> );
>>>
>>>
>>> INSERT INTO example (eventTimestamp, msisdn, zoneIds)VALUES
>>> (1739996380000, 673944959, ARRAY[1]),
>>> (1739996380000, 673944959, ARRAY[1]);
>>>
>>>
>>> SELECT
>>>     msisdn,
>>>     eventTimestamp,
>>>     ARRAY_REMOVE(IFNULL(zoneIds, ARRAY[-1]), -1) AS zoneIds,
>>>     ARRAY_REMOVE(IFNULL(LAG(zoneIds, 1) OVER (PARTITION BY msisdn ORDER BY 
>>> ts, kafka_offset), ARRAY[-1]), -1)  AS prev_zoneIds,
>>>     tsFROM example;
>>>
>>> *Actual Result:*
>>> msisdneventTimestampzoneIdsprev_zoneIdsts
>>> 673944959 1739996380000 [1] [1] 2025-02-19 21:19:40.000
>>> 673944959 1739996380000 [1] [1] 2025-02-19 21:19:40.000*Expected
>>> Result:*
>>> msisdneventTimestampzoneIdsprev_zoneIdsts
>>> 673944959 1739996380000 [1] [ ] 2025-02-19 21:19:40.000
>>> 673944959 1739996380000 [1] [1] 2025-02-19 21:19:40.000
>>> ------------------------------
>>> *Is this behavior normal?*
>>>
>>> I am trying to achieve the expected behavior by including the metadata
>>> of the offset in the example table and adding it to the OVER clause in
>>> the LAG function. However, it seems that Flink does not allow ordering
>>> by more than one column:
>>>
>>>
>>> ARRAY_REMOVE(IFNULL(LAG(zoneIds, 1) OVER (PARTITION BY msisdn ORDER BY ts, 
>>> kafka_offset), ARRAY[-1]), -1)  AS prev_zoneIds,
>>>
>>> Results in:
>>>
>>> [ERROR] Could not execute SQL statement. Reason:
>>> org.apache.flink.table.api.TableException: The window can only be ordered 
>>> by a single time column.
>>>
>>> ------------------------------
>>>
>>> Would you happen to know how to achieve the expected result?
>>>
>>

Reply via email to