I am not sure what is happening. That's why it would be good to have a toy example to reproduce the issue.
What do you mean by "Kafka node version 0.5"?

-Matthias

On 12/2/16 11:30 AM, Sachin Mittal wrote:
> I can provide the data, but the data does not seem to be the issue.
> If I submit the same data and use the same timestamp extractor via the Java
> client with Kafka version 0.10.0.1, aggregation works fine.
> I see the issue only when submitting the data with kafka-node version 0.5.
> It looks like the stream does not extract the time correctly in that case.
>
> Thanks
> Sachin
>
> On 2 Dec 2016 11:41 p.m., "Matthias J. Sax" <matth...@confluent.io> wrote:
>
>> Can you provide example input data (including timestamps) and the result?
>> What is the expected result (i.e., what aggregation do you apply)?
>>
>>
>> -Matthias
>>
>> On 12/2/16 7:43 AM, Sachin Mittal wrote:
>>> Hi,
>>> After much debugging I found an issue with the timestamp extractor.
>>>
>>> If I use a custom timestamp extractor with the following code:
>>>
>>>     public static class MessageTimestampExtractor implements TimestampExtractor {
>>>         public long extract(ConsumerRecord<Object, Object> record) {
>>>             if (record.value() instanceof Message) {
>>>                 return ((Message) record.value()).ts;
>>>             } else {
>>>                 return record.timestamp();
>>>             }
>>>         }
>>>     }
>>>
>>> Here the message has a long field ts which stores the timestamp, and the
>>> aggregation does not work.
>>> Note that I have checked, and ts has valid timestamp values.
>>>
>>> However, if I replace it with, say, WallclockTimestampExtractor,
>>> aggregation is working fine.
>>>
>>> I do not understand what the issue could be here.
>>>
>>> Also note I am using Kafka Streams version 0.10.0.1, and I am publishing
>>> messages via https://github.com/SOHU-Co/kafka-node/, whose version is
>>> quite old: 0.5.x.
>>>
>>> Let me know if there is some bug in timestamp extraction.
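[Editor's note: one plausible explanation — not confirmed in this thread — is that a pre-0.10 client such as kafka-node 0.5.x produces records without an embedded timestamp, so record.timestamp() can return -1 and derail windowing. The sketch below shows a defensive extraction strategy with a wall-clock fallback. FakeMessage and FakeRecord are simplified stand-ins so the snippet compiles on its own; a real implementation would implement org.apache.kafka.streams.processor.TimestampExtractor against a ConsumerRecord.]

```java
class SafeTimestampExtractor {

    // Prefer the timestamp embedded in the payload; otherwise fall back to the
    // record's own timestamp, and finally to wall-clock time when that is
    // invalid (clients predating the 0.10 message format leave it at -1).
    static long extract(FakeRecord record) {
        if (record.value() instanceof FakeMessage) {
            long ts = ((FakeMessage) record.value()).ts;
            if (ts >= 0) {
                return ts;
            }
        }
        long recordTs = record.timestamp();
        return recordTs >= 0 ? recordTs : System.currentTimeMillis();
    }

    public static void main(String[] args) {
        // Payload timestamp wins even when the record timestamp is missing.
        System.out.println(extract(new FakeRecord(new FakeMessage(1480000000000L), -1L)));
        // A non-Message payload with a missing record timestamp falls back to wall-clock time.
        System.out.println(extract(new FakeRecord("not a message", -1L)) > 0);
    }
}

// Simplified version of the thread's Message class.
class FakeMessage {
    final long ts;
    FakeMessage(long ts) { this.ts = ts; }
}

// Simplified stand-in for ConsumerRecord: just a value and a timestamp.
class FakeRecord {
    private final Object value;
    private final long timestamp;
    FakeRecord(Object value, long timestamp) { this.value = value; this.timestamp = timestamp; }
    Object value() { return value; }
    long timestamp() { return timestamp; }
}
```

Logging which branch was taken for a few records would quickly confirm or rule out the missing-timestamp theory in production.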
>>>
>>> Thanks
>>> Sachin
>>>
>>>
>>> On Mon, Nov 28, 2016 at 11:52 PM, Guozhang Wang <wangg...@gmail.com> wrote:
>>>
>>>> Sachin,
>>>>
>>>> This is indeed a bit weird, and we'd like to try to reproduce your issue
>>>> locally. Do you have some sample input data for us to try out?
>>>>
>>>> Guozhang
>>>>
>>>> On Fri, Nov 25, 2016 at 10:12 PM, Sachin Mittal <sjmit...@gmail.com> wrote:
>>>>
>>>>> Hi,
>>>>> I fixed that sorted-set issue, but I am facing a weird problem which I
>>>>> am not able to replicate.
>>>>>
>>>>> Here is the sample problem that I could isolate.
>>>>> My class is like this:
>>>>>
>>>>>     public static class Message implements Comparable<Message> {
>>>>>         public long ts;
>>>>>         public String message;
>>>>>         public String key;
>>>>>         public Message() {};
>>>>>         public Message(long ts, String message, String key) {
>>>>>             this.ts = ts;
>>>>>             this.key = key;
>>>>>             this.message = message;
>>>>>         }
>>>>>         public int compareTo(Message paramT) {
>>>>>             long ts1 = paramT.ts;
>>>>>             return ts > ts1 ? 1 : -1;
>>>>>         }
>>>>>     }
>>>>>
>>>>> The pipeline is like this:
>>>>>
>>>>>     builder.stream(Serdes.String(), messageSerde, "test-window-stream")
>>>>>     .map(new KeyValueMapper<String, Message, KeyValue<String, Message>>() {
>>>>>         public KeyValue<String, Message> apply(String key, Message value) {
>>>>>             return new KeyValue<String, Message>(value.key, value);
>>>>>         }
>>>>>     })
>>>>>     .through(Serdes.String(), messageSerde, "test-window-key-stream")
>>>>>     .aggregateByKey(new Initializer<SortedSet<Message>>() {
>>>>>         public SortedSet<Message> apply() {
>>>>>             return new TreeSet<Message>();
>>>>>         }
>>>>>     }, new Aggregator<String, Message, SortedSet<Message>>() {
>>>>>         public SortedSet<Message> apply(String aggKey, Message value,
>>>>>                                         SortedSet<Message> aggregate) {
>>>>>             aggregate.add(value);
>>>>>             return aggregate;
>>>>>         }
>>>>>     }, TimeWindows.of("stream-table", 10 * 1000L).advanceBy(5 * 1000L),
>>>>>        Serdes.String(), messagesSerde)
>>>>>     .foreach(new ForeachAction<Windowed<String>, SortedSet<Message>>() {
>>>>>         public void apply(Windowed<String> key, SortedSet<Message> messages) {
>>>>>             ...
>>>>>         }
>>>>>     });
>>>>>
>>>>> So basically I rekey the original message into another topic and then
>>>>> aggregate it based on that key.
>>>>> What I have observed is that when I use windowed aggregation, the
>>>>> aggregator does not use the previously aggregated value:
>>>>>
>>>>>     public SortedSet<Message> apply(String aggKey, Message value,
>>>>>                                     SortedSet<Message> aggregate) {
>>>>>         aggregate.add(value);
>>>>>         return aggregate;
>>>>>     }
>>>>>
>>>>> So in the function above, the aggregate is an empty set for every value
>>>>> entering the pipeline. When I remove the windowed aggregation, the
>>>>> aggregate set retains previously aggregated values.
>>>>>
>>>>> I am just not able to wrap my head around it. When I ran this type of
>>>>> test locally on Windows, it worked fine. However, a similar pipeline
>>>>> setup, when run against production on Linux, behaves strangely and
>>>>> always gets an empty aggregate set.
>>>>> Any idea what the reason could be, and where should I look for the
>>>>> problem? Does the length of the key string matter here? I will later try
>>>>> to run the same simple setup on Linux and see what happens. But this is
>>>>> very strange behavior.
>>>>>
>>>>> Thanks
>>>>> Sachin
>>>>>
>>>>>
>>>>> On Wed, Nov 23, 2016 at 12:04 AM, Guozhang Wang <wangg...@gmail.com> wrote:
>>>>>
>>>>>> Hello Sachin,
>>>>>>
>>>>>> In the implementation of SortedSet, if the object implements the
>>>>>> Comparable interface, that compareTo function is applied in
>>>>>> "aggregate.add(value);", and hence if it returns 0, the element will
>>>>>> not be added, since it is a Set.
>>>>>>
>>>>>>
>>>>>> Guozhang
>>>>>>
>>>>>>
>>>>>> On Mon, Nov 21, 2016 at 10:06 PM, Sachin Mittal <sjmit...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>> What I find is that when I use a sorted set for aggregation, it fails
>>>>>>> to aggregate the values for which compareTo returns 0.
>>>>>>>
>>>>>>> My class is like this:
>>>>>>>
>>>>>>>     public class Message implements Comparable<Message> {
>>>>>>>         public long ts;
>>>>>>>         public String message;
>>>>>>>         public Message() {};
>>>>>>>         public Message(long ts, String message) {
>>>>>>>             this.ts = ts;
>>>>>>>             this.message = message;
>>>>>>>         }
>>>>>>>         public int compareTo(Message paramT) {
>>>>>>>             long ts1 = paramT.ts;
>>>>>>>             return ts == ts1 ? 0 : ts > ts1 ? 1 : -1;
>>>>>>>         }
>>>>>>>     }
>>>>>>>
>>>>>>> The pipeline is like this:
>>>>>>>
>>>>>>>     builder.stream(Serdes.String(), messageSerde, "test-window-stream")
>>>>>>>     .aggregateByKey(new Initializer<SortedSet<Message>>() {
>>>>>>>         public SortedSet<Message> apply() {
>>>>>>>             return new TreeSet<Message>();
>>>>>>>         }
>>>>>>>     }, new Aggregator<String, Message, SortedSet<Message>>() {
>>>>>>>         public SortedSet<Message> apply(String aggKey, Message value,
>>>>>>>                                         SortedSet<Message> aggregate) {
>>>>>>>             aggregate.add(value);
>>>>>>>             return aggregate;
>>>>>>>         }
>>>>>>>     }, TimeWindows.of("stream-table", 10 * 1000L).advanceBy(5 * 1000L),
>>>>>>>        Serdes.String(), messagesSerde)
>>>>>>>     .foreach(new ForeachAction<Windowed<String>, SortedSet<Message>>() {
>>>>>>>         public void apply(Windowed<String> key, SortedSet<Message> messages) {
>>>>>>>             ...
>>>>>>>         }
>>>>>>>     });
>>>>>>>
>>>>>>> So any message published between 10 and 20 seconds gets aggregated
>>>>>>> into the 10-20 bucket, and I print the size of the set.
>>>>>>> However, the output I get is the following:
>>>>>>>
>>>>>>>     Published: 14
>>>>>>>     Aggregated: 10 20 -> 1
>>>>>>>
>>>>>>>     Published: 18
>>>>>>>     Aggregated: 10 20 -> 2
>>>>>>>
>>>>>>>     Published: 11
>>>>>>>     Aggregated: 10 20 -> 3
>>>>>>>
>>>>>>>     Published: 17
>>>>>>>     Aggregated: 10 20 -> 4
>>>>>>>
>>>>>>>     Published: 14
>>>>>>>     Aggregated: 10 20 -> 4
>>>>>>>
>>>>>>>     Published: 15
>>>>>>>     Aggregated: 10 20 -> 5
>>>>>>>
>>>>>>>     Published: 12
>>>>>>>     Aggregated: key2 10 20 -> 6
>>>>>>>
>>>>>>>     Published: 12
>>>>>>>     Aggregated: 10 20 -> 6
>>>>>>>
>>>>>>> So, as you can see, any message that occurs again within the same
>>>>>>> second, where compareTo returns 0, fails to get aggregated in the
>>>>>>> pipeline. Notice the ones published at 14 and 12 seconds.
>>>>>>>
>>>>>>> Now I am not sure whether the problem is with Java, i.e. whether I
>>>>>>> should use the Comparator interface and not Comparable for my Message
>>>>>>> object, or whether the problem is with Kafka Streams or with
>>>>>>> serializing and deserializing the set of messages. If I replace the
>>>>>>> Set with a List, all is working fine.
>>>>>>>
>>>>>>> Anyway, any ideas here would be appreciated; meanwhile, let me see
>>>>>>> what the best Java practice is here.
>>>>>>>
>>>>>>> Thanks
>>>>>>> Sachin
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Mon, Nov 21, 2016 at 8:29 PM, Michael Noll <mich...@confluent.io> wrote:
>>>>>>>
>>>>>>>> On Mon, Nov 21, 2016 at 1:06 PM, Sachin Mittal <sjmit...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> I am using kafka_2.10-0.10.0.1.
>>>>>>>>> Say I have a window of 60 minutes advanced by 15 minutes.
>>>>>>>>> If the stream app, using the timestamp extractor, puts the message
>>>>>>>>> in one or more bucket(s), it will get aggregated in those buckets.
>>>>>>>>> I assume this statement is correct.
>>>>>>>>
>>>>>>>> Yes.
>>>>>>>>
>>>>>>>>> Also, say, when I restart the streams application, bucket
>>>>>>>>> aggregation will resume from the last point of halt.
>>>>>>>>> I hope this is also correct.
>>>>>>>>
>>>>>>>> Yes.
>>>>>>>>
>>>>>>>>> What I noticed is that once a message was placed in one bucket,
>>>>>>>>> that bucket was not getting new messages.
>>>>>>>>
>>>>>>>> This should not happen...
>>>>>>>>
>>>>>>>>> However, when I ran a small test case replicating that, it worked
>>>>>>>>> properly. There may be some issue in the application reset.
>>>>>>>>
>>>>>>>> ...and apparently it works (as expected) in your small test case.
>>>>>>>>
>>>>>>>> Do you have any further information that you could share with us so
>>>>>>>> we can help you better? What's the difference, for example, between
>>>>>>>> your "normal" use case and the small test case you have been
>>>>>>>> referring to?
>>>>>>>>
>>>>>>>> -Michael
>>>>>>>
>>>>>>
>>>>>> --
>>>>>> -- Guozhang
>>>>>
>>>>
>>>> --
>>>> -- Guozhang
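[Editor's note: Guozhang's explanation of the dropped elements can be reproduced in a few lines of plain Java. A TreeSet treats compareTo() == 0 as "already present", so two messages in the same second collapse into one element. A tiebreaker on a second field (a suggestion, not something proposed in the thread) keeps both while preserving time ordering, and unlike the later never-return-0 compareTo, it still satisfies the Comparable contract. The Message class below is a simplified stand-in for the one in the thread:]

```java
import java.util.Comparator;
import java.util.TreeSet;

class TreeSetDedupDemo {

    // Simplified stand-in for the thread's Message class.
    static class Message {
        final long ts;
        final String text;
        Message(long ts, String text) { this.ts = ts; this.text = text; }
    }

    public static void main(String[] args) {
        Message first = new Message(14, "first event at second 14");
        Message second = new Message(14, "second event at second 14");

        // Ordering by timestamp only: the comparator returns 0 for these two
        // messages, so the TreeSet treats the second as a duplicate and
        // silently drops it -- the behavior observed in the aggregation.
        TreeSet<Message> byTimestampOnly =
            new TreeSet<>(Comparator.comparingLong((Message m) -> m.ts));
        byTimestampOnly.add(first);
        byTimestampOnly.add(second);
        System.out.println(byTimestampOnly.size()); // 1

        // A tiebreaker on another field keeps distinct messages that share a
        // timestamp, while still ordering primarily by time.
        TreeSet<Message> withTiebreaker = new TreeSet<>(
            Comparator.comparingLong((Message m) -> m.ts)
                      .thenComparing(m -> m.text));
        withTiebreaker.add(first);
        withTiebreaker.add(second);
        System.out.println(withTiebreaker.size()); // 2
    }
}
```

The same tiebreaker logic could go directly into Message.compareTo; the Comparator form is used here only to show both orderings side by side.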