Hi,
After much debugging I found an issue with the timestamp extractor. If I
use a custom timestamp extractor with the following code, the aggregation
does not work:

public static class MessageTimestampExtractor implements TimestampExtractor {
    public long extract(ConsumerRecord<Object, Object> record) {
        if (record.value() instanceof Message) {
            return ((Message) record.value()).ts;
        } else {
            return record.timestamp();
        }
    }
}

Here Message has a long field ts which stores the timestamp. Note that I
have checked, and ts holds valid timestamp values. However, if I replace
the extractor with, say, WallclockTimestampExtractor, the aggregation
works fine. I do not understand what the issue could be here.

Also note that I am using Kafka Streams version 0.10.0.1, and I am
publishing messages via https://github.com/SOHU-Co/kafka-node/, whose
version is quite old (0.5.x).
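One thing I still want to rule out on my side is the unit of ts (seconds
vs. milliseconds) and whether it can ever be negative, since as far as I
can tell Streams rejects records whose extracted timestamp is negative,
and the old kafka-node producer may not set the broker-side timestamp at
all. A more defensive sketch I may try is below; the
seconds-to-milliseconds guard is my own guess, not a confirmed fix:

public static class SafeMessageTimestampExtractor implements TimestampExtractor {
    public long extract(ConsumerRecord<Object, Object> record) {
        if (record.value() instanceof Message) {
            long ts = ((Message) record.value()).ts;
            // Guess: a millisecond epoch for any recent date is > 1e12,
            // so anything smaller is probably epoch seconds.
            if (ts > 0 && ts < 1000000000000L) {
                ts = ts * 1000L;
            }
            if (ts > 0) {
                return ts;
            }
        }
        // fall back to the record's built-in timestamp
        return record.timestamp();
    }
}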
Let me know if there is some bug in timestamp extraction.

Thanks
Sachin

On Mon, Nov 28, 2016 at 11:52 PM, Guozhang Wang <wangg...@gmail.com> wrote:
> Sachin,
>
> This is indeed a bit weird, and we'd like to try to reproduce your
> issue locally. Do you have some sample input data for us to try out?
>
> Guozhang
>
> On Fri, Nov 25, 2016 at 10:12 PM, Sachin Mittal <sjmit...@gmail.com>
> wrote:
>
> > Hi,
> > I fixed that sorted set issue, but now I am facing a weird problem
> > which I am not able to replicate.
> >
> > Here is the sample problem that I could isolate.
> > My class is like this:
> >
> > public static class Message implements Comparable<Message> {
> >     public long ts;
> >     public String message;
> >     public String key;
> >     public Message() {};
> >     public Message(long ts, String message, String key) {
> >         this.ts = ts;
> >         this.key = key;
> >         this.message = message;
> >     }
> >     public int compareTo(Message paramT) {
> >         long ts1 = paramT.ts;
> >         return ts > ts1 ? 1 : -1;
> >     }
> > }
> >
> > The pipeline is like this:
> >
> > builder.stream(Serdes.String(), messageSerde, "test-window-stream")
> >     .map(new KeyValueMapper<String, Message, KeyValue<String, Message>>() {
> >         public KeyValue<String, Message> apply(String key, Message value) {
> >             return new KeyValue<String, Message>(value.key, value);
> >         }
> >     })
> >     .through(Serdes.String(), messageSerde, "test-window-key-stream")
> >     .aggregateByKey(new Initializer<SortedSet<Message>>() {
> >         public SortedSet<Message> apply() {
> >             return new TreeSet<Message>();
> >         }
> >     }, new Aggregator<String, Message, SortedSet<Message>>() {
> >         public SortedSet<Message> apply(String aggKey, Message value,
> >                 SortedSet<Message> aggregate) {
> >             aggregate.add(value);
> >             return aggregate;
> >         }
> >     }, TimeWindows.of("stream-table", 10 * 1000L).advanceBy(5 * 1000L),
> >         Serdes.String(), messagesSerde)
> >     .foreach(new ForeachAction<Windowed<String>, SortedSet<Message>>() {
> >         public void apply(Windowed<String> key, SortedSet<Message> messages) {
> >             ...
> >         }
> >     });
> >
> > So basically I rekey the original message into another topic and then
> > aggregate it based on that key.
> > What I have observed is that with windowed aggregation the aggregator
> > does not use the previously aggregated value:
> >
> > public SortedSet<Message> apply(String aggKey, Message value,
> >         SortedSet<Message> aggregate) {
> >     aggregate.add(value);
> >     return aggregate;
> > }
> >
> > In the above function the aggregate is an empty set for every value
> > entering the pipeline. When I remove the windowed aggregation, the
> > aggregate set retains the previously aggregated values.
> >
> > I am just not able to wrap my head around it. When I ran this kind of
> > test locally on Windows it worked fine. However, a similar pipeline
> > run against production on Linux behaves strangely and always gets an
> > empty aggregate set.
> > Any idea what the reason could be, and where I should look for the
> > problem? Does the length of the key string matter here?
> > I will later try to run the same simple setup on Linux and see what
> > happens. But this is very strange behavior.
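> > One more thing I plan to double-check is the messagesSerde round
> > trip, because with windowed aggregation the previous aggregate is
> > read back through this serde from the state store; if
> > deserialization silently returned an empty or fresh set, it would
> > look exactly like what I am seeing. A quick sanity check could be
> > something like this (just a sketch; the topic name is arbitrary):
> >
> > SortedSet<Message> in = new TreeSet<Message>();
> > in.add(new Message(14000L, "m1", "key1"));
> > in.add(new Message(15000L, "m2", "key1"));
> > byte[] bytes = messagesSerde.serializer().serialize("test-topic", in);
> > SortedSet<Message> out =
> >     messagesSerde.deserializer().deserialize("test-topic", bytes);
> > // sizes should match if the serde round-trips correctly
> > System.out.println("in=" + in.size() + ", out=" + out.size());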
> > Thanks
> > Sachin
> >
> > On Wed, Nov 23, 2016 at 12:04 AM, Guozhang Wang <wangg...@gmail.com>
> > wrote:
> >
> > > Hello Sachin,
> > >
> > > In the implementation of SortedSet, if the object implements the
> > > Comparable interface, that compareTo function is applied in
> > > "aggregate.add(value);", and hence if it returns 0 the element
> > > will not be added, since it is a Set.
> > >
> > > Guozhang
> > >
> > > On Mon, Nov 21, 2016 at 10:06 PM, Sachin Mittal <sjmit...@gmail.com>
> > > wrote:
> > >
> > > > Hi,
> > > > What I find is that when I use a sorted set as the aggregate, it
> > > > fails to aggregate values for which compareTo returns 0.
> > > >
> > > > My class is like this:
> > > >
> > > > public class Message implements Comparable<Message> {
> > > >     public long ts;
> > > >     public String message;
> > > >     public Message() {};
> > > >     public Message(long ts, String message) {
> > > >         this.ts = ts;
> > > >         this.message = message;
> > > >     }
> > > >     public int compareTo(Message paramT) {
> > > >         long ts1 = paramT.ts;
> > > >         return ts == ts1 ? 0 : ts > ts1 ? 1 : -1;
> > > >     }
> > > > }
> > > >
> > > > The pipeline is like this:
> > > >
> > > > builder.stream(Serdes.String(), messageSerde, "test-window-stream")
> > > >     .aggregateByKey(new Initializer<SortedSet<Message>>() {
> > > >         public SortedSet<Message> apply() {
> > > >             return new TreeSet<Message>();
> > > >         }
> > > >     }, new Aggregator<String, Message, SortedSet<Message>>() {
> > > >         public SortedSet<Message> apply(String aggKey, Message value,
> > > >                 SortedSet<Message> aggregate) {
> > > >             aggregate.add(value);
> > > >             return aggregate;
> > > >         }
> > > >     }, TimeWindows.of("stream-table", 10 * 1000L).advanceBy(5 * 1000L),
> > > >         Serdes.String(), messagesSerde)
> > > >     .foreach(new ForeachAction<Windowed<String>, SortedSet<Message>>() {
> > > >         public void apply(Windowed<String> key, SortedSet<Message> messages) {
> > > >             ...
> > > >         }
> > > >     });
> > > >
> > > > So any message published between 10 and 20 seconds gets
> > > > aggregated into the 10-20 bucket, and I print the size of the
> > > > set. However, the output I get is the following:
> > > >
> > > > Published: 14
> > > > Aggregated: 10 20 -> 1
> > > >
> > > > Published: 18
> > > > Aggregated: 10 20 -> 2
> > > >
> > > > Published: 11
> > > > Aggregated: 10 20 -> 3
> > > >
> > > > Published: 17
> > > > Aggregated: 10 20 -> 4
> > > >
> > > > Published: 14
> > > > Aggregated: 10 20 -> 4
> > > >
> > > > Published: 15
> > > > Aggregated: 10 20 -> 5
> > > >
> > > > Published: 12
> > > > Aggregated: key2 10 20 -> 6
> > > >
> > > > Published: 12
> > > > Aggregated: 10 20 -> 6
> > > >
> > > > So, as you can see, any message that occurs again within the
> > > > same second, where compareTo returns 0, fails to get aggregated
> > > > in the pipeline. Notice the ones published at 14 and 12 seconds.
> > > >
> > > > Now I am not sure if the problem is with Java, i.e. whether I
> > > > should use the Comparator interface rather than Comparable for
> > > > my Message object, or whether the problem is with Kafka Streams
> > > > or with serializing and deserializing the set of messages. If I
> > > > replace the Set with a List, all is working fine.
> > > >
> > > > Anyway, any ideas here would be appreciated; meanwhile let me
> > > > see what the best Java practice here is.
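> > > > For what it is worth, the fix I am considering is to keep
> > > > ordering by ts but add a tie-breaker, so that two different
> > > > messages with the same timestamp no longer compare as 0 and both
> > > > stay in the TreeSet (untested sketch; it assumes message is
> > > > non-null and unique per event):
> > > >
> > > > public int compareTo(Message other) {
> > > >     int byTs = Long.compare(ts, other.ts);
> > > >     if (byTs != 0) {
> > > >         return byTs;
> > > >     }
> > > >     // tie-breaker: keep distinct messages that share a timestamp
> > > >     return message.compareTo(other.message);
> > > > }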
> > > > Thanks
> > > > Sachin
> > > >
> > > > On Mon, Nov 21, 2016 at 8:29 PM, Michael Noll <mich...@confluent.io>
> > > > wrote:
> > > >
> > > > > On Mon, Nov 21, 2016 at 1:06 PM, Sachin Mittal <sjmit...@gmail.com>
> > > > > wrote:
> > > > >
> > > > > > I am using kafka_2.10-0.10.0.1.
> > > > > > Say I have a window of 60 minutes advanced by 15 minutes.
> > > > > > If the stream app, using the timestamp extractor, puts the
> > > > > > message in one or more bucket(s), it will get aggregated in
> > > > > > those buckets.
> > > > > > I assume this statement is correct.
> > > > >
> > > > > Yes.
> > > > >
> > > > > > Also say when I restart the streams application, the bucket
> > > > > > aggregation will resume from the last point of halt.
> > > > > > I hope this is also correct.
> > > > >
> > > > > Yes.
> > > > >
> > > > > > What I noticed is that once a message was placed in one
> > > > > > bucket, that bucket was not getting new messages.
> > > > >
> > > > > This should not happen...
> > > > >
> > > > > > However, when I ran a small test case replicating that, it
> > > > > > worked properly. There may be some issues in application
> > > > > > reset.
> > > > >
> > > > > ...and apparently it works (as expected) in your small test
> > > > > case.
> > > > >
> > > > > Do you have any further information that you could share with
> > > > > us so we can help you better? What's the difference, for
> > > > > example, between your "normal" use case and the small test
> > > > > case you have been referring to?
> > > > >
> > > > > -Michael
> > >
> > > --
> > > -- Guozhang
>
> --
> -- Guozhang