Hi,

Oh, right, got it. Thanks!
Br,
Henkka

On Tue, Feb 6, 2018 at 5:01 PM, Fabian Hueske <fhue...@apache.org> wrote:

> Hi Henkka,
>
> This should be fairly easy to implement in a ProcessFunction.
> You're making a good call to worry about the number of timers. If you
> register a timer multiple times for the same time, the timer is
> deduplicated ;-) and will only fire once for that time.
> That's why the state retention time allows you to set a min and a max
> time. With that, you only have to set a timer every (max - min) interval.
> For example, if you say the application should keep the state for at
> least 12 hours but at most 14 hours, you only need to register a new
> timer every 2 hours.
>
> Hope this helps,
> Fabian
>
> 2018-02-06 15:47 GMT+01:00 Henri Heiskanen <henri.heiska...@gmail.com>:
>
>> Hi,
>>
>> Thanks.
>>
>> Doing this deduplication would be easy just by using the vanilla Flink
>> API and state (check if this is a new key and then emit), but the issue
>> has been automatic state cleanup. However, it looks like this streaming
>> SQL retention time implementation uses a process function and timers. I
>> was a bit reluctant to use that because I was worried the approach would
>> be overkill with our volumes, but maybe it will work just fine. Can you
>> help me a bit with how to implement it efficiently?
>>
>> Basically, we expect an estimated 20M distinct keys and roughly 300
>> events per key during one day. What I would like to do is clear the
>> state for a specific key if I have not seen that key for the last 12
>> hours. I think it's very close to the example here:
>> https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/operators/process_function.html
>> Instead of emitting the data in onTimer, I would just clear the state.
>> In the example, each tuple invokes registerEventTimeTimer(). Is this the
>> correct pattern? E.g. in our case we could get hundreds of events with
>> the same key within a few minutes, so would we then register hundreds of
>> timer instances?
>>
>> Br,
>> Henkka
>>
>> On Tue, Feb 6, 2018 at 3:45 PM, Fabian Hueske <fhue...@apache.org> wrote:
>>
>>> Hi Henri,
>>>
>>> thanks for reaching out and providing code and data to reproduce the
>>> issue.
>>>
>>> I think you are right, a "SELECT DISTINCT a, b, c FROM X" should not
>>> result in a retraction stream.
>>>
>>> However, with the current implementation we internally need a
>>> retraction stream if a state retention time is configured.
>>> The reason lies in how the state retention time is defined: it removes
>>> the state for a key if the key hasn't been seen for x time.
>>> This means that an operator resets the state clean-up timer of a key
>>> whenever a new record with that key is received. This is also true for
>>> retraction / insertion messages of the same record.
>>> If we implemented the GroupBy that performs the DISTINCT as an operator
>>> that emits an append stream, downstream operators wouldn't see any
>>> updates, because the GroupBy only emits the first record and filters
>>> out all duplicates. Hence, downstream operators would perform their
>>> clean-up too early.
>>>
>>> I see that these are internals that users should not need to worry
>>> about, but right now there is no easy solution to this.
>>> Eventually, the clean-up timer reset should be implemented differently
>>> than as a retraction and insertion of the same record. However, this
>>> would be a more involved change and requires good planning.
>>>
>>> I'll file a JIRA for that.
>>>
>>> Thanks again for bringing the issue to our attention.
>>>
>>> Best,
>>> Fabian
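To make Fabian's advice concrete, here is a minimal sketch of the coalesced-timer pattern he describes above. This is not code from the thread: the generic element type T, the state names, and the 12/14-hour constants are illustrative assumptions. Each key emits only its first element, state remembers when the key was last seen, and a new cleanup timer is registered at (now + max retention) only when no pending timer can still guarantee the minimum retention, i.e. at most one new timer per (max - min) interval:

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

public class DedupFunction<T> extends ProcessFunction<T, T> {

    private static final long MIN_RETENTION = 12 * 60 * 60 * 1000L; // 12 hours
    private static final long MAX_RETENTION = 14 * 60 * 60 * 1000L; // 14 hours

    // processing time of the last event for the current key (null = unseen key)
    private transient ValueState<Long> lastSeen;
    // firing time of the currently pending cleanup timer, if any
    private transient ValueState<Long> cleanupTimer;

    @Override
    public void open(Configuration parameters) throws Exception {
        lastSeen = getRuntimeContext().getState(
                new ValueStateDescriptor<>("lastSeen", Long.class));
        cleanupTimer = getRuntimeContext().getState(
                new ValueStateDescriptor<>("cleanupTimer", Long.class));
    }

    @Override
    public void processElement(T value, Context ctx, Collector<T> out) throws Exception {
        long now = ctx.timerService().currentProcessingTime();
        if (lastSeen.value() == null) {
            // first time we see this key: emit it, all duplicates are dropped
            out.collect(value);
        }
        lastSeen.update(now);

        // Coalesce timers: only register a new one if the pending timer would
        // fire before the minimum retention for this event has passed.
        Long pending = cleanupTimer.value();
        if (pending == null || pending < now + MIN_RETENTION) {
            long cleanupAt = now + MAX_RETENTION;
            ctx.timerService().registerProcessingTimeTimer(cleanupAt);
            cleanupTimer.update(cleanupAt);
        }
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<T> out) throws Exception {
        // Only clean up if the key has really been idle for the minimum
        // retention; otherwise a later timer has already been registered
        // and will take care of the cleanup.
        Long last = lastSeen.value();
        if (last != null && timestamp >= last + MIN_RETENTION) {
            lastSeen.clear();
            cleanupTimer.clear();
        }
    }
}

Applied as stream.keyBy(...).process(new DedupFunction<>()), hundreds of events for the same key within a few minutes register only a single timer, because every arrival after the first finds a pending timer that is still far enough in the future.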
>>> 2018-02-06 13:59 GMT+01:00 Timo Walther <twal...@apache.org>:
>>>
>>>> Hi Henri,
>>>>
>>>> I just noticed that I had a tiny mistake in my little test program, so
>>>> SELECT DISTINCT is officially supported. But the question whether this
>>>> is a valid append stream is still up for discussion. I will loop in
>>>> Fabian (in CC).
>>>>
>>>> For the general behavior you can also look into the code and
>>>> especially the comments there [1].
>>>>
>>>> Regards,
>>>> Timo
>>>>
>>>> [1] https://github.com/apache/flink/blob/master/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala
>>>>
>>>>
>>>> Am 2/6/18 um 1:36 PM schrieb Timo Walther:
>>>>
>>>> Hi Henri,
>>>>
>>>> I'll try to answer your questions:
>>>>
>>>> 1) You are right, SELECT DISTINCT should not need a retract stream.
>>>> Internally, this is translated into an aggregation without an
>>>> aggregate function call. So this definitely needs improvement.
>>>>
>>>> 2) The problem is that SELECT DISTINCT is neither officially supported
>>>> nor tested. I opened an issue for this [1].
>>>>
>>>> Until this issue is fixed, I would recommend implementing a custom
>>>> aggregate function that keeps track of the values seen so far [2].
>>>>
>>>> Regards,
>>>> Timo
>>>>
>>>> [1] https://issues.apache.org/jira/browse/FLINK-8564
>>>> [2] https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/table/udfs.html#aggregation-functions
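As an illustration of the workaround Timo suggests in [2], here is one possible reading of it, sketched as a user-defined Table API aggregate function (the class name, registration name, and SQL below are made up for the example, not from the thread): group by the columns that must be distinct and aggregate the remaining column with a "first value wins" function, so the accumulator effectively tracks whether a value has been seen and later duplicates leave the group result unchanged.

import org.apache.flink.table.functions.AggregateFunction;

public class FirstValue extends AggregateFunction<String, FirstValue.FirstAcc> {

    // POJO accumulator: remembers the first value seen for the group
    public static class FirstAcc {
        public String value; // null until the first row of the group arrives
    }

    @Override
    public FirstAcc createAccumulator() {
        return new FirstAcc();
    }

    // called by the runtime for every input row of the group
    public void accumulate(FirstAcc acc, String in) {
        if (acc.value == null) {
            acc.value = in;
        }
    }

    @Override
    public String getValue(FirstAcc acc) {
        return acc.value;
    }
}

It would be registered and used roughly like this:

tableEnv.registerFunction("firstVal", new FirstValue());
Table result = tableEnv.sqlQuery(
    "SELECT aid1, aid2, advertiser_id, firstVal(platform_id) " +
    "FROM CsvTable GROUP BY aid1, aid2, advertiser_id");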
>>>>
>>>> Am 2/6/18 um 11:11 AM schrieb Henri Heiskanen:
>>>>
>>>> Hi,
>>>>
>>>> I have a use case where I would like to find distinct rows over a
>>>> certain period of time. The requirement is that a new row is emitted
>>>> as soon as possible. Otherwise the requirement is mainly to filter the
>>>> data down to a smaller dataset for downstream processing. I noticed
>>>> that SELECT DISTINCT with a state retention time of 12 hours would in
>>>> theory do the trick. You can find the code below. A few questions:
>>>>
>>>> 1) Why is SELECT DISTINCT creating a retract stream? In which
>>>> scenarios would we get update/delete rows?
>>>>
>>>> 2) If I run the code below with the example data (also below) without
>>>> the state retention config, I get the two append rows (expected). If I
>>>> run exactly the code below (with the retention config), I get two
>>>> appends and one delete for AN1234 and then one append for AN5555. What
>>>> is going on?
>>>>
>>>> StreamExecutionEnvironment env =
>>>>     StreamExecutionEnvironment.getExecutionEnvironment();
>>>>
>>>> StreamTableEnvironment tableEnv =
>>>>     TableEnvironment.getTableEnvironment(env);
>>>>
>>>> StreamQueryConfig qConfig = tableEnv.queryConfig();
>>>> // set idle state retention time: min = max = 12 hours
>>>> qConfig.withIdleStateRetentionTime(Time.hours(12));
>>>>
>>>> // create a TableSource
>>>> CsvTableSource csvSource = CsvTableSource
>>>>     .builder()
>>>>     .path("data.csv")
>>>>     .field("ts", Types.SQL_TIMESTAMP())
>>>>     .field("aid1", Types.STRING())
>>>>     .field("aid2", Types.STRING())
>>>>     .field("advertiser_id", Types.STRING())
>>>>     .field("platform_id", Types.STRING())
>>>>     .fieldDelimiter(",")
>>>>     .build();
>>>>
>>>> tableEnv.registerTableSource("CsvTable", csvSource);
>>>>
>>>> Table result = tableEnv.sqlQuery(
>>>>     "SELECT DISTINCT aid1, aid2, advertiser_id, platform_id FROM CsvTable");
>>>>
>>>> StdOutRetractStreamTableSink out = new StdOutRetractStreamTableSink(
>>>>     new String[] {"aid1", "aid2", "advertiser_id", "platform_id"},
>>>>     new TypeInformation[] {Types.STRING(), Types.STRING(), Types.STRING(), Types.STRING()});
>>>>
>>>> result.writeToSink(out, qConfig);
>>>>
>>>> env.execute();
>>>>
>>>> Here is a simple CSV dataset of three rows:
>>>>
>>>> 2018-01-31 12:00:00,AN1234,RC1234,0000-0000-0000-00000,1234-1234-1234-1234,1234567890
>>>> 2018-01-31 12:00:02,AN1234,RC1234,0000-0000-0000-00000,1234-1234-1234-1234,1234567890
>>>> 2018-01-31 12:00:02,AN5555,RC5555,0000-0000-0000-00001,1234-1234-1234-1234,1234567891
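StdOutRetractStreamTableSink itself is not shown anywhere in the thread; a minimal version, assuming it simply implements Flink's RetractStreamTableSink interface and prints to stdout, might look like the sketch below. It also shows where the output in question 2 comes from: a retract stream carries Tuple2<Boolean, Row> records, where the Boolean flag is true for an insert (append) and false for a delete (retract), so the "one delete for AN1234" is such a record with the flag set to false.

import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.sinks.RetractStreamTableSink;
import org.apache.flink.table.sinks.TableSink;
import org.apache.flink.types.Row;

public class StdOutRetractStreamTableSink implements RetractStreamTableSink<Row> {

    private final String[] fieldNames;
    private final TypeInformation<?>[] fieldTypes;

    public StdOutRetractStreamTableSink(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
        this.fieldNames = fieldNames;
        this.fieldTypes = fieldTypes;
    }

    @Override
    public TypeInformation<Row> getRecordType() {
        return new RowTypeInfo(fieldTypes, fieldNames);
    }

    @Override
    public void emitDataStream(DataStream<Tuple2<Boolean, Row>> stream) {
        // f0 is the change flag: true = insert/append, false = delete/retract
        stream.print();
    }

    @Override
    public TypeInformation<Tuple2<Boolean, Row>> getOutputType() {
        return new TupleTypeInfo<>(BasicTypeInfo.BOOLEAN_TYPE_INFO, getRecordType());
    }

    @Override
    public String[] getFieldNames() {
        return fieldNames;
    }

    @Override
    public TypeInformation<?>[] getFieldTypes() {
        return fieldTypes;
    }

    @Override
    public TableSink<Tuple2<Boolean, Row>> configure(String[] names, TypeInformation<?>[] types) {
        return new StdOutRetractStreamTableSink(names, types);
    }
}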