Hi,

from this I would expect to get as many HashMaps as you have keys, because the fold keeps a separate accumulator for each key. The winFunction is also executed per key, so it cannot combine the HashMaps of all keys.
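Here is a plain-Java simulation of what happens in one 5-second window. It has no Flink dependency, and the names `PerKeyWindowSketch`, `foldPerKey` and `mergeAll` are hypothetical. Stage 1 keeps one accumulator per key, as a keyed fold does, so the window function only ever sees that key's map. Stage 2 is a separate, non-keyed step that receives all per-key results of the same window and merges them. In Flink, that second step would presumably be a non-keyed window of the same size (for example `timeWindowAll(...)`) applied to the output of the keyed window, assuming both windows line up. Treat that mapping as an assumption, not a tested pipeline.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class PerKeyWindowSketch {

    // Stage 1 (simulated keyed fold): every key gets its own accumulator,
    // so each window-function call can only see the map of a single key.
    static List<Map<String, Integer>> foldPerKey(List<String> eventsInWindow) {
        Map<String, Map<String, Integer>> accumulatorPerKey = new HashMap<>();
        for (String key : eventsInWindow) {
            Map<String, Integer> acc =
                accumulatorPerKey.computeIfAbsent(key, k -> new HashMap<>());
            acc.merge(key, 1, Integer::sum); // count this event for its own key
        }
        // One result map per key, which is what the keyed window emits.
        return new ArrayList<>(accumulatorPerKey.values());
    }

    // Stage 2 (simulated non-keyed step): sees all per-key results of the
    // same window and combines them into a single map.
    static Map<String, Integer> mergeAll(List<Map<String, Integer>> partials) {
        Map<String, Integer> merged = new HashMap<>();
        for (Map<String, Integer> partial : partials) {
            partial.forEach((key, count) -> merged.merge(key, count, Integer::sum));
        }
        return merged;
    }

    public static void main(String[] args) {
        List<String> window = Arrays.asList("a", "b", "a", "c", "b", "a");
        List<Map<String, Integer>> partials = foldPerKey(window);
        System.out.println(partials.size()); // 3: one map per key
        System.out.println(mergeAll(partials)); // a single map with all three keys
    }
}
```

The design point is that merging across keys needs an operator that is not partitioned by key. Calling setParallelism(1) on the keyed operator does not change this, because the window function is still invoked once per key.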
Does this describe the behavior that you're seeing?

Cheers,
Aljoscha

On Fri, 2 Sep 2016 at 17:37 Luis Mariano Guerra <mari...@event-fabric.com> wrote:

> hi!
>
> I'm trying to collect some metrics by key per window and emit the full
> result at the end of the window to Kafka. I started with a simple count by
> key to test it, but my requirements are a little more complex than that.
>
> What I want to do is to fold the stream events as they come and then, at
> the end of the window, merge them together and emit a single result. I don't
> want to accumulate all the events and calculate at the end of the window.
> From my understanding of fold in other languages/libraries, this would be
> what I need, using it via apply(stateIn, foldFun, windowFun), but it's not
> working.
>
> The basic pipeline is:
>
>     input
>         .flatMap(new LineSplitter())
>         .keyBy(0)
>         .timeWindow(Time.of(5, TimeUnit.SECONDS))
>         .apply(new HashMap<String, Integer>(), foldFunction, winFunction);
>
> where foldFunction accumulates by key and winFunction iterates over the
> hashmaps, merges them into a single result hashmap and emits that one at
> the end.
>
> This emits many one-key hashmaps instead of only one with all the keys. I
> tried setting setParallelism(1) in multiple places, but it still doesn't work.
> More confusingly, in one run it emitted a single map, but after I ran it
> again it went back to the previous behavior.
>
> What am I doing wrong? Is there any other approach?
>
> I can provide the implementation of foldFunction and winFunction if
> required, but I don't think it changes much.
>
> Reading the source code I see:
>
> Applies the given window function to each window. The window function
> is called for each evaluation of the window for each key individually. The
> output of the window function is interpreted as a regular non-windowed
> stream.
>
> Emphasis on "for each key individually". The return type of apply is
> SingleOutputStreamOperator, which doesn't provide many operations to group
> the emitted values.
>
> Thanks in advance.
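The other approach the question asks about, folding every event into one map as it arrives, can be simulated the same way. This is again plain Java with hypothetical names (`WindowAllFoldSketch`, `fold`, `evaluateWindow`), not Flink API. Without the `keyBy(0)`, there is a single accumulator per window holding all keys, so no merge is needed when the window fires. In Flink this would roughly correspond to a non-keyed window (for example `timeWindowAll(...)`) combined with a fold. The trade-off, as far as I know, is that a non-keyed window is evaluated by a single parallel instance, so it only scales as far as one task can.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class WindowAllFoldSketch {

    // Fold step: called once per incoming event and updates the single
    // accumulator shared by all keys of the current window.
    static Map<String, Integer> fold(Map<String, Integer> acc, String word) {
        acc.merge(word, 1, Integer::sum);
        return acc;
    }

    // Simulates one non-keyed tumbling window: start from an empty map,
    // fold each event in arrival order and emit the accumulator at the end.
    static Map<String, Integer> evaluateWindow(List<String> eventsInWindow) {
        Map<String, Integer> acc = new HashMap<>();
        for (String event : eventsInWindow) {
            acc = fold(acc, event);
        }
        return acc;
    }

    public static void main(String[] args) {
        Map<String, Integer> result = evaluateWindow(Arrays.asList("a", "b", "a", "c"));
        System.out.println(result.size()); // 3 keys in one map
        System.out.println(result.get("a")); // 2
    }
}
```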