[ https://issues.apache.org/jira/browse/FLINK-26408?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17499315#comment-17499315 ]
Hengyu Dai commented on FLINK-26408:
------------------------------------

Hi [~lzljs3620320], I see you have looked into this exception before. Could you help with this issue? cc [~jark]

retract a non-existent record in RetractableTopNFunction
---------------------------------------------------------

Key: FLINK-26408
URL: https://issues.apache.org/jira/browse/FLINK-26408
Project: Flink
Issue Type: Bug
Components: Table SQL / Planner, Table SQL / Runtime
Affects Versions: 1.12.2
Reporter: Hengyu Dai
Priority: Major

RetractableTopNFunction throws a RuntimeException when both of the following hold:
# the sorted map {color:#0747a6}ValueState<SortedMap<RowData, Long>> treeMap{color} is not empty, and
# the sorted map does not contain the current sort key.

We have the following Flink SQL job:

{code:sql}
-- table_a(a_key, a_time, a_jk), table_b(b_key, b_time, b_jk)
select
  a_key, a_time, a_jk, b_key, b_time, b_jk
from (
  select
    a_key, a_time, a_jk, b_key, b_time, b_jk,
    row_number() over (partition by a_key order by a_time desc) as rn
  from (
    select a_key, a_time, a_jk
    from (
      select *, row_number() over (partition by a_key order by a_time desc) as rn
      from table_a
    ) tmp1
    where rn = 1
  ) t1
  left join (
    select b_key, b_time, b_jk
    from (
      select *, row_number() over (partition by b_key order by b_time desc) as rn
      from table_b
    ) tmp2
    where rn = 1
  ) t2
  on t1.a_jk = t2.b_jk
) t3
where rn = 1
{code}

The JobGraph looks like this:

{noformat}
Source table_a -> Rank_a \
                          -> Join -> Final Rank
Source table_b -> Rank_b /
{noformat}

Suppose we have the following input:

||ts||SourceA (a_key, a_time, a_jk)||SourceB (b_key, b_time, b_jk)||RankA (a_key, a_time, a_jk)||RankB (b_key, b_time, b_jk)||Join (a_key, b_key, a_time, a_jk)||Final Rank (a_key, b_key, a_time)||
|t1| |+(b1,1,jk1)| |+(b1,1,jk1)| | |
|t2| |+(b2,2,jk2)| |+(b2,2,jk2)| | |
|t3|+(a1,3,jk1)| |+(a1,3,jk1)| |+(a1,b1,3,jk1)|+(a1,b1,3)|
|t4|+(a1,4,jk1)| |-(a1,3,jk1); +(a1,4,jk1)| |-(a1,b1,3,jk1); +(a1,b1,4,jk1)|-(a1,b1,3); +(a1,b1,4)|
|t5|+(a1,5,jk2)| |-(a1,4,jk1); +(a1,5,jk2)| |-(a1,b1,4,jk1); +(a1,b2,5,jk2)|-(a1,b1,4); +(a1,b2,5)|

Assume:
# t4 and t5 happen at almost the same time, so the Join operator emits four messages in quick succession. Because the join key changes (from jk1 to jk2), +(a1,b2,5,jk2) may be produced by a different Join task than the other three messages, and it may arrive at Final Rank before them.
# Due to network congestion, high machine load, etc., the messages produced at t4 and t5 take a while to reach Final Rank. By the time Final Rank receives them, the state for a1 has expired because of State TTL.

If +(a1,b2,5,jk2) arrives at Final Rank first, sort key 5 is put into the sorted map of partition key a1. When -(a1,b1,3,jk1) arrives afterwards, Final Rank finds that the sorted map is not empty and does not contain sort key 3, which meets both conditions for the RuntimeException.

We hit this exception in our production environment (Flink version 1.12.2). It is serious: once it happens, the job cannot recover automatically because the state is polluted.
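To make the failing sequence concrete, here is a minimal, hypothetical sketch in Java that replays the t3 to t5 messages for partition key a1 against a simplified model of the Final Rank state. It is not Flink's RetractableTopNFunction: the class RetractSketch and its methods accumulate, retract, expire and replay are made up for illustration, a plain TreeMap<Long, Long> (sort key a_time to record count) stands in for ValueState<SortedMap<RowData, Long>>, expire() stands in for State TTL, and silently ignoring a retraction on empty state is an assumption of the model. Only the two conditions listed in the description trigger the exception.

{code:java}
import java.util.HashMap;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;

// Hypothetical, simplified model of the Final Rank state; NOT Flink's RetractableTopNFunction.
class RetractSketch {

    // partition key (a_key) -> sorted map of sort key (a_time) -> record count
    private final Map<String, SortedMap<Long, Long>> state = new HashMap<>();

    // "+" message: count one more record for this sort key.
    void accumulate(String partitionKey, long sortKey) {
        state.computeIfAbsent(partitionKey, k -> new TreeMap<>())
             .merge(sortKey, 1L, Long::sum);
    }

    // "-" message: models the two conditions listed in the issue.
    void retract(String partitionKey, long sortKey) {
        SortedMap<Long, Long> sortedMap = state.get(partitionKey);
        if (sortedMap == null || sortedMap.isEmpty()) {
            // Assumption of this model: retracting against empty (e.g. expired) state is ignored.
            return;
        }
        Long count = sortedMap.get(sortKey);
        if (count == null) {
            // Map is non-empty but lacks the sort key: the failure case from the issue.
            throw new RuntimeException("retract a non-existent record: sort key " + sortKey);
        }
        if (count == 1L) {
            sortedMap.remove(sortKey);
        } else {
            sortedMap.put(sortKey, count - 1);
        }
    }

    // Stand-in for State TTL clearing all state of a partition key.
    void expire(String partitionKey) {
        state.remove(partitionKey);
    }

    // Replays t3..t5 for a1. overtaken = true lets +(a1,b2,5) arrive before the other three.
    // Returns the exception message, or null if no exception was thrown.
    static String replay(boolean overtaken) {
        RetractSketch finalRank = new RetractSketch();
        finalRank.accumulate("a1", 3);         // +(a1,b1,3) at t3
        finalRank.expire("a1");                // TTL clears a1 before the t4/t5 messages arrive
        try {
            if (overtaken) {
                finalRank.accumulate("a1", 5); // +(a1,b2,5) arrives first
            }
            finalRank.retract("a1", 3);        // -(a1,b1,3)
            finalRank.accumulate("a1", 4);     // +(a1,b1,4)
            finalRank.retract("a1", 4);        // -(a1,b1,4)
            if (!overtaken) {
                finalRank.accumulate("a1", 5); // +(a1,b2,5) arrives last
            }
            return null;
        } catch (RuntimeException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println("in emission order: " + replay(false));
        System.out.println("overtaken:         " + replay(true));
    }
}
{code}

Under these assumptions, replay(false) (messages in the order the Join operator emitted them) completes normally, while replay(true) throws on -(a1,b1,3), because at that point the map for a1 holds only sort key 5.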