zlzhang0122 created FLINK-33657: ----------------------------------- Summary: Insert message in top n without row number didn't consider it's number and may not correct Key: FLINK-33657 URL: https://issues.apache.org/jira/browse/FLINK-33657 Project: Flink Issue Type: Improvement Components: Table SQL / Runtime Affects Versions: 1.17.1, 1.16.2 Reporter: zlzhang0122
The new insert message in top n without row number didn't consider it's order and just collectInsert to the next operator, this may not correct when the next operator collect all the top n records and aggregate it. For example: create table user_info( user_id int, item_id int, app string, dt timestamp ) whith( 'connector'='kafka', ... ); create table redis_sink ( redis_key string, hash_key string, hash_value string ) with ( 'connector' = 'redis', 'command' = 'hmset' 'nodes' = 'xxx', 'additional-ttl' = 'xx' ); create view base_lastn as select * from( select user_id,item_id,app,dt,row_number() over(partition by item_id, app order by dt desc) as rn from user_action )t where rn<=5; insert into redis_sink select concat("id", item_id) as redis_key, app as hash_key,user_id as hash_value from base_lastn where rn=1; insert into redis_sink select concat("list_", item_id) as redis_key, app as hash_key,listagg(user_id,",") as hash_value from base_lastn where group by item_id, app; There will be a scene that the value in the top 1 will not appear in the first or last value of the top 5. -- This message was sent by Atlassian Jira (v8.20.10#820010)