[ https://issues.apache.org/jira/browse/FLINK-26408?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17514079#comment-17514079 ]
Martijn Visser commented on FLINK-26408:
----------------------------------------

I'm downgrading the priority from Blocker to Major in accordance with https://cwiki.apache.org/confluence/display/FLINK/Flink+Jira+Process

> retract a non-existent record in RetractableTopNFunction
> ---------------------------------------------------------
>
>                 Key: FLINK-26408
>                 URL: https://issues.apache.org/jira/browse/FLINK-26408
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner, Table SQL / Runtime
>    Affects Versions: 1.12.2
>            Reporter: Hengyu Dai
>            Priority: Blocker
>
> RetractableTopNFunction will throw a RuntimeException when
> # the sorted map {{ValueState<SortedMap<RowData, Long>> treeMap}} is not empty,
> # and the sorted map doesn't contain the current sort key.
>
> We have the following Flink SQL job:
> {code:sql}
> -- table_a(a_key, a_time, a_jk), table_b(b_key, b_time, b_jk)
> select
>   a_key, a_time, a_jk, b_key, b_time, b_jk
> from
> (
>   select
>     a_key, a_time, a_jk, b_key, b_time, b_jk,
>     row_number() over(partition by a_key order by a_time desc) as rn
>   from
>   (
>     select a_key, a_time, a_jk
>     from (
>       select *, row_number() over(partition by a_key order by a_time desc) as rn
>       from table_a
>     ) tmp1
>     where rn = 1
>   ) t1
>   left join
>   (
>     select b_key, b_time, b_jk
>     from (
>       select *, row_number() over(partition by b_key order by b_time desc) as rn
>       from table_b
>     ) tmp2
>     where rn = 1
>   ) t2
>   on t1.a_jk = t2.b_jk
> ) t3
> where rn = 1
> {code}
> The JobGraph looks like this:
> {{Source table_a -> Rank_a}}
> {{                          -> Join -> Final Rank}}
> {{Source table_b -> Rank_b}}
>
> Suppose we have the following input:
> ||ts||SourceA (a_key, a_time, a_jk)||SourceB (b_key, b_time, b_jk)||RankA (a_key, a_time, a_jk)||RankB (b_key, b_time, b_jk)||Join (a_key, b_key, a_time, a_jk)||Final Rank (a_key, b_key, a_time)||
> |t1| |+(b1,1,jk1)| |+(b1,1,jk1)| | |
> |t2| |+(b2,2,jk2)| |+(b2,2,jk2)| | |
> |t3|+(a1,3,jk1)| |+(a1,3,jk1)| |+(a1,b1,3,jk1)|+(a1,b1,3)|
> |t4|+(a1,4,jk1)| |-(a1,3,jk1) +(a1,4,jk1)| |-(a1,b1,3,jk1) +(a1,b1,4,jk1)|-(a1,b1,3) +(a1,b1,4)|
> |t5|+(a1,5,jk2)| |-(a1,4,jk1) +(a1,5,jk2)| |-(a1,b1,4,jk1) +(a1,b2,5,jk2)|-(a1,b1,4) +(a1,b2,5)|
>
> Assume:
> # t4 and t5 occur at almost the same time, and the Join operator produces four
> messages at t4 and t5. Because the hash key changed (from jk1 to jk2),
> +(a1,b2,5,jk2) (hashed with jk2) may run on a different task from the other
> three messages (hashed with jk1), and it may arrive at Final Rank earlier than
> they do.
> # Due to network congestion, high machine load, etc., the messages produced by
> the Join operator at t4 and t5 take a while to arrive at Final Rank. By the
> time Final Rank receives them, the state has expired because of State TTL and
> the treeMap state has been cleared.
>
> Now if +(a1,b2,5,jk2) arrives at Final Rank first, the sortedMap for partition
> key a1 stores sort value 5. When -(a1,b1,3,jk1) then arrives at Final Rank, it
> finds that the sortedMap is not empty and does not contain sort key value 3,
> which meets the conditions for the RuntimeException.
>
> We hit this exception in our production environment (Flink version 1.12.2).
> It is very serious because once it happens, the job cannot recover
> automatically, as the state is polluted.

--
This message was sent by Atlassian Jira
(v8.20.1#820001)
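
The failure condition in the quoted report can be replayed with a small stand-alone model. This is only a sketch: `RetractSketch`, `accumulate`, `retract`, and `replayFails` are hypothetical names, the state is reduced to a plain `TreeMap` of sort key to count, and it is assumed (as the report implies) that a retraction against an empty map is tolerated. It is not Flink's RetractableTopNFunction code.

```java
import java.util.TreeMap;

// Hypothetical, simplified model of the per-partition sorted-map state.
// It mirrors only the two conditions named in the report.
final class RetractSketch {
    // sort key -> number of records holding that key (stands in for treeMap)
    private final TreeMap<Long, Long> sortedMap = new TreeMap<>();

    void accumulate(long sortKey) {
        sortedMap.merge(sortKey, 1L, Long::sum);
    }

    void retract(long sortKey) {
        if (sortedMap.isEmpty()) {
            // Assumption: cleared/expired state is tolerated, nothing to retract.
            return;
        }
        Long count = sortedMap.get(sortKey);
        if (count == null) {
            // Map is non-empty and lacks the key: the case from the report.
            throw new RuntimeException("retract of non-existent sort key " + sortKey);
        }
        if (count == 1L) {
            sortedMap.remove(sortKey);
        } else {
            sortedMap.put(sortKey, count - 1);
        }
    }

    // Replays messages such as "+5" (insert) or "-3" (retract) in arrival
    // order, starting from empty state; returns true if a retract throws.
    static boolean replayFails(String... messages) {
        RetractSketch state = new RetractSketch();
        try {
            for (String m : messages) {
                long key = Long.parseLong(m.substring(1));
                if (m.charAt(0) == '+') {
                    state.accumulate(key);
                } else {
                    state.retract(key);
                }
            }
            return false;
        } catch (RuntimeException e) {
            return true;
        }
    }
}
```

Starting from cleared state, `replayFails("+5", "-3")` models +(a1,b2,5) overtaking -(a1,b1,3) and hits the exception, while the in-order arrival `replayFails("-3", "+4", "-4", "+5")` does not.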