Hi Henri, thanks for reaching out and providing code and data to reproduce the issue.
I think you are right: a "SELECT DISTINCT a, b, c FROM X" should not result in a retraction stream. However, with the current implementation we internally need a retraction stream if a state retention time is configured.

The reason lies in how state retention time is defined: the state for a key is removed if that key hasn't been seen for x time. This means that an operator resets the state clean-up timer of a key whenever a new record with that key is received. This is also true for retraction / insertion messages of the same record.

If we implemented the GroupBy that performs the DISTINCT as an operator that emits an append stream, downstream operators wouldn't see any updates, because the GroupBy only emits the first record and filters out all duplicates. Hence, downstream operators would perform a clean-up too early.

I see that these are internals that users should not need to worry about, but right now there is no easy solution to this. Eventually, the clean-up timer reset should be implemented differently than by retracting and inserting the same record. However, this would be a more involved change and requires good planning. I'll file a JIRA for that.

Thanks again for bringing the issue to our attention.

Best, Fabian

2018-02-06 13:59 GMT+01:00 Timo Walther <twal...@apache.org>:

> Hi Henri,
>
> I just noticed that I had a tiny mistake in my little test program. So SELECT DISTINCT is officially supported. But the question of whether this is a valid append stream is still up for discussion. I will loop in Fabian (in CC).
>
> For the general behavior you can also look into the code and especially the comments there [1].
>
> Regards,
> Timo
>
> [1] https://github.com/apache/flink/blob/master/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala
>
>
> On 2/6/18 at 1:36 PM, Timo Walther wrote:
>
> Hi Henri,
>
> I will try to answer your questions:
>
> 1) You are right, SELECT DISTINCT should not need a retract stream. Internally, this is translated into an aggregation without an aggregate function call. So this definitely needs improvement.
>
> 2) The problem is that SELECT DISTINCT is neither officially supported nor tested. I opened an issue for this [1].
>
> Until this issue is fixed, I would recommend implementing a custom aggregate function that keeps track of the values seen so far [2].
>
> Regards,
> Timo
>
> [1] https://issues.apache.org/jira/browse/FLINK-8564
> [2] https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/table/udfs.html#aggregation-functions
>
>
> On 2/6/18 at 11:11 AM, Henri Heiskanen wrote:
>
> Hi,
>
> I have a use case where I would like to find distinct rows over a certain period of time. The requirement is that a new row is emitted asap. Otherwise the requirement is mainly just to filter out data to have a smaller dataset for downstream. I noticed that SELECT DISTINCT and a state retention time of 12 hours would in theory do the trick. You can find the code below. A few questions.
>
> 1) Why is SELECT DISTINCT creating a retract stream? In which scenarios would we get update/delete rows?
>
> 2) If I run the code below with the example data (also below) without the state retention config, I get the two append rows (expected). If I run exactly the code below (with the retention config), I get two appends and one delete for AN1234 and then one append for AN5555. What is going on?
>
> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>
> StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
>
> StreamQueryConfig qConfig = tableEnv.queryConfig();
> // set idle state retention time. min = max = 12 hours
> qConfig.withIdleStateRetentionTime(Time.hours(12));
>
> // create a TableSource
> CsvTableSource csvSource = CsvTableSource
>     .builder()
>     .path("data.csv")
>     .field("ts", Types.SQL_TIMESTAMP())
>     .field("aid1", Types.STRING())
>     .field("aid2", Types.STRING())
>     .field("advertiser_id", Types.STRING())
>     .field("platform_id", Types.STRING())
>     .fieldDelimiter(",")
>     .build();
>
> tableEnv.registerTableSource("CsvTable", csvSource);
>
> Table result = tableEnv.sqlQuery(
>     "SELECT DISTINCT aid1, aid2, advertiser_id, platform_id FROM CsvTable");
>
> StdOutRetractStreamTableSink out = new StdOutRetractStreamTableSink(
>     new String[] {"aid1", "aid2", "advertiser_id", "platform_id"},
>     new TypeInformation[] {Types.STRING(), Types.STRING(), Types.STRING(), Types.STRING()});
>
> result.writeToSink(out, qConfig);
>
> env.execute();
>
>
> Here is a simple csv dataset of three rows:
>
> 2018-01-31 12:00:00,AN1234,RC1234,0000-0000-0000-00000,1234-1234-1234-1234,1234567890
> 2018-01-31 12:00:02,AN1234,RC1234,0000-0000-0000-00000,1234-1234-1234-1234,1234567890
> 2018-01-31 12:00:02,AN5555,RC5555,0000-0000-0000-00001,1234-1234-1234-1234,1234567891
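
The timer-reset argument in Fabian's reply at the top of this thread can be illustrated with a small plain-Java simulation. This is only a sketch under stated assumptions, not Flink's actual implementation: it has no Flink dependency, every name in it (RetentionSketch, IdleState, distinct, replay, downstreamKeepsState) is hypothetical, time is counted in abstract hours, and it assumes that a stateful operator resets a key's clean-up timer on every message it receives for that key.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java sketch, no Flink dependency. Every name below is hypothetical. */
public class RetentionSketch {

    /** Per-key state that is dropped once a key has been idle for `retention` hours. */
    static final class IdleState {
        private final long retention;
        private final Map<String, Long> lastSeen = new HashMap<>();

        IdleState(long retention) {
            this.retention = retention;
        }

        /** Expires idle keys, then resets the timer of `key`. Returns true if `key` had no live state. */
        boolean touch(String key, long now) {
            lastSeen.values().removeIf(seenAt -> now - seenAt >= retention);
            return lastSeen.put(key, now) == null;
        }

        /** True if state for `key` exists and has not yet been idle for `retention` hours. */
        boolean isAlive(String key, long now) {
            Long seenAt = lastSeen.get(key);
            return seenAt != null && now - seenAt < retention;
        }
    }

    /** DISTINCT step: "+key" stands for an insert message, "-key" for a retraction. */
    static List<String> distinct(IdleState state, String key, long now, boolean refresh) {
        List<String> out = new ArrayList<>();
        if (state.touch(key, now)) {
            out.add("+" + key);   // first occurrence: plain append
        } else if (refresh) {
            out.add("-" + key);   // duplicate: retract the row ...
            out.add("+" + key);   // ... and re-insert it, so downstream timers are reset too
        }
        // With refresh == false a duplicate is filtered and nothing is emitted.
        return out;
    }

    /** Feeds all keys at hour 0 and collects what the DISTINCT step emits. */
    static List<String> replay(String[] keys, boolean refresh) {
        IdleState state = new IdleState(12);
        List<String> out = new ArrayList<>();
        for (String key : keys) {
            out.addAll(distinct(state, key, 0, refresh));
        }
        return out;
    }

    /**
     * Sends key "k" at hours 0, 8 and 16 and reports whether a downstream operator
     * with the same 12-hour retention still holds state for "k" at hour 16.
     */
    static boolean downstreamKeepsState(boolean refresh) {
        IdleState distinctState = new IdleState(12);
        IdleState downstream = new IdleState(12);
        long[] arrivals = {0, 8, 16};
        for (long now : arrivals) {
            for (String message : distinct(distinctState, "k", now, refresh)) {
                downstream.touch(message.substring(1), now);
            }
        }
        return downstream.isAlive("k", 16);
    }

    public static void main(String[] args) {
        String[] keys = {"AN1234", "AN1234", "AN5555"};
        System.out.println("append only:     " + replay(keys, false));
        System.out.println("retract/insert:  " + replay(keys, true));
        System.out.println("downstream state kept (append only):    " + downstreamKeepsState(false));
        System.out.println("downstream state kept (retract/insert): " + downstreamKeepsState(true));
    }
}
```

In this sketch, replaying the three example keys in retract/insert mode produces an append, a delete and a second append for AN1234 followed by one append for AN5555, which is the pattern Henri reports with the retention config. In append-only mode the downstream operator last sees the key at hour 0, so its state has expired by hour 16 even though the DISTINCT step still holds the key and keeps filtering duplicates.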