Hi,

for this you would have to use a non-parallel window, i.e. something like stream.windowAll(<my window>).apply(...). This does not compute per key but has the drawback that computation does not happen in parallel. If you only use it to combine the pre-aggregated maps it could be OK, though.
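
To make the "combine the pre-aggregated maps" step concrete, here is a minimal sketch of the merge logic that could be called from the function passed to apply(...) on the windowAll stream. It is plain Java with no Flink API in it; the class name WindowMapMerger and the method mergeCounts are hypothetical, and it assumes the per-key results arrive as Map<String, Integer> counts that should be summed.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, not part of Flink: merges the per-key maps emitted by
// the keyed window into one map covering the whole window.
class WindowMapMerger {

    // Sums the counts of every key across all partial maps.
    // Assumption: the values are counts, so combining them means adding them.
    static Map<String, Integer> mergeCounts(Iterable<Map<String, Integer>> partials) {
        Map<String, Integer> merged = new HashMap<>();
        for (Map<String, Integer> partial : partials) {
            for (Map.Entry<String, Integer> entry : partial.entrySet()) {
                // Map.merge inserts the value if the key is new,
                // otherwise it combines old and new with Integer::sum.
                merged.merge(entry.getKey(), entry.getValue(), Integer::sum);
            }
        }
        return merged;
    }
}
```

Since the all-window function receives all elements of the window as an Iterable, it could pass that Iterable straight to mergeCounts and emit the returned map as the single result for the window. The work done in that non-parallel step is then only one small merge per key rather than one update per event.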

Cheers,
Aljoscha

On Fri, 2 Sep 2016 at 18:26 Luis Mariano Guerra <mari...@event-fabric.com> wrote:

> On Fri, Sep 2, 2016 at 5:24 PM, Aljoscha Krettek <aljos...@apache.org>
> wrote:
>
>> Hi,
>> from this I would expect to get as many HashMaps as you have keys. The
>> winFunction is also executed per-key so it cannot combine the HashMaps of
>> all keys.
>>
>> Does this describe the behavior that you're seeing?
>>
>
> yes, it's the behaviour I'm seeing, I'm looking for a way to merge those
> HashMaps from the same window into a single one, I can't find how.
>
>
>>
>> Cheers,
>> Aljoscha
>>
>> On Fri, 2 Sep 2016 at 17:37 Luis Mariano Guerra <mari...@event-fabric.com>
>> wrote:
>>
>>> hi!
>>>
>>> I'm trying to collect some metrics by key per window and emitting the
>>> full result at the end of the window to kafka, I started with a simple
>>> count by key to test it but my requirements are a little more complex than
>>> that.
>>>
>>> what I want to do is to fold the stream events as they come and then at
>>> the end of the window merge them together and emit a single result, I don't
>>> want to accumulate all the events and calculate at the end of the window,
>>> from my understanding of fold in other languages/libraries, this would be
>>> what I need, using it via apply(stateIn, foldFun, windowFun) but it's not
>>> working:
>>>
>>> the basic code is:
>>>
>>>     input
>>>         .flatMap(new LineSplitter())
>>>         .keyBy(0)
>>>         .timeWindow(Time.of(5, TimeUnit.SECONDS))
>>>         .apply(new HashMap<String, Integer>(), foldFunction,
>>>             winFunction);
>>>
>>> where foldFunction accumulates by key and winFunction iterates over the
>>> hashmaps and merges them into a single result hashmap and emits that one at
>>> the end.
>>>
>>> this emits many one-key hash maps instead of only one with all the keys,
>>> I tried setting setParallelism(1) in multiple places but it still doesn't
>>> work. More confusingly, in one run it emitted a single map but after I ran
>>> it again it went back to the previous behavior.
>>>
>>> what am I doing wrong? is there any other approach?
>>>
>>> I can provide the implementation of foldFunction and winFunction if
>>> required but I think it doesn't change much.
>>>
>>> Reading the source code I see:
>>>
>>>     Applies the given window function to each window. The window
>>>     function is called for each evaluation of the window for each key
>>>     individually. The output of the window function is interpreted as a regular
>>>     non-windowed stream.
>>>
>>> emphasis on "for each key individually", the return type of apply is
>>> SingleOutputStreamOperator which doesn't provide many operations to group
>>> the emitted values.
>>>
>>> thanks in advance.
>>>
>>