[ https://issues.apache.org/jira/browse/FLINK-24704?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Kurt Young updated FLINK-24704:
-------------------------------
    Priority: Critical  (was: Minor)

> Exception occurs when the input record loses monotonicity on the sort key field of UpdatableTopNFunction
> --------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-24704
>                 URL: https://issues.apache.org/jira/browse/FLINK-24704
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Runtime
>    Affects Versions: 1.14.0
>            Reporter: lincoln lee
>            Assignee: lincoln lee
>            Priority: Critical
>              Labels: pull-request-available
>             Fix For: 1.15.0
>
>
> An IllegalArgumentException occurs when the sort key of an input retract record
> is lower than the old sort key, because this breaks the monotonicity of the sort
> key field that the SQL semantics guarantee. It is highly likely that an upstream
> stateful operator has a state TTL shorter than the lifetime of the stream
> records, which causes the stale record to be cleared by state TTL.
> A case that reproduces the problem:
> {code:java|title=RankHarnessTest.java|borderStyle=solid}
> val sql =
>   """
>     |SELECT word, cnt, rank_num
>     |FROM (
>     |  SELECT word, cnt,
>     |    ROW_NUMBER() OVER (PARTITION BY type ORDER BY cnt DESC) as rank_num
>     |  FROM (
>     |    select word, type, sum(cnt) filter (where cnt > 0) cnt from T group by word, type
>     |  )
>     |)
>     |WHERE rank_num <= 6
>   """.stripMargin
> {code}
> When the aggregated result column `cnt` becomes lower for a key, the
> downstream retract rank operator fails with the following exception:
>
> {quote}java.lang.IllegalArgumentException
>   at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:122)
>   at org.apache.flink.table.runtime.operators.rank.UpdatableTopNFunction.emitRecordsWithRowNumber(UpdatableTopNFunction.java:399)
>   at org.apache.flink.table.runtime.operators.rank.UpdatableTopNFunction.processElementWithRowNumber(UpdatableTopNFunction.java:274)
>   at org.apache.flink.table.runtime.operators.rank.UpdatableTopNFunction.processElement(UpdatableTopNFunction.java:167)
>   at org.apache.flink.table.runtime.operators.rank.UpdatableTopNFunction.processElement(UpdatableTopNFunction.java:69)
>   at org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:83)
>   at org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness.processElement(OneInputStreamOperatorTestHarness.java:209)
> {quote}
> Here we should align with RetractableTopNFunction: continue processing by
> default (accepting that the result may be incorrect), or be configurable to
> fail over once
> [FLINK-24666|https://issues.apache.org/jira/browse/FLINK-24666] is addressed.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
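
For illustration only, the two behaviours discussed in the description (fail on a sort-key regression, or keep processing leniently) can be sketched in plain Java. This is not Flink's UpdatableTopNFunction and does not use any Flink API; the class name SortKeyMonotonicityDemo, the Policy enum and the update method are hypothetical names invented for this sketch. It models only the per-key comparison, assuming that `cnt` is expected never to decrease for a given word.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch; NOT Flink's UpdatableTopNFunction. */
public class SortKeyMonotonicityDemo {

    /** Hypothetical policy names, invented for this sketch. */
    public enum Policy { FAIL, LENIENT }

    // Last cnt seen per word; assumed to be non-decreasing per key.
    private final Map<String, Long> lastCnt = new HashMap<>();
    private final Policy policy;
    private int violations = 0;

    public SortKeyMonotonicityDemo(Policy policy) {
        this.policy = policy;
    }

    /**
     * Records a new cnt for the word.
     * Returns true if the update kept the sort key monotonic.
     */
    public boolean update(String word, long cnt) {
        Long old = lastCnt.get(word);
        boolean monotonic = old == null || cnt >= old;
        if (!monotonic) {
            if (policy == Policy.FAIL) {
                // Strict behaviour: reject the record, as in the reported failure.
                throw new IllegalArgumentException(
                        "sort key for '" + word + "' dropped from " + old + " to " + cnt);
            }
            // Lenient behaviour: count the violation and keep going,
            // accepting that the ranking may now be incorrect.
            violations++;
        }
        lastCnt.put(word, cnt);
        return monotonic;
    }

    public int violations() {
        return violations;
    }

    public static void main(String[] args) {
        SortKeyMonotonicityDemo lenient = new SortKeyMonotonicityDemo(Policy.LENIENT);
        lenient.update("flink", 5L);
        lenient.update("flink", 2L); // regression, tolerated in lenient mode
        System.out.println("lenient violations: " + lenient.violations());

        SortKeyMonotonicityDemo strict = new SortKeyMonotonicityDemo(Policy.FAIL);
        strict.update("flink", 5L);
        try {
            strict.update("flink", 2L); // same regression, rejected in strict mode
        } catch (IllegalArgumentException e) {
            System.out.println("strict mode failed: " + e.getMessage());
        }
    }
}
```

In the scenario from the description, a regression like the one above could arise if an upstream aggregate restarts from a smaller value after its state has expired.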