I am using kafka_2.10-0.10.0.1. Say I have a window of 60 minutes advanced by 15 minutes. If the streams app, using the timestamp extractor, places a message in one or more buckets, it will get aggregated in each of those buckets. I assume this statement is correct.
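To make that concrete: with size 60 minutes and advance 15 minutes, each record falls into size/advance = 4 overlapping buckets. Below is a minimal, Kafka-free sketch of that bucket assignment in plain Java. It assumes (as Kafka Streams does for hopping windows, to my understanding) that window start times are aligned to multiples of the advance; the constants and method names are mine, not from the Streams API:

```java
import java.util.ArrayList;
import java.util.List;

public class HoppingWindows {
    // Window size 60 min, advance 15 min (milliseconds),
    // mirroring TimeWindows.of(sizeMs).advanceBy(advanceMs).
    static final long SIZE_MS = 60 * 60 * 1000L;
    static final long ADVANCE_MS = 15 * 60 * 1000L;

    // Returns the start times of every window containing the timestamp.
    // With size 60 and advance 15 this is (up to) 4 overlapping windows.
    static List<Long> windowStartsFor(long timestampMs) {
        List<Long> starts = new ArrayList<>();
        long lastStart = timestampMs - (timestampMs % ADVANCE_MS);
        for (long start = lastStart; start > timestampMs - SIZE_MS; start -= ADVANCE_MS) {
            if (start >= 0) {
                starts.add(start);
            }
        }
        return starts;
    }

    public static void main(String[] args) {
        long ts = 100 * 60 * 1000L; // a record at minute 100
        // prints [5400000, 4500000, 3600000, 2700000], i.e. the windows
        // starting at minutes 90, 75, 60 and 45 all contain this record
        System.out.println(windowStartsFor(ts));
    }
}
```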
Also, say I restart the streams application; then the bucket aggregation will resume from the last point of halt. I hope this is also correct.

What I noticed is that once a message was placed in one bucket, that bucket was not getting any new messages. However, when I ran a small test case replicating that, it worked properly. There may be some issue with the application reset.

Thanks
Sachin


On Fri, Nov 18, 2016 at 11:30 PM, Guozhang Wang <wangg...@gmail.com> wrote:

> Hello Sachin,
>
> Which version of Kafka are you using for this application?
>
>
> Guozhang
>
>
> On Tue, Nov 15, 2016 at 9:52 AM, Sachin Mittal <sjmit...@gmail.com> wrote:
>
> > Hi,
> > I have a simple pipeline:
> >
> > stream.aggregateByKey(new Initializer<List>() {
> >     public List apply() {
> >         return new ArrayList();
> >     }
> > }, new Aggregator<Key, Value, List>() {
> >     public List apply(Key key, Value value, List list) {
> >         list.add(value);
> >         return list;
> >     }
> > }, keysSerde, valuesSerde, "table");
> >
> > So this basically aggregates the values of a source stream into lists by
> > key. This is working fine.
> >
> > However, over time the list will grow very big, so I thought of using a
> > windowed table:
> >
> > stream.aggregateByKey(new Initializer<List>() {
> >     public List apply() {
> >         return new ArrayList();
> >     }
> > }, new Aggregator<Key, Value, List>() {
> >     public List apply(Key key, Value value, List list) {
> >         list.add(value);
> >         return list;
> >     }
> > }, TimeWindows.of("table", sizeTS).advanceBy(advanceTS), keysSerde,
> > valuesSerde);
> >
> > It is basically the same code as above, but what I find is that it
> > aggregates only one value for a given windowed key, so the size of the
> > list is always one.
> >
> > What I understood is that it should put the source values in time buckets
> > based on their extracted timestamps. When I check the timed window, I see
> > that the value's timestamp lies between the bounds of the time window.
> >
> > However, I have not understood why it always aggregates only a single
> > value.
> >
> > So downstream I always get something like:
> >
> > (key, start, end) -> [value1]
> > (key, start, end) -> [value2]
> >
> > and not:
> >
> > (key, start, end) -> [value1, value2]
> >
> > Note that both value1 and value2 lie between the start and end bounds.
> >
> > In the first (non-windowed) case I get key -> [value1, value2], which is
> > what I expect.
> >
> > So please let me know if I am missing something in my windowed
> > aggregation, or if there is something else to be done to get the output
> > I want.
> >
> > Thanks
> > Sachin
> >
>
>
> --
> -- Guozhang
>
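To illustrate the behavior being asked for, here is a minimal, Kafka-free sketch that simulates the Initializer/Aggregator pair over hopping windows using a plain map keyed by (key, window start). All names, keys, and timestamps are illustrative, not part of the Streams API; the point is that two values for the same key whose timestamps fall in shared windows should end up together in those buckets, i.e. (key, start, end) -> [value1, value2]:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class WindowedListAggregate {
    // Window size 60 min, advance 15 min (milliseconds).
    static final long SIZE_MS = 60 * 60 * 1000L;
    static final long ADVANCE_MS = 15 * 60 * 1000L;

    // One aggregate per (key, window start), standing in for the windowed store.
    final Map<String, List<String>> store = new HashMap<>();

    void process(String key, String value, long timestampMs) {
        long lastStart = timestampMs - (timestampMs % ADVANCE_MS);
        for (long start = lastStart;
             start > timestampMs - SIZE_MS && start >= 0;
             start -= ADVANCE_MS) {
            // Initializer + Aggregator: start with an empty list, then append.
            store.computeIfAbsent(key + "@" + start, k -> new ArrayList<>()).add(value);
        }
    }

    public static void main(String[] args) {
        WindowedListAggregate agg = new WindowedListAggregate();
        // Two values for the same key, 5 minutes apart: their window sets
        // overlap, so the shared buckets should hold both values.
        agg.process("k", "value1", 100 * 60 * 1000L); // minute 100
        agg.process("k", "value2", 105 * 60 * 1000L); // minute 105
        // The window starting at minute 90 contains both records:
        // prints [value1, value2]
        System.out.println(agg.store.get("k@" + (90 * 60 * 1000L)));
    }
}
```

If a run of the real topology instead shows size-one lists for buckets like these, that points to the aggregation (or the application reset) rather than the window math.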