Hi aitozi,

1> CountDistinct
Currently (flink-1.5), CountDistinct is supported in SQL only under window
as RongRong described.
There are ways to implement non-window CountDistinct, for example: a) you
can write a CountDistinct udaf using MapView or b) Use two groupBy to
achieve it. The first groupBy distinct records and the second groupBy count
different records. For the above two non-window approaches, the second one
achieves a better performance.

As for the OOM problem, I guess you have set the minIdleStateRetentionTime
and maxIdleStateRetentionTime to a same value which makes the operator
registers a timer for each record. I opened a issue to track this
problem[1]. It is better to set different value to these two parameters,
for example set min to 0.5 day and max to 1 day.

2> TopN
Currently, TopN has not been supported in SQL/Table-api. The semantic of
TopN is different from all the operators available now. For example, TopN
is an update operator which outputs multi rows for each partition key.
However, you can write a datastream job to implement TopN.

Thanks, Hequn

[1] https://issues.apache.org/jira/browse/FLINK-9681

On Wed, Jun 27, 2018 at 10:55 PM, Rong Rong <walter...@gmail.com> wrote:

> Hi ,
>
> Stream distinct accumulator is actually supported in SQL API [1]. The
> syntax is pretty much identical to the batch case. A simple example using
> the tumbling window will be.
>
>> SELECT COUNT(DISTINCT col)
>> FROM t
>> GROUP BY TUMBLE(rowtime, INTERVAL '1' MINUTE)
>
> I haven't added the support but I think it should be easy to support this
> in table API as well [2].
>
> For TopN I think this could be implemented as a UDF[3], there's also an
> example in the test utility from the Flink repo [4] that might be a good
> example.
>
> In terms of the aggregation strategy, I believe "window" is not
> necessarily needed if your data sink can support retraction / upsert, I
> think @Fabian or @Timo might have more context here.
>
> Thanks,
> Rong
>
> [1] https://issues.apache.org/jira/browse/FLINK-8688
> [2] https://issues.apache.org/jira/browse/FLINK-8691
> [3] https://ci.apache.org/projects/flink/flink-docs-
> master/dev/table/udfs.html#aggregation-functions
> [4] https://github.com/apache/flink/blob/master/flink-
> libraries/flink-table/src/test/scala/org/apache/flink/table/utils/
> UserDefinedAggFunctions.scala
>
> On Wed, Jun 27, 2018 at 7:22 AM sihua zhou <summerle...@163.com> wrote:
>
>> Hi aitozi,
>>
>> I think it can be implemented by window or non-window, but it can not be
>> implemented without keyBy(). A general approach to implement this is as
>> follow.
>>
>> {code}
>> process(Record records) {
>>     for (Record record : records) (
>>         if (!isFilter(record)) {
>>             agg(record);
>>         }
>>     }
>> }
>> {code}
>>
>> Where the isFilter() is to filter the duplicated records, and the agg()
>> is the function to do aggregation, in your case that means the count().
>>
>> In general, the isFilter() can be implemented base on the
>> MapState<String, Integer> to store the previous records, so the isFilter()
>> may look like.
>>
>> {code}
>> boolean isFilter(Record record) {
>>     Integer oldVal = mapState.get(record);
>>     if (oldVal == null) {
>>         mapState.put(record, 1L);
>>         return false;
>>     } else {
>>         mapState.put(record, oldVal + 1L);
>>         return true;
>>     }
>> }
>> {code}
>>
>> as you can see, we need to query the state frequently, one way with
>> better performance is to the use BloomFilter to implement the isFilter()
>> but with an approximate result(the accuracy is configurable),
>> unfortunately it's not easy to use the bloom filter in flink, there are
>> some works need to do to introduce it (https://issues.apache.org/
>> jira/browse/FLINK-8601).
>>
>> Best, Sihua
>> On 06/27/2018 17:12,aitozi<gjying1...@gmail.com> <gjying1...@gmail.com>
>> wrote:
>>
>> Hi, community
>>
>> I am using flink to deal with some situation.
>>
>> 1. "distinct count" to calculate the uv/pv.
>> 2.  calculate the topN of the past 1 hour or 1 day time.
>>
>> Are these all realized by window? Or is there a best practice on doing
>> this?
>>
>> 3. And when deal with the distinct, if there is no need to do the keyBy
>> previous, how does the window deal with this.
>>
>> Thanks
>> Aitozi.
>>
>>
>>
>> --
>> Sent from: http://apache-flink-user-mailing-list-archive.2336050.
>> n4.nabble.com/
>>
>>

Reply via email to