[ https://issues.apache.org/jira/browse/FLINK-24704?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
lincoln lee updated FLINK-24704:
--------------------------------
    Description:
An IllegalArgumentException occurs when the input retract record's sort key is lower than the old sort key, because this breaks the monotonicity of the sort key field that is guaranteed by the SQL semantics. Most likely an upstream stateful operator has a state TTL shorter than the lifetime of the stream's records, so the stale record has already been cleared by state TTL.

A case that reproduces it:
{code:java|title=RankHarnessTest.java|borderStyle=solid}
val sql =
  """
    |SELECT word, cnt, rank_num
    |FROM (
    |  SELECT word, cnt,
    |    ROW_NUMBER() OVER (PARTITION BY type ORDER BY cnt DESC) as rank_num
    |  FROM (
    |    select word, type, sum(cnt) filter (where cnt > 0) cnt from T group by word, type
    |  )
    |)
    |WHERE rank_num <= 6
  """.stripMargin
{code}
When the aggregated result column `cnt` becomes lower for a key, the downstream retract rank operator fails with the following exception:

{quote}java.lang.IllegalArgumentException
 at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:122)
 at org.apache.flink.table.runtime.operators.rank.UpdatableTopNFunction.emitRecordsWithRowNumber(UpdatableTopNFunction.java:399)
 at org.apache.flink.table.runtime.operators.rank.UpdatableTopNFunction.processElementWithRowNumber(UpdatableTopNFunction.java:274)
 at org.apache.flink.table.runtime.operators.rank.UpdatableTopNFunction.processElement(UpdatableTopNFunction.java:167)
 at org.apache.flink.table.runtime.operators.rank.UpdatableTopNFunction.processElement(UpdatableTopNFunction.java:69)
 at org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:83)
 at org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness.processElement(OneInputStreamOperatorTestHarness.java:209)
{quote}
Here we should align with RetractableTopNFunction: continue processing (but with an incorrect result) by default, or be configurable to fail over after [Flink-24666|https://issues.apache.org/jira/browse/FLINK-24666] was addressed.

  was:
An IllegalArgumentException occurs when the input retract record's sort key is lower than the old sort key, because this breaks the monotonicity of the sort key field that is guaranteed by the SQL semantics. Most likely an upstream stateful operator has a state TTL shorter than the lifetime of the stream's records, so the stale record has already been cleared by state TTL.

A case that reproduces it:
{{{code:title=RankHarnessTest.java|borderStyle=solid}}}
val sql =
  """
    |SELECT word, cnt, rank_num
    |FROM (
    |  SELECT word, cnt,
    |    ROW_NUMBER() OVER (PARTITION BY type ORDER BY cnt DESC) as rank_num
    |  FROM (
    |    select word, type, sum(cnt) filter (where cnt > 0) cnt from T group by word, type
    |  )
    |)
    |WHERE rank_num <= 6
  """.stripMargin
{code}
When the aggregated result column `cnt` becomes lower for a key, the downstream retract rank operator fails with the following exception:

{quote}java.lang.IllegalArgumentException
 at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:122)
 at org.apache.flink.table.runtime.operators.rank.UpdatableTopNFunction.emitRecordsWithRowNumber(UpdatableTopNFunction.java:399)
 at org.apache.flink.table.runtime.operators.rank.UpdatableTopNFunction.processElementWithRowNumber(UpdatableTopNFunction.java:274)
 at org.apache.flink.table.runtime.operators.rank.UpdatableTopNFunction.processElement(UpdatableTopNFunction.java:167)
 at org.apache.flink.table.runtime.operators.rank.UpdatableTopNFunction.processElement(UpdatableTopNFunction.java:69)
 at org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:83)
 at org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness.processElement(OneInputStreamOperatorTestHarness.java:209)
{quote}
Here we should align with RetractableTopNFunction: continue processing (but with an incorrect result) by default, or be configurable to fail over after Flink-24666 was
addressed.


> Exception occurs when the input record loses monotonicity on the sort key field of UpdatableTopNFunction
> --------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-24704
>                 URL: https://issues.apache.org/jira/browse/FLINK-24704
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Runtime
>    Affects Versions: 1.14.0
>            Reporter: lincoln lee
>            Priority: Minor
>              Labels: pull-request-available
>             Fix For: 1.15.0
>
>
> An IllegalArgumentException occurs when the input retract record's sort key is lower than the old sort key, because this breaks the monotonicity of the sort key field that is guaranteed by the SQL semantics. Most likely an upstream stateful operator has a state TTL shorter than the lifetime of the stream's records, so the stale record has already been cleared by state TTL.
>
> A case that reproduces it:
> {code:java|title=RankHarnessTest.java|borderStyle=solid}
> val sql =
>   """
>     |SELECT word, cnt, rank_num
>     |FROM (
>     |  SELECT word, cnt,
>     |    ROW_NUMBER() OVER (PARTITION BY type ORDER BY cnt DESC) as rank_num
>     |  FROM (
>     |    select word, type, sum(cnt) filter (where cnt > 0) cnt from T group by word, type
>     |  )
>     |)
>     |WHERE rank_num <= 6
>   """.stripMargin
> {code}
> When the aggregated result column `cnt` becomes lower for a key, the downstream retract rank operator fails with the following exception:
>
> {quote}java.lang.IllegalArgumentException
>  at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:122)
>  at org.apache.flink.table.runtime.operators.rank.UpdatableTopNFunction.emitRecordsWithRowNumber(UpdatableTopNFunction.java:399)
>  at org.apache.flink.table.runtime.operators.rank.UpdatableTopNFunction.processElementWithRowNumber(UpdatableTopNFunction.java:274)
>  at org.apache.flink.table.runtime.operators.rank.UpdatableTopNFunction.processElement(UpdatableTopNFunction.java:167)
>  at org.apache.flink.table.runtime.operators.rank.UpdatableTopNFunction.processElement(UpdatableTopNFunction.java:69)
>  at org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:83)
>  at org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness.processElement(OneInputStreamOperatorTestHarness.java:209)
> {quote}
> Here we should align with RetractableTopNFunction: continue processing (but with an incorrect result) by default, or be configurable to fail over after [Flink-24666|https://issues.apache.org/jira/browse/FLINK-24666] was addressed.
>

--
This message was sent by Atlassian Jira
(v8.3.4#803005)