Solved. There was probably an error in the way I was testing. I also simplified the job, and it works now.
2016-09-27 16:01 GMT+02:00 Simone Robutti <simone.robu...@radicalbit.io>:

> Hello,
>
> I'm dealing with an analytical streaming job and I don't know how to
> write the last part.
>
> I want to count all the elements in a window that have a given status,
> so I keep a state with a Map[Status, Long]. This state is updated from
> tuples containing the oldStatus and the newStatus: every event generates
> a +1 for the new status and a -1 for the old status. Then I want to
> reduce all these counts and move from a local, partial state to a global
> state that will be written to the output.
>
> Right now my code looks like:
>
> filteredLatestOrders.keyBy(x => x._1.getStatus).mapWithState(
>   updateState).keyBy(x => "0").reduce((orderA, orderB) => orderA.sum(orderB))
>
> where "filteredLatestOrders" is a DataStream containing information about
> the elements, the new state and the old state.
>
> This produces the output:
>
> 2> Map(DRAFT -> 0, RESERVED -> 1, PICKED -> 0, PACKED -> 0)
> 2> Map(DRAFT -> 0, RESERVED -> 2, PICKED -> 0, PACKED -> 0)
> 2> Map(DRAFT -> 0, RESERVED -> 3, PICKED -> 0, PACKED -> 1)
> 3> Map(DRAFT -> 0, RESERVED -> -1, PICKED -> 1, PACKED -> 0)
> 4> Map(DRAFT -> 0, RESERVED -> -1, PICKED -> 0, PACKED -> 1)
>
> I thought that keying with a fixed value would collect all the elements
> on a single node so that I could finally compute the final result, but
> they are left on different nodes and are never summed.
>
> Is this the correct approach? If so, how can I do what I need? Is there
> a smarter way to count distinct, evolving elements by their status in a
> stream? Note that the original source of events is updates to the status
> of an element, and the requirement is to count only the latest status
> available.
>
> Thank you in advance,
>
> Simone
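For reference, here is a minimal sketch of one simplified shape such a job can take. This is illustrative only: the OrderUpdate type, the status values, and the delta-based reduce are assumptions made for the sketch, not the actual code of the job discussed above. Instead of reducing per-partition state snapshots, each event is turned into a delta map (+1 for the new status, -1 for the old one), and a rolling reduce on a constant key merges all deltas on a single task:

import org.apache.flink.streaming.api.scala._

// Hypothetical event type; the real classes are not shown in the thread.
case class OrderUpdate(orderId: String, oldStatus: String, newStatus: String)

object GlobalStatusCounts {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Stand-in for the "filteredLatestOrders" stream of status transitions.
    val updates: DataStream[OrderUpdate] = env.fromElements(
      OrderUpdate("o1", "DRAFT", "RESERVED"),
      OrderUpdate("o2", "DRAFT", "RESERVED"),
      OrderUpdate("o1", "RESERVED", "PICKED")
    )

    // Each update becomes a delta map: +1 for the new status, -1 for the old
    // one (assuming oldStatus != newStatus).
    val deltas: DataStream[Map[String, Long]] = updates.map { u =>
      Map(u.newStatus -> 1L, u.oldStatus -> -1L)
    }

    // Keying by a constant routes every delta to the same task; the rolling
    // reduce then keeps a running sum, so each emitted map is the current
    // global count per status.
    val globalCounts: DataStream[Map[String, Long]] = deltas
      .keyBy(_ => "all")
      .reduce { (a, b) =>
        (a.keySet ++ b.keySet)
          .map(k => k -> (a.getOrElse(k, 0L) + b.getOrElse(k, 0L)))
          .toMap
      }

    globalCounts.print()
    env.execute("global status counts")
  }
}

Keying by a constant value does bring the whole stream onto a single parallel subtask, so the last emitted map reflects the global counts; the trade-off is that the final reduce does not parallelize.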