Hi Hequn,

Indeed, the ReduceFunction is better than the ProcessWindowFunction. I replaced it and could verify the performance improvement [1]. Thanks for that! I will try a distinct count with the Table API.
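To make the difference concrete, here is a minimal plain-Java sketch of the two strategies. It does not use any Flink classes; the class and method names are made up for illustration, and it only mimics the idea that a reduce-style function folds each element into a small accumulator on arrival, while a process-style function buffers every element until the window fires.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Illustration only: hypothetical names, no Flink dependency.
final class IncrementalDistinctSketch {

    // Reduce-style merge: combine two partial results into one.
    static Set<String> merge(Set<String> left, Set<String> right) {
        left.addAll(right);
        return left;
    }

    // Incremental: each arriving word is folded into the accumulator at once,
    // so the window state never holds more than the distinct words seen so far.
    static int reduceIncrementally(List<String> words) {
        Set<String> accumulator = new HashSet<>();
        for (String word : words) {
            Set<String> single = new HashSet<>();
            single.add(word);
            accumulator = merge(accumulator, single);
        }
        return accumulator.size();
    }

    // Buffered: every word (duplicates included) is kept until the window
    // closes, and only then is the distinct count computed.
    static int processBuffered(List<String> words) {
        List<String> buffer = new ArrayList<>();
        for (String word : words) {
            buffer.add(word);
        }
        return new HashSet<>(buffer).size();
    }

    public static void main(String[] args) {
        List<String> window = Arrays.asList("a", "b", "a", "c", "b");
        System.out.println(reduceIncrementally(window)); // prints 3
        System.out.println(processBuffered(window));     // prints 3
    }
}
```

Both variants return the same count; the difference is only in how much state is held while the window is open.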
The question I am facing is that I want to use HyperLogLog in a UDF for DataStream, so that I can get an approximate distinct count inside a window, like I did here [2]. Once I have my UDF, I want to write my own operator that processes this approximate distinct count. So I am not sure whether I can implement my own operator for the Table API. Can I?

[1] https://github.com/felipegutierrez/explore-flink/blob/master/src/main/java/org/sense/flink/examples/stream/WordDistinctCountReduceWindowSocket.java
[2] https://github.com/felipegutierrez/explore-flink/blob/master/src/main/java/org/sense/flink/examples/stream/WordHLLKeyedProcessWindowSocket.java

Thanks!
Felipe
--
-- Felipe Gutierrez
-- skype: felipe.o.gutierrez
-- https://felipeogutierrez.blogspot.com

On Thu, Jun 13, 2019 at 8:10 AM Hequn Cheng <chenghe...@gmail.com> wrote:

> Hi Felipe,
>
> From your code, I think you want to get the "count distinct" result
> instead of the "distinct count". They have different meanings.
>
> To improve the performance, you can replace your
> DistinctProcessWindowFunction with a DistinctProcessReduceFunction. A
> ReduceFunction can aggregate the elements of a window incrementally, while
> for a ProcessWindowFunction, elements cannot be incrementally aggregated but
> instead need to be buffered internally until the window is considered ready
> for processing.
>
> > Flink does not have a built-in operator which does this computation.
> Flink does have built-in operators to solve your problem. You can use the
> Table API & SQL to solve your problem.
> The code looks like:
>
> public static void main(String[] args) throws Exception {
>     StreamExecutionEnvironment env =
>         StreamExecutionEnvironment.getExecutionEnvironment();
>     StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
>
>     // Read lines from a socket and register them as a table with a
>     // processing-time attribute.
>     DataStream<String> ds = env.socketTextStream("localhost", 9000);
>     tableEnv.registerDataStream("sourceTable", ds, "line, proctime.proctime");
>
>     // Split each line into words, then aggregate per 5-second tumbling window.
>     SplitTableFunction splitFunc = new SplitTableFunction();
>     tableEnv.registerFunction("splitFunc", splitFunc);
>     Table result = tableEnv.scan("sourceTable")
>         .joinLateral("splitFunc(line) as word")
>         .window(Tumble.over("5.seconds").on("proctime").as("w"))
>         .groupBy("w")
>         .select("count.distinct(word), collect.distinct(word)");
>
>     tableEnv.toAppendStream(result, Row.class).print();
>     env.execute();
> }
>
> The detailed code can be found here [1].
>
> At the same time, you can use a two-stage window to improve the
> performance, i.e., the first window aggregate deduplicates the words and
> the second window computes the final results.
>
> Documentation on the Table API and SQL can be found here [2][3].
>
> Best, Hequn
>
> [1] https://github.com/hequn8128/flink/commit/b4676a5730cecabe2931b9e628aaebd7729beab2
> [2] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/tableApi.html
> [3] https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/table/sql.html
>
> On Wed, Jun 12, 2019 at 9:19 PM Felipe Gutierrez <felipe.o.gutier...@gmail.com> wrote:
>
>> Hi Rong, I implemented my solution using a ProcessWindowFunction
>> with timeWindow and a ReduceFunction with timeWindowAll [1]. So the
>> first window runs in parallel, and I use the second window to merge
>> everything in the reducer. I guess it is the best approach to do
>> DistinctCount. Would you suggest some improvements?
>>
>> [1] https://github.com/felipegutierrez/explore-flink/blob/master/src/main/java/org/sense/flink/examples/stream/WordDistinctCountProcessTimeWindowSocket.java
>>
>> Thanks!
>>
>> On Wed, Jun 12, 2019 at 9:27 AM Felipe Gutierrez <felipe.o.gutier...@gmail.com> wrote:
>>
>>> Hi Rong,
>>>
>>> thanks for your answer. If I understood correctly, the option will be to use
>>> ProcessFunction [1], since it has the method onTimer(), but not the
>>> ProcessWindowFunction [2], because it does not have the method onTimer(). I
>>> will need this method to call Collector<OUT> out.collect(...) from the
>>> onTimer() method in order to emit a single value of my Distinct Count
>>> function.
>>>
>>> Is what I am saying reasonable?
>>>
>>> [1] https://ci.apache.org/projects/flink/flink-docs-master/api/java/index.html?org/apache/flink/streaming/api/datastream/DataStream.html
>>> [2] https://ci.apache.org/projects/flink/flink-docs-master/api/java/index.html?org/apache/flink/streaming/api/functions/windowing/ProcessWindowFunction.html
>>>
>>> Kind Regards,
>>> Felipe
>>>
>>> On Wed, Jun 12, 2019 at 3:41 AM Rong Rong <walter...@gmail.com> wrote:
>>>
>>>> Hi Felipe,
>>>>
>>>> There are multiple ways to do DISTINCT COUNT in the Table/SQL API. In fact,
>>>> there is a recent thread on this [1].
>>>> Based on the description you provided, it seems like it might be a
>>>> better API level to use.
>>>>
>>>> To answer your questions:
>>>> - You should be able to use other TimeCharacteristic values. You might want to
>>>>   try ProcessWindowFunction and see if this fits your use case.
>>>> - Not sure I fully understand the question. Your keyBy should be
>>>>   done on your distinct key (or a combo key), and if you do the keyBy correctly
>>>>   then yes, all messages with the same key are processed by the same TM thread.
>>>>
>>>> --
>>>> Rong
>>>>
>>>> [1] http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/count-DISTINCT-in-flink-SQL-td28061.html
>>>>
>>>> On Tue, Jun 11, 2019 at 1:27 AM Felipe Gutierrez <felipe.o.gutier...@gmail.com> wrote:
>>>>
>>>>> Hi all,
>>>>>
>>>>> I have implemented a Flink data stream application to compute the distinct
>>>>> count of words. Flink does not have a built-in operator which does this
>>>>> computation. I used KeyedProcessFunction and I am saving the state in a
>>>>> ValueState.
>>>>> Could someone check if my implementation is the best way of doing it?
>>>>> Here is my solution:
>>>>> https://stackoverflow.com/questions/56524962/how-can-i-improve-my-count-distinct-for-data-stream-implementation-in-flink/56539296#56539296
>>>>>
>>>>> There are some points that I do not fully understand:
>>>>> - I could only use TimeCharacteristic.IngestionTime.
>>>>> - I split the words using "Tuple2<Integer, String>(0, word)", so I
>>>>>   will always have the same key (0). As I understand it, all the events will be
>>>>>   processed on the same TaskManager, which will not achieve parallelism if I
>>>>>   am in a cluster.
>>>>>
>>>>> Kind Regards,
>>>>> Felipe