Hi,
what are the number of unique keys and the parallelism of your job? If the former is larger than the latter, a single "timeWindowFold" instance should indeed be responsible for several keys. How are you determining that each of these is only accumulating for a single key?

Cheers,
Aljoscha

On Mon, 5 Sep 2016 at 17:35 Luis Mariano Guerra <mari...@event-fabric.com> wrote:

> On Mon, Sep 5, 2016 at 12:30 PM, Aljoscha Krettek <aljos...@apache.org>
> wrote:
>
>> Hi,
>> for this you would have to use a non-parallel window, i.e. something like
>> stream.windowAll(<my window>).apply(...). This does not compute per key but
>> has the drawback that computation does not happen in parallel. If you only
>> use it to combine the pre-aggregated maps it could be OK, though.
>>
>> Cheers,
>> Aljoscha
>>
>
> hi,
>
> thanks for the tip, it works. I was aware of the non-parallel nature of
> what I want to do. After seeing it work I tried this:
>
>     input.flatMap(new LineSplitter()).keyBy(0)
>         .timeWindow(Time.of(5, TimeUnit.SECONDS))
>         .apply(new HashMap<String, Integer>(), timeWindowFold, timeWindowMerge)
>         .windowAll(TumblingEventTimeWindows.of(Time.of(5, TimeUnit.SECONDS)))
>         .apply(new HashMap<String, Integer>(), windowAllFold, windowAllMerge);
>
> and it seems to work, but it seems each timeWindowFold accumulates a
> single key. I was expecting to have one or more keys per fold, depending on
> which processing node handled the computation. I don't care if
> I emit one event per key, but I want to know if it's ok and if I'm missing
> something (maybe I have to add a line to specify the partitioning?)
>
>
>> On Fri, 2 Sep 2016 at 18:26 Luis Mariano Guerra <mari...@event-fabric.com>
>> wrote:
>>
>>> On Fri, Sep 2, 2016 at 5:24 PM, Aljoscha Krettek <aljos...@apache.org>
>>> wrote:
>>>
>>>> Hi,
>>>> from this I would expect to get as many HashMaps as you have keys. The
>>>> winFunction is also executed per-key so it cannot combine the HashMaps of
>>>> all keys.
>>>>
>>>> Does this describe the behavior that you're seeing?
>>>>
>>>
>>> yes, it's the behaviour I'm seeing. I'm looking for a way to merge those
>>> HashMaps from the same window into a single one, but I can't find how.
>>>
>>>>
>>>> Cheers,
>>>> Aljoscha
>>>>
>>>> On Fri, 2 Sep 2016 at 17:37 Luis Mariano Guerra <
>>>> mari...@event-fabric.com> wrote:
>>>>
>>>>> hi!
>>>>>
>>>>> I'm trying to collect some metrics by key per window and emit the
>>>>> full result at the end of the window to Kafka. I started with a simple
>>>>> count by key to test it, but my requirements are a little more complex than
>>>>> that.
>>>>>
>>>>> what I want to do is to fold the stream events as they come and then,
>>>>> at the end of the window, merge them together and emit a single result. I
>>>>> don't want to accumulate all the events and calculate at the end of the
>>>>> window. From my understanding of fold in other languages/libraries, this
>>>>> would be what I need, using it via apply(stateIn, foldFun, windowFun), but
>>>>> it's not working.
>>>>>
>>>>> the basic pipeline is:
>>>>>
>>>>>     input
>>>>>         .flatMap(new LineSplitter())
>>>>>         .keyBy(0)
>>>>>         .timeWindow(Time.of(5, TimeUnit.SECONDS))
>>>>>         .apply(new HashMap<String, Integer>(), foldFunction, winFunction);
>>>>>
>>>>> where foldFunction accumulates by key and winFunction iterates over the
>>>>> hashmaps, merges them into a single result hashmap and emits that one at
>>>>> the end.
>>>>>
>>>>> this emits many one-key hash maps instead of only one with all the
>>>>> keys. I tried setting setParallelism(1) in multiple places but it still
>>>>> doesn't work. More confusingly, in one run it emitted a single map, but
>>>>> after I ran it again it went back to the previous behavior.
>>>>>
>>>>> what am I doing wrong? is there any other approach?
>>>>>
>>>>> I can provide the implementation of foldFunction and winFunction if
>>>>> required but I think it doesn't change much.
>>>>>
>>>>> Reading the source code I see:
>>>>>
>>>>>     Applies the given window function to each window. The window
>>>>>     function is called for each evaluation of the window for each key
>>>>>     individually. The output of the window function is interpreted as a
>>>>>     regular non-windowed stream.
>>>>>
>>>>> emphasis on "for each key individually". The return type of apply is
>>>>> SingleOutputStreamOperator, which doesn't provide many operations to group
>>>>> the emitted values.
>>>>>
>>>>> thanks in advance.
>>>>>
>>>>
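
For readers following the thread, the behaviour discussed above can be modelled roughly in plain Java, without any Flink dependency. This is only a sketch under stated assumptions: the class and method names (`TwoStageMergeSketch`, `subtaskFor`, `foldPerKey`, `mergeAll`) are hypothetical, and "hash modulo parallelism" is a simplified stand-in for how a keyed stream distributes keys, not Flink's actual assignment. It illustrates two points from the messages: several keys can share one parallel subtask while each keyed fold still produces one single-key map, and a separate non-parallel step is what combines those maps into one result.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;

// Plain-Java model of the two-stage aggregation discussed in the thread.
// It does not use Flink; every name in this class is hypothetical.
class TwoStageMergeSketch {

    // Simplified stand-in for key partitioning: hash modulo parallelism.
    // Assumption: the real key distribution in Flink differs; this only
    // shows that several keys can land on the same parallel subtask.
    static int subtaskFor(String key, int parallelism) {
        return Math.floorMod(key.hashCode(), parallelism);
    }

    // Stage 1: one accumulator per key, modelling a keyed window fold.
    // Each returned map contains exactly one key.
    static List<Map<String, Integer>> foldPerKey(List<String> words) {
        Map<String, Map<String, Integer>> perKey = new TreeMap<>();
        for (String word : words) {
            perKey.computeIfAbsent(word, k -> new HashMap<>())
                  .merge(word, 1, Integer::sum);
        }
        return new ArrayList<>(perKey.values());
    }

    // Stage 2: non-parallel merge of all partial maps of one window,
    // modelling what a windowAll-style step would do.
    static Map<String, Integer> mergeAll(List<Map<String, Integer>> partials) {
        Map<String, Integer> result = new TreeMap<>();
        for (Map<String, Integer> partial : partials) {
            partial.forEach((key, count) -> result.merge(key, count, Integer::sum));
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> words = Arrays.asList("a", "b", "a", "c", "b", "a");
        int parallelism = 2;

        // Show which (modelled) subtask each distinct key would go to.
        for (String key : new TreeSet<>(words)) {
            System.out.println(key + " -> subtask " + subtaskFor(key, parallelism));
        }

        List<Map<String, Integer>> partials = foldPerKey(words);
        System.out.println("per-key maps: " + partials);
        System.out.println("merged map:   " + mergeAll(partials));
    }
}
```

In this model, three keys and a parallelism of two mean one subtask handles two keys, yet stage 1 still yields three single-key maps. Only the stage 2 merge produces the single combined map the original question asks for.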