For the above two non-window approaches, the second one achieves a better performance. => For the above two non-window approaches, the second one achieves better performance in most cases, especially when there are many duplicate rows.
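For reference, a minimal sketch of that second approach (the two-level GROUP BY), assuming a table t(item, user_id) has been registered in a StreamTableEnvironment; the table and column names are only placeholders:

{code}
// Two-level GROUP BY: the inner query de-duplicates (item, user_id) pairs,
// the outer query counts the remaining rows per item.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);
// tEnv.registerTable("t", ...) is assumed to have happened elsewhere.

Table uv = tEnv.sqlQuery(
    "SELECT item, COUNT(user_id) AS uv " +
    "FROM (SELECT item, user_id FROM t GROUP BY item, user_id) " +
    "GROUP BY item");

// The result is updating, so emit it as a retract stream.
DataStream<Tuple2<Boolean, Row>> result = tEnv.toRetractStream(uv, Row.class);
{code}

Whether this beats the MapView-based udaf still depends on the data, as noted in the correction above.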
On Thu, Jun 28, 2018 at 12:25 AM, Hequn Cheng <chenghe...@gmail.com> wrote:

> Hi aitozi,
>
> 1> CountDistinct
> Currently (flink-1.5), CountDistinct is supported in SQL only under a
> window, as Rong Rong described. There are ways to implement non-window
> CountDistinct, for example: a) you can write a CountDistinct udaf using
> MapView, or b) use two groupBys to achieve it: the first groupBy
> de-duplicates the records and the second groupBy counts the distinct
> records. For the above two non-window approaches, the second one achieves
> a better performance.
>
> As for the OOM problem, I guess you have set the minIdleStateRetentionTime
> and maxIdleStateRetentionTime to the same value, which makes the operator
> register a timer for each record. I opened an issue to track this
> problem [1]. It is better to set different values for these two parameters,
> for example set min to 0.5 day and max to 1 day.
>
> 2> TopN
> Currently, TopN is not supported in SQL/Table API. The semantics of TopN
> are different from all the operators available now. For example, TopN is
> an update operator which outputs multiple rows for each partition key.
> However, you can write a DataStream job to implement TopN.
>
> Thanks, Hequn
>
> [1] https://issues.apache.org/jira/browse/FLINK-9681
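On the retention settings mentioned above, a minimal sketch of passing different min/max idle state retention times via StreamQueryConfig; the concrete values, and the tEnv/uv names from the sketch further up, are only placeholders:

{code}
// Different min/max retention times avoid registering one timer per record.
StreamQueryConfig qConfig = tEnv.queryConfig();
qConfig.withIdleStateRetentionTime(Time.hours(12), Time.hours(24));  // min 0.5 day, max 1 day

// Pass the config when converting the updating result table into a stream.
DataStream<Tuple2<Boolean, Row>> stream = tEnv.toRetractStream(uv, Row.class, qConfig);
{code}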
>
> On Wed, Jun 27, 2018 at 10:55 PM, Rong Rong <walter...@gmail.com> wrote:
>
>> Hi,
>>
>> Stream distinct accumulator is actually supported in the SQL API [1]. The
>> syntax is pretty much identical to the batch case. A simple example using
>> a tumbling window would be:
>>
>>> SELECT COUNT(DISTINCT col)
>>> FROM t
>>> GROUP BY TUMBLE(rowtime, INTERVAL '1' MINUTE)
>>
>> I haven't added the support yet, but I think it should be easy to support
>> this in the Table API as well [2].
>>
>> For TopN, I think this could be implemented as a UDF [3]; there is also an
>> example in the test utilities of the Flink repo [4] that might be a good
>> starting point.
>>
>> In terms of the aggregation strategy, I believe a "window" is not
>> necessarily needed if your data sink can support retraction / upsert. I
>> think @Fabian or @Timo might have more context here.
>>
>> Thanks,
>> Rong
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-8688
>> [2] https://issues.apache.org/jira/browse/FLINK-8691
>> [3] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/udfs.html#aggregation-functions
>> [4] https://github.com/apache/flink/blob/master/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/UserDefinedAggFunctions.scala
>>
>> On Wed, Jun 27, 2018 at 7:22 AM sihua zhou <summerle...@163.com> wrote:
>>
>>> Hi aitozi,
>>>
>>> I think it can be implemented with or without a window, but it cannot be
>>> implemented without keyBy(). A general approach to implement this is as
>>> follows.
>>>
>>> {code}
>>> void process(Iterable<Record> records) {
>>>     for (Record record : records) {
>>>         if (!isFilter(record)) {
>>>             agg(record);
>>>         }
>>>     }
>>> }
>>> {code}
>>>
>>> Here isFilter() filters out the duplicated records, and agg() is the
>>> function that does the aggregation, in your case the count().
>>>
>>> In general, isFilter() can be implemented based on a MapState<Record, Long>
>>> that stores the previously seen records, so isFilter() may look like:
>>>
>>> {code}
>>> boolean isFilter(Record record) {
>>>     Long oldVal = mapState.get(record);
>>>     if (oldVal == null) {
>>>         mapState.put(record, 1L);
>>>         return false;
>>>     } else {
>>>         mapState.put(record, oldVal + 1L);
>>>         return true;
>>>     }
>>> }
>>> {code}
>>>
>>> As you can see, we need to query the state frequently. One way to get
>>> better performance is to use a BloomFilter to implement isFilter(), but
>>> with an approximate result (the accuracy is configurable). Unfortunately
>>> it is not easy to use a bloom filter in Flink yet; there is some work
>>> needed to introduce it (https://issues.apache.org/jira/browse/FLINK-8601).
>>>
>>> Best, Sihua
>>>
>>> On 06/27/2018 17:12, aitozi <gjying1...@gmail.com> wrote:
>>>
>>> Hi, community
>>>
>>> I am using Flink to deal with the following situations:
>>>
>>> 1. "distinct count" to calculate the uv/pv.
>>> 2. calculate the topN of the past 1 hour or 1 day.
>>>
>>> Are these all realized with windows? Or is there a best practice for doing
>>> this?
>>>
>>> 3. And when dealing with the distinct, if there is no need to do the keyBy
>>> beforehand, how does the window deal with this?
>>>
>>> Thanks,
>>> Aitozi
>>>
>>>
>>> --
>>> Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
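Regarding TopN as a DataStream job: one possible minimal sketch of a windowed Top-N, computing counts per key in a 1-hour tumbling window and then a single global ranking per window. The (key, 1L) tuple layout, the window size and N = 10 are placeholder assumptions, not the only way to do it:

{code}
// input: DataStream<Tuple2<String, Long>> of (key, 1L) records -- a placeholder layout.
// Step 1: per-key counts within a 1-hour tumbling window.
DataStream<Tuple2<String, Long>> counts = input
    .keyBy(0)
    .window(TumblingProcessingTimeWindows.of(Time.hours(1)))
    .sum(1);

// Step 2: a single global ranking per window (parallelism 1), keeping only the N largest counts.
DataStream<List<Tuple2<String, Long>>> topN = counts
    .windowAll(TumblingProcessingTimeWindows.of(Time.hours(1)))
    .process(new ProcessAllWindowFunction<Tuple2<String, Long>, List<Tuple2<String, Long>>, TimeWindow>() {
        @Override
        public void process(Context ctx,
                            Iterable<Tuple2<String, Long>> elements,
                            Collector<List<Tuple2<String, Long>>> out) {
            // Min-heap ordered by count: evict the smallest once it holds more than N elements.
            Comparator<Tuple2<String, Long>> byCount = Comparator.comparingLong(t -> t.f1);
            PriorityQueue<Tuple2<String, Long>> heap = new PriorityQueue<>(byCount);
            for (Tuple2<String, Long> e : elements) {
                heap.offer(e);
                if (heap.size() > 10) {   // N = 10, just an example
                    heap.poll();
                }
            }
            List<Tuple2<String, Long>> result = new ArrayList<>(heap);
            result.sort((a, b) -> Long.compare(b.f1, a.f1));   // largest count first
            out.collect(result);
        }
    });
{code}

With event time, the same structure works by swapping in TumblingEventTimeWindows and assigning timestamps/watermarks on the input.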