And another option I tried it's:

WITH ranked AS (
  select *
  FROM (
    SELECT
        msisdn,
        eventTimestamp,
        zoneIds,
        ts,
        ROW_NUMBER() OVER (PARTITION BY msisdn,ts ORDER BY ts) AS rownum
    FROM example)
  WHERE rownum = 1
)
SELECT
    msisdn,
    eventTimestamp,
    ARRAY_REMOVE(IFNULL(zoneIds, ARRAY[-1]), -1) AS zoneIds,
    ARRAY_REMOVE(IFNULL(LAG(zoneIds, 1) OVER (PARTITION BY msisdn ORDER BY
ts), ARRAY[-1]), -1) AS prev_zoneIds,
    ts
FROM ranked;

[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.TableException: StreamPhysicalOverAggregate
doesn't support consuming update and delete changes which is produced by
node Deduplicate(keep=[FirstRow], key=[msisdn, ts], order=[ROWTIME])

El jue, 20 feb 2025 a las 12:00, Guillermo Ortiz Fernández (<
guillermo.ortiz.f...@gmail.com>) escribió:

> Another option would be add an extra field like:
>
>     ARRAY_REMOVE(IFNULL(LAG(zoneIds, 1) OVER (PARTITION BY msisdn ORDER BY 
> *ts,
> row_number*), ARRAY[-1]), -1) AS prev_zoneIds,
>
> But it isn't possible either.
>
> El jue, 20 feb 2025 a las 11:48, Guillermo Ortiz Fernández (<
> guillermo.ortiz.f...@gmail.com>) escribió:
>
>> I tried to create a additional field based on ROW_NUMBER but it can't be
>> used in LAG function.
>>
>> The idea was:
>> WITH ranked AS (
>>     SELECT
>>         msisdn,
>>         eventTimestamp,
>>         zoneIds,
>>         ts,
>>         TO_TIMESTAMP_LTZ(ROW_NUMBER() OVER (PARTITION BY msisdn ORDER BY
>> ts), 3) AS ts_ranked
>>     FROM example_2
>> )
>> SELECT
>>     msisdn,
>>     eventTimestamp,
>>     ARRAY_REMOVE(IFNULL(zoneIds, ARRAY[-1]), -1) AS zoneIds,
>>     ARRAY_REMOVE(IFNULL(LAG(zoneIds, 1) OVER (PARTITION BY msisdn ORDER
>> BY ts_ranked), ARRAY[-1]), -1) AS prev_zoneIds,
>>     ts
>> FROM ranked;
>>
>> RANKED subselect:
>>       msisdn       eventTimestamp                        zoneIds
>>              ts              row_num               ts_ranked
>>    673944959        1739996380000                            [1]
>> 2025-02-19 21:19:40.000                    1 1970-01-01 01:00:00.001
>>    673944959        1739996380000                            [1]
>> 2025-02-19 21:19:40.000                    2 1970-01-01 01:00:00.002
>>
>> *But [ERROR] Could not execute SQL statement. Reason:*
>> *org.apache.flink.table.api.TableException: OVER windows' ordering in
>> stream mode must be defined on a time attribute. *
>>
>> I guess just could use "ts" field because if it's the temporal field?
>>
>>
>> El jue, 20 feb 2025 a las 10:41, Alessio Bernesco Làvore (<
>> alessio.berne...@gmail.com>) escribió:
>>
>>> Hello,
>>> don't know if it's strictly related, but using a TIMESTAMP_LTZ vs.
>>> TIMESTAMP ts i had some not understandable behaviors.
>>>
>>> Changing the ts to TIMESTAMP (not LTZ) the identical query works as
>>> expected.
>>> So, first all, we changed all the parsing to TIMESTAMP to exclude this
>>> kind of problems.
>>>
>>> If someone is interested i can provide the exact query to reproduce the
>>> problem we figured out.
>>>
>>> Greetings,
>>> Alessio
>>>
>>> On Thu, Feb 20, 2025 at 10:31 AM Guillermo Ortiz Fernández <
>>> guillermo.ortiz.f...@gmail.com> wrote:
>>>
>>>> I have created a table that reads from a Kafka topic. What I want to do
>>>> is order the data by eventTime and add a new field that represents the
>>>> previous value using the LAG function.
>>>>
>>>> The problem arises when two records have exactly the same eventTime,
>>>> which produces a "strange" behavior.
>>>>
>>>>
>>>> CREATE TABLE example (
>>>>     eventTimestamp BIGINT NOT NULL,
>>>>     msisdn INT NOT NULL,
>>>>     zoneIds ARRAY<INT NOT NULL> NOT NULL,
>>>>     ts AS TO_TIMESTAMP_LTZ(eventTimestamp, 3),
>>>>     `kafka_offset` BIGINT METADATA FROM 'offset' VIRTUAL,
>>>>     WATERMARK FOR ts AS ts
>>>> ) WITH (
>>>>     'connector' = 'kafka',
>>>>     'topic' = 'example-offset',
>>>>     'properties.bootstrap.servers' = 'xxxx',
>>>>     'properties.auto.offset.reset' = 'latest',
>>>>     'scan.startup.mode' = 'latest-offset',
>>>>     'key.format' = 'raw',
>>>>     'key.fields' = 'msisdn',
>>>>     'value.format' = 'avro',
>>>>     'value.fields-include' = 'ALL',
>>>>     'scan.watermark.idle-timeout' = '1000',
>>>>
>>>> );
>>>>
>>>>
>>>> INSERT INTO example (eventTimestamp, msisdn, zoneIds)VALUES
>>>> (1739996380000, 673944959, ARRAY[1]),
>>>> (1739996380000, 673944959, ARRAY[1]);
>>>>
>>>>
>>>> SELECT
>>>>     msisdn,
>>>>     eventTimestamp,
>>>>     ARRAY_REMOVE(IFNULL(zoneIds, ARRAY[-1]), -1) AS zoneIds,
>>>>     ARRAY_REMOVE(IFNULL(LAG(zoneIds, 1) OVER (PARTITION BY msisdn ORDER BY 
>>>> ts, kafka_offset), ARRAY[-1]), -1)  AS prev_zoneIds,
>>>>     tsFROM example;
>>>>
>>>> *Actual Result:*
>>>> msisdneventTimestampzoneIdsprev_zoneIdsts
>>>> 673944959 1739996380000 [1] [1] 2025-02-19 21:19:40.000
>>>> 673944959 1739996380000 [1] [1] 2025-02-19 21:19:40.000*Expected
>>>> Result:*
>>>> msisdneventTimestampzoneIdsprev_zoneIdsts
>>>> 673944959 1739996380000 [1] [ ] 2025-02-19 21:19:40.000
>>>> 673944959 1739996380000 [1] [1] 2025-02-19 21:19:40.000
>>>> ------------------------------
>>>> *Is this behavior normal?*
>>>>
>>>> I am trying to achieve the expected behavior by including the metadata
>>>> of the offset in the example table and adding it to the OVER clause in
>>>> the LAG function. However, it seems that Flink does not allow ordering
>>>> by more than one column:
>>>>
>>>>
>>>> ARRAY_REMOVE(IFNULL(LAG(zoneIds, 1) OVER (PARTITION BY msisdn ORDER BY ts, 
>>>> kafka_offset), ARRAY[-1]), -1)  AS prev_zoneIds,
>>>>
>>>> Results in:
>>>>
>>>> [ERROR] Could not execute SQL statement. Reason:
>>>> org.apache.flink.table.api.TableException: The window can only be ordered 
>>>> by a single time column.
>>>>
>>>> ------------------------------
>>>>
>>>> Would you happen to know how to achieve the expected result?
>>>>
>>>

Reply via email to