For the above two non-window approaches, the second one achieves a better
performance.
=> For the above two non-window approaches, the second one achieves better
performance in most cases, especially when there are many identical rows.

On Thu, Jun 28, 2018 at 12:25 AM, Hequn Cheng <chenghe...@gmail.com> wrote:

> Hi aitozi,
>
> 1> CountDistinct
> Currently (flink-1.5), CountDistinct is supported in SQL only under a
> window, as Rong Rong described.
> There are ways to implement a non-window CountDistinct, for example: a) you
> can write a CountDistinct UDAF using MapView, or b) use two groupBys to
> achieve it: the first groupBy distincts the records and the second groupBy
> counts the distinct records. For the above two non-window approaches, the
> second one achieves a better performance.
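>
> A minimal sketch of approach b) via the Java Table API (assuming tableEnv is
> a StreamTableEnvironment with a registered table `t`; the columns `user_id`
> and `dim` are placeholder names, not from the original question):
>
> {code}
> // The inner groupBy de-duplicates (dim, user_id) pairs; the outer groupBy
> // then counts the de-duplicated user_ids per dim, without any window.
> Table result = tableEnv.sqlQuery(
>     "SELECT dim, COUNT(user_id) AS distinct_cnt " +
>     "FROM (SELECT dim, user_id FROM t GROUP BY dim, user_id) AS d " +
>     "GROUP BY dim");
> {code}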
>
> As for the OOM problem, I guess you have set the minIdleStateRetentionTime
> and maxIdleStateRetentionTime to the same value, which makes the operator
> register a timer for each record. I opened an issue to track this
> problem [1]. It is better to set different values for these two parameters,
> for example set min to 0.5 day and max to 1 day.
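>
> A minimal sketch of setting different retention times via the Java Table API
> (the 12h/24h values are only an example):
>
> {code}
> StreamQueryConfig qConfig = tableEnv.queryConfig();
> // Different min/max values let the operator clean up idle state lazily
> // instead of registering a timer per record.
> qConfig.withIdleStateRetentionTime(Time.hours(12), Time.hours(24));
> // qConfig is then passed when emitting the table, e.g. to toRetractStream().
> {code}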
>
> 2> TopN
> Currently, TopN is not supported in SQL/Table API. The semantics of TopN
> are different from those of all the operators available now. For example,
> TopN is an update operator which outputs multiple rows for each partition
> key. However, you can write a DataStream job to implement TopN.
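>
> A rough sketch of the windowed TopN part of such a DataStream job (assuming
> `counts` is a DataStream<Tuple2<String, Long>> of (key, count) pairs; the
> top-3 size and the hourly window are just examples):
>
> {code}
> counts
>     .windowAll(TumblingProcessingTimeWindows.of(Time.hours(1)))
>     .process(new ProcessAllWindowFunction<Tuple2<String, Long>, Tuple2<String, Long>, TimeWindow>() {
>         @Override
>         public void process(Context ctx,
>                             Iterable<Tuple2<String, Long>> elements,
>                             Collector<Tuple2<String, Long>> out) {
>             // Keep only the 3 largest counts seen in this window.
>             PriorityQueue<Tuple2<String, Long>> topN =
>                 new PriorityQueue<>(Comparator.comparingLong((Tuple2<String, Long> t) -> t.f1));
>             for (Tuple2<String, Long> t : elements) {
>                 topN.add(t);
>                 if (topN.size() > 3) {
>                     topN.poll(); // evict the current minimum
>                 }
>             }
>             for (Tuple2<String, Long> t : topN) {
>                 out.collect(t);
>             }
>         }
>     });
> {code}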
>
> Thanks, Hequn
>
> [1] https://issues.apache.org/jira/browse/FLINK-9681
>
> On Wed, Jun 27, 2018 at 10:55 PM, Rong Rong <walter...@gmail.com> wrote:
>
>> Hi,
>>
>> Stream distinct accumulators are actually supported in the SQL API [1]. The
>> syntax is pretty much identical to the batch case. A simple example using
>> a tumbling window would be:
>>
>>> SELECT COUNT(DISTINCT col)
>>> FROM t
>>> GROUP BY TUMBLE(rowtime, INTERVAL '1' MINUTE)
>>
>> I haven't added the support yet, but I think it should be easy to support
>> this in the Table API as well [2].
>>
>> For TopN, I think this could be implemented as a UDF [3]; there's also an
>> example in the test utilities of the Flink repo [4] that might be a good
>> starting point.
>>
>> In terms of the aggregation strategy, I believe a "window" is not
>> necessarily needed if your data sink can support retractions / upserts; I
>> think @Fabian or @Timo might have more context here.
>>
>> Thanks,
>> Rong
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-8688
>> [2] https://issues.apache.org/jira/browse/FLINK-8691
>> [3] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/udfs.html#aggregation-functions
>> [4] https://github.com/apache/flink/blob/master/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/UserDefinedAggFunctions.scala
>>
>> On Wed, Jun 27, 2018 at 7:22 AM sihua zhou <summerle...@163.com> wrote:
>>
>>> Hi aitozi,
>>>
>>> I think it can be implemented with or without a window, but it cannot be
>>> implemented without keyBy(). A general approach to implement this is as
>>> follows.
>>>
>>> {code}
>>> void process(Iterable<Record> records) {
>>>     for (Record record : records) {
>>>         if (!isFilter(record)) {
>>>             agg(record);
>>>         }
>>>     }
>>> }
>>> {code}
>>>
>>> where isFilter() filters out the duplicated records, and agg() is the
>>> function that does the aggregation, in your case the count().
>>>
>>> In general, isFilter() can be implemented based on a MapState<Record, Long>
>>> that stores the previously seen records, so isFilter() may look like:
>>>
>>> {code}
>>> boolean isFilter(Record record) {
>>>     Long oldVal = mapState.get(record);
>>>     if (oldVal == null) {
>>>         mapState.put(record, 1L);
>>>         return false;
>>>     } else {
>>>         mapState.put(record, oldVal + 1L);
>>>         return true;
>>>     }
>>> }
>>> {code}
>>>
>>> As you can see, we need to query the state frequently. One way to get
>>> better performance is to use a BloomFilter to implement isFilter(), but
>>> with an approximate result (the accuracy is configurable). Unfortunately,
>>> it's not easy to use a bloom filter in Flink yet; there is some work to be
>>> done to introduce it (https://issues.apache.org/jira/browse/FLINK-8601).
>>>
>>> Best, Sihua
>>> On 06/27/2018 17:12, aitozi <gjying1...@gmail.com> wrote:
>>>
>>> Hi, community
>>>
>>> I am using Flink to deal with the following situations:
>>>
>>> 1. "distinct count" to calculate the uv/pv.
>>> 2.  calculate the topN of the past 1 hour or 1 day time.
>>>
>>> Are these all realized by window? Or is there a best practice on doing
>>> this?
>>>
>>> 3. And when dealing with distinct, if there is no keyBy beforehand, how
>>> does the window handle this?
>>>
>>> Thanks
>>> Aitozi.
>>>
>>>
>>>
>>> --
>>> Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>>>
>>>
>
