Hi,

Oh, right.. got it. Thanks!

Br,
Henkka

On Tue, Feb 6, 2018 at 5:01 PM, Fabian Hueske <fhue...@apache.org> wrote:

> Hi Henkka,
>
> This should be fairly easy to implement in a ProcessFunction.
> You are right to worry about the number of timers. If you register a
> timer several times for the same timestamp, the timer is deduplicated
> ;-) and fires only once for that time.
> That's why the state retention time lets you set a min and a max time.
> With that, you only have to set a timer once per (max - min) interval. For
> example, if the application should keep the state for at least 12
> hours but at most 14 hours, you only need to register a new timer
> every 2 hours.
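A minimal sketch of the min/max idea for a single key, written as a plain-Java simulation with no Flink dependency. The class and method names (RetentionTimerSketch, onRecord, onTimer) are hypothetical. In a real ProcessFunction the cleanup time would presumably live in keyed state and the registration would go through the timer service.

```java
import java.util.ArrayList;
import java.util.List;

/** Simulates cleanup-timer bookkeeping for ONE key (hypothetical helper, not a Flink class). */
class RetentionTimerSketch {
    private final long minRetentionMs;
    private final long maxRetentionMs;
    private long cleanupTime = -1L;                        // -1: no timer registered yet
    final List<Long> registeredTimers = new ArrayList<>(); // stand-in for the timer service

    RetentionTimerSketch(long minRetentionMs, long maxRetentionMs) {
        this.minRetentionMs = minRetentionMs;
        this.maxRetentionMs = maxRetentionMs;
    }

    /** Call for every record of this key. Returns true if a new timer had to be registered. */
    boolean onRecord(long nowMs) {
        // A new timer is needed only when the pending one would fire before now + min.
        if (cleanupTime < 0 || nowMs + minRetentionMs > cleanupTime) {
            cleanupTime = nowMs + maxRetentionMs;
            registeredTimers.add(cleanupTime);
            return true;
        }
        return false;
    }

    /** Older timers still fire; only the most recently registered one may clear the state. */
    boolean onTimer(long timerTimestampMs) {
        return timerTimestampMs == cleanupTime;
    }

    public static void main(String[] args) {
        long hour = 3_600_000L;
        RetentionTimerSketch key = new RetentionTimerSketch(12 * hour, 14 * hour);
        // 300 records for one key, spread evenly over 24 hours.
        for (int i = 0; i < 300; i++) {
            key.onRecord(i * 24 * hour / 300);
        }
        System.out.println("records: 300, timers registered: " + key.registeredTimers.size());
    }
}
```

With min = 12 hours and max = 14 hours, a key that receives records continuously registers roughly one timer per 2 hours instead of one per record. The price is that idle state may live for up to 14 hours rather than exactly 12.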
>
> Hope this helps,
> Fabian
>
> 2018-02-06 15:47 GMT+01:00 Henri Heiskanen <henri.heiska...@gmail.com>:
>
>> Hi,
>>
>> Thanks.
>>
>> Doing this deduplication would be easy with just the vanilla Flink API
>> and state (check whether this is a new key and then emit), but the issue has
>> been automatic state cleanup. However, it looks like the streaming SQL
>> retention time implementation uses a process function and timers. I was a
>> bit reluctant to use that because I was worried that the approach would be
>> overkill with our volumes, but maybe it will work just fine. Can you give
>> me some pointers on how to implement it efficiently?
>>
>> We get an estimated 20M distinct rows/keys and roughly 300
>> events per key per day. What I would like to do is clear the
>> state for a specific key if I have not seen that key for the last 12 hours. I
>> think it's very close to the example here:
>> https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/operators/process_function.html.
>> Instead of emitting the data in onTimer I would just
>> clear the state. In the example each tuple invokes
>> registerEventTimeTimer(). Is this the correct pattern? E.g. in our case we
>> could get hundreds of events with the same key within a few minutes, so would
>> we then register hundreds of timer instances?
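The pattern asked about here (emit a key the first time it is seen, register a timer per record, clear the state in onTimer) can be sketched as a plain-Java simulation. Nothing below is Flink API: IdleDedupSketch, process and advanceTo are hypothetical names, a HashMap stands in for keyed state, a TreeMap stands in for the timer service, and records are assumed to arrive in timestamp order.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

/** Simulated dedup operator with idle-state cleanup (hypothetical, not a Flink class). */
class IdleDedupSketch {
    private final long idleTimeoutMs;
    private final Map<String, Long> lastSeen = new HashMap<>();        // key -> last record time
    private final TreeMap<Long, Set<String>> timers = new TreeMap<>(); // fire time -> keys

    IdleDedupSketch(long idleTimeoutMs) {
        this.idleTimeoutMs = idleTimeoutMs;
    }

    /** Returns true if the record should be emitted, i.e. the key has no state yet. */
    boolean process(String key, long timestampMs) {
        boolean firstSeen = !lastSeen.containsKey(key);
        lastSeen.put(key, timestampMs);
        // One timer per record; identical (key, fire time) pairs collapse in the Set.
        timers.computeIfAbsent(timestampMs + idleTimeoutMs, t -> new HashSet<>()).add(key);
        return firstSeen;
    }

    /** Fires every timer due at or before timeMs; only non-stale timers clear state. */
    void advanceTo(long timeMs) {
        while (!timers.isEmpty() && timers.firstKey() <= timeMs) {
            Map.Entry<Long, Set<String>> due = timers.pollFirstEntry();
            for (String key : due.getValue()) {
                Long seen = lastSeen.get(key);
                // Stale timer: the key was seen again after this timer was registered.
                if (seen != null && seen + idleTimeoutMs == due.getKey()) {
                    lastSeen.remove(key);
                }
            }
        }
    }

    /** Number of (key, fire time) timer entries currently pending. */
    int pendingTimers() {
        int n = 0;
        for (Set<String> keys : timers.values()) {
            n += keys.size();
        }
        return n;
    }

    public static void main(String[] args) {
        IdleDedupSketch dedup = new IdleDedupSketch(12 * 3_600_000L);
        System.out.println(dedup.process("AN1234", 0L));
        System.out.println(dedup.process("AN1234", 2_000L));
        System.out.println(dedup.process("AN5555", 2_000L));
        System.out.println("pending timers: " + dedup.pendingTimers());
    }
}
```

In this naive form every record with a new timestamp adds a pending timer for its key, which is exactly the volume concern raised above. The stale-timer check keeps the result correct, but it does not reduce the number of timers.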
>>
>> Br,
>> Henkka
>>
>> On Tue, Feb 6, 2018 at 3:45 PM, Fabian Hueske <fhue...@apache.org> wrote:
>>
>>> Hi Henri,
>>>
>>> thanks for reaching out and providing code and data to reproduce the
>>> issue.
>>>
>>> I think you are right, a "SELECT DISTINCT a, b, c FROM X"  should not
>>> result in a retraction stream.
>>>
>>> However, with the current implementation we internally need a retraction
>>> stream if a state retention time is configured.
>>> The reason lies in how state retention time is defined: the state
>>> retention time will remove the state for a key if it hasn't been seen for x
>>> time.
>>> This means that an operator resets a state clean-up timer of a key
>>> whenever a new record with that key is received. This is also true for
>>> retraction / insertion messages of the same record.
>>> If we implement the GroupBy that performs the DISTINCT as an operator
>>> that emits an append stream, the downstream operators won't see any updates
>>> because the GroupBy only emits the first record and filters out all duplicates.
>>> Hence, downstream operators would perform a clean-up too early.
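To make the early clean-up concrete, here is a small plain-Java sketch of the arithmetic. It is only an illustration under simplifying assumptions: times are in hours, each operator drops a key's state exactly one retention period after the last record it saw for that key, and the names (DownstreamCleanupSketch, expiry) are hypothetical.

```java
import java.util.Arrays;
import java.util.List;

/** Compares when a downstream operator drops state for a key in the two designs. */
class DownstreamCleanupSketch {

    /** Expiry time = latest time the operator saw the key + retention (all in hours). */
    static long expiry(List<Long> seenAt, long retentionHours) {
        long last = seenAt.get(0);
        for (long t : seenAt) {
            last = Math.max(last, t);
        }
        return last + retentionHours;
    }

    public static void main(String[] args) {
        long retention = 12;
        // The DISTINCT operator receives the same key at hours 0, 6 and 11.
        List<Long> upstreamSees = Arrays.asList(0L, 6L, 11L);

        // Append-only DISTINCT: downstream only ever gets the first record.
        List<Long> downstreamAppendOnly = Arrays.asList(0L);
        // Retract + insert on every duplicate: downstream is touched each time.
        List<Long> downstreamWithUpdates = upstreamSees;

        System.out.println("upstream expiry:               " + expiry(upstreamSees, retention));
        System.out.println("downstream expiry (append):    " + expiry(downstreamAppendOnly, retention));
        System.out.println("downstream expiry (retract):   " + expiry(downstreamWithUpdates, retention));
    }
}
```

In the append-only case the downstream state for the key expires while the key is still active upstream, which is the mismatch described above.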
>>>
>>> I see that these are internals that users should not need to worry
>>> about, but right now there is no easy solution to this.
>>> Eventually, the clean-up timer reset should be implemented differently,
>>> rather than by retracting and re-inserting the same record. However, this
>>> would be a more involved change and requires good planning.
>>>
>>> I'll file a JIRA for that.
>>>
>>> Thanks again for bringing the issue to our attention.
>>>
>>> Best, Fabian
>>>
>>>
>>> 2018-02-06 13:59 GMT+01:00 Timo Walther <twal...@apache.org>:
>>>
>>>> Hi Henri,
>>>>
>>>> I just noticed that I had a tiny mistake in my little test program. So
>>>> SELECT DISTINCT is officially supported. But the question of whether this
>>>> is a valid append stream is still up for discussion. I will loop in Fabian
>>>> (in CC).
>>>>
>>>> For the general behavior you can also look into the code and especially
>>>> the comments there [1].
>>>>
>>>> Regards,
>>>> Timo
>>>>
>>>> [1] https://github.com/apache/flink/blob/master/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala
>>>>
>>>>
>>>> On 2/6/18 at 1:36 PM, Timo Walther wrote:
>>>>
>>>> Hi Henri,
>>>>
>>>> I'll try to answer your questions:
>>>>
>>>> 1) You are right, SELECT DISTINCT should not need a retract stream.
>>>> Internally, this is translated into an aggregation without an aggregate
>>>> function call. So this definitely needs improvement.
>>>>
>>>> 2) The problem is that SELECT DISTINCT is neither officially supported
>>>> nor tested. I opened an issue for this [1].
>>>>
>>>> Until this issue is fixed I would recommend implementing a custom
>>>> aggregate function that keeps track of the values seen so far [2].
>>>>
>>>> Regards,
>>>> Timo
>>>>
>>>> [1] https://issues.apache.org/jira/browse/FLINK-8564
>>>> [2] https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/table/udfs.html#aggregation-functions
>>>>
>>>>
>>>> On 2/6/18 at 11:11 AM, Henri Heiskanen wrote:
>>>>
>>>> Hi,
>>>>
>>>> I have a use case where I would like to find distinct rows over a certain
>>>> period of time. The requirement is that a new row is emitted asap. Otherwise
>>>> the requirement is mainly just to filter the data down to a smaller dataset
>>>> for downstream. I noticed that SELECT DISTINCT with a state retention time
>>>> of 12 hours would in theory do the trick. You can find the code below. A few
>>>> questions.
>>>>
>>>> 1) Why is SELECT DISTINCT creating a retract stream? In which scenarios
>>>> would we get update/delete rows?
>>>>
>>>> 2) If I run the below code with the example data (also below) without
>>>> state retention config I get the two append rows (expected). If I run
>>>> exactly the code below (with the retention config) I'll get two appends and
>>>> one delete for AN1234 and then one append for AN5555. What is going on?
>>>>
>>>> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>>>>
>>>> StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
>>>>
>>>> StreamQueryConfig qConfig = tableEnv.queryConfig();
>>>> // set idle state retention time. min = max = 12 hours
>>>> qConfig.withIdleStateRetentionTime(Time.hours(12));
>>>>
>>>> // create a TableSource
>>>> CsvTableSource csvSource = CsvTableSource
>>>> .builder()
>>>> .path("data.csv")
>>>> .field("ts", Types.SQL_TIMESTAMP())
>>>> .field("aid1", Types.STRING())
>>>> .field("aid2", Types.STRING())
>>>> .field("advertiser_id", Types.STRING())
>>>> .field("platform_id", Types.STRING())
>>>> .fieldDelimiter(",")
>>>> .build();
>>>>
>>>> tableEnv.registerTableSource("CsvTable", csvSource);
>>>>
>>>> Table result = tableEnv.sqlQuery(
>>>> "SELECT DISTINCT aid1, aid2, advertiser_id, platform_id FROM CsvTable");
>>>>
>>>> StdOutRetractStreamTableSink out = new StdOutRetractStreamTableSink(
>>>>     new String[] {"aid1", "aid2", "advertiser_id", "platform_id"},
>>>>     new TypeInformation[] {Types.STRING(), Types.STRING(), Types.STRING(), Types.STRING()});
>>>>
>>>> result.writeToSink(out, qConfig);
>>>>
>>>> env.execute();
>>>>
>>>>
>>>> Here is a simple csv dataset of three rows:
>>>>
>>>> 2018-01-31 12:00:00,AN1234,RC1234,0000-0000-0000-00000,1234-1234-1234-1234,1234567890
>>>> 2018-01-31 12:00:02,AN1234,RC1234,0000-0000-0000-00000,1234-1234-1234-1234,1234567890
>>>> 2018-01-31 12:00:02,AN5555,RC5555,0000-0000-0000-00001,1234-1234-1234-1234,1234567891
>>>>
>>>>
>>>>
>>>>
>>>
>>
>
