[ https://issues.apache.org/jira/browse/FLINK-9528?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17078225#comment-17078225 ]
Fabian Hueske commented on FLINK-9528:
--------------------------------------

I agree with [~twalthr].
I don't think we should close bug reports unless we know that the bug was fixed or is not relevant anymore (because a component was completely removed or ...).

> Incorrect results: Filter does not treat Upsert messages correctly.
> -------------------------------------------------------------------
>
>                 Key: FLINK-9528
>                 URL: https://issues.apache.org/jira/browse/FLINK-9528
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / API
>    Affects Versions: 1.3.3, 1.4.2, 1.5.0
>            Reporter: Fabian Hueske
>            Assignee: Hequn Cheng
>            Priority: Critical
>
> Currently, Filters (i.e., Calcs with predicates) do not distinguish between
> retraction and upsert mode. A Calc looks at each record (regardless of its
> update semantics) and either discards it (predicate evaluates to false) or
> passes it on (predicate evaluates to true).
> This works fine for messages with retraction semantics but is not correct for
> upsert messages.
>
> The following test case (can be pasted into {{TableSinkITCase}}) shows the
> problem:
> {code:java}
> @Test
> def testUpsertsWithFilter(): Unit = {
>   val env = StreamExecutionEnvironment.getExecutionEnvironment
>   env.getConfig.enableObjectReuse()
>   env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>   val tEnv = TableEnvironment.getTableEnvironment(env)
>
>   val t = StreamTestData.get3TupleDataStream(env)
>     .assignAscendingTimestamps(_._1.toLong)
>     .toTable(tEnv, 'id, 'num, 'text)
>
>   t.select('text.charLength() as 'len)
>     .groupBy('len)
>     .select('len, 'len.count as 'cnt)
>     // uncomment to filter the aggregation result
>     // .where('cnt < 7)
>     .writeToSink(new TestUpsertSink(Array("len"), false))
>   env.execute()
>
>   val results = RowCollector.getAndClearValues
>   val retracted = RowCollector.upsertResults(results, Array(0)).sorted
>
>   val expectedWithoutFilter = List(
>     "2,1", "5,1", "9,9", "10,7", "11,1", "14,1", "25,1").sorted
>   val expectedWithFilter = List(
>     "2,1", "5,1", "11,1", "14,1", "25,1").sorted
>
>   assertEquals(expectedWithoutFilter, retracted)
>   // with the filter enabled, this assertion should hold instead
>   // assertEquals(expectedWithFilter, retracted)
> }
> {code}
>
> When we add a filter on the aggregation result, we would expect all rows
> that do not fulfill the condition to be removed from the result. However, the
> filter only drops the new upsert message, so the previous version of the row
> remains in the result.
> One solution could be to make a filter aware of the update semantics (retract
> or upsert) and convert the upsert message into a delete message if the
> predicate evaluates to false.
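A minimal, self-contained sketch of the proposed fix, assuming a simplified changelog model. The `Msg`, `Upsert` and `Delete` types and the `naiveFilter`, `upsertAwareFilter` and `materialize` functions are hypothetical illustrations, not Flink's internal classes or the actual implementation. The sketch replays two upserts for the `len = 10` key through the current filtering behavior and through an upsert-aware filter, then materializes each result the way an upsert sink keyed on `len` would.

```scala
// Hypothetical changelog model for illustration only; these are not Flink types.
sealed trait Msg { def key: Int }
final case class Upsert(key: Int, cnt: Long) extends Msg // insert or overwrite the row under key
final case class Delete(key: Int) extends Msg            // remove the row under key

object UpsertFilterSketch {
  // Predicate from the test case: keep rows with cnt < 7.
  def pred(u: Upsert): Boolean = u.cnt < 7

  // Current behavior: an upsert that fails the predicate is simply dropped.
  def naiveFilter(in: Seq[Msg]): Seq[Msg] = in.filter {
    case u: Upsert => pred(u)
    case _: Delete => true
  }

  // Proposed behavior: an upsert that fails the predicate becomes a delete for its key.
  def upsertAwareFilter(in: Seq[Msg]): Seq[Msg] = in.map {
    case u: Upsert if !pred(u) => Delete(u.key)
    case other                 => other
  }

  // Apply the messages to a key -> cnt table, as an upsert sink would.
  def materialize(in: Seq[Msg]): Map[Int, Long] =
    in.foldLeft(Map.empty[Int, Long]) {
      case (table, Upsert(k, c)) => table.updated(k, c)
      case (table, Delete(k))    => table - k
    }

  def main(args: Array[String]): Unit = {
    // The count for len = 10 grows from 6 to 7; only the second version fails cnt < 7.
    val stream = Seq(Upsert(10, 6), Upsert(10, 7))
    println(materialize(naiveFilter(stream)))       // Map(10 -> 6): stale row survives
    println(materialize(upsertAwareFilter(stream))) // Map(): row removed
  }
}
```

The upsert-aware variant may emit a delete for a key the sink has never seen (for example, when the very first version of a row already fails the predicate). The sketch assumes the sink treats such deletes as no-ops, as `Map - key` does here.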