Hi Li, what you can do is to add the #queries when splitting: user1 - [query1, query2, query3] -> [user1, query1, 3] [user1, query2, 3] [user1, query2, 3] -> Then while collecting the results, you just compare the current number of records in the window and emit if it reaches the expected number.
Make sure that while you split, you also generate a unique key to group the results. So add the same UUID (or so) to [user1, query1, 3] etc., so that you can easily group the results. You probably also need to implement a custom trigger and evictor. Let me know if you have any more questions. On Mon, Jan 11, 2021 at 11:41 AM Yun Gao <yungao...@aliyun.com> wrote: > Hi Li, > > From my view I think it would not be eaily use a countWindow if you > have different number of records for each key (namely user in this case). I > think you may need to user the low level KeyedProcessFunction [1] to keep > some state by yourself. For example, each request might also carries the > total number of requests of each user, and in the KeyedProcessFunction you > might record the received number of requests and total requests of this > user in the state. Whenever enough requests is received for each user, it > could be known that the message is completely processed and the state of > this user could also be cleaned at then. > > > Best, > Yun > > > [1] > https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/operators/process_function.html#the-keyedprocessfunction > > ------------------------------------------------------------------ > Sender:Li Wang<liwang0...@gmail.com> > Date:2021/01/11 07:10:27 > Recipient:<user@flink.apache.org> > Theme:Re: Use Flink to process request with list of queries and aggregate > > Can I get any suggestion? Thanks a lot. > > - Li > > > > -- > Sent from: > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/ > > -- Arvid Heise | Senior Java Developer <https://www.ververica.com/> Follow us @VervericaData -- Join Flink Forward <https://flink-forward.org/> - The Apache Flink Conference Stream Processing | Event Driven | Real Time -- Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany -- Ververica GmbH Registered at Amtsgericht Charlottenburg: HRB 158244 B Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji (Toni) Cheng