To unsubscribe you need to send an email to users-unsubscr...@kafka.apache.org
-Matthias

On 12/3/16 6:13 PM, williamtellme123 wrote:
> Unsubscribe

-------- Original message --------
From: Guozhang Wang <wangg...@gmail.com>
Date: 12/2/16 5:48 PM (GMT-06:00)
To: users@kafka.apache.org
Subject: Re: Kafka windowed table not aggregating correctly

Sachin,

One thing to note is that retention of the windowed stores works by keeping multiple segments of the store, where each segment covers a time range that can potentially span multiple windows. If a new window needs to be created that lies beyond the oldest segment's time range plus the retention period, the oldest segments are dropped. From your code it seems you do not override the retention via until(...) on TimeWindows.of("stream-table", 10 * 1000L).advanceBy(5 * 1000L), so the default of one day is used.

So with WallclockTimestampExtractor, since it uses system time, it won't give you timestamps that span more than a day during a short period of time; but if your own extracted timestamps exceed that range, then old segments will be dropped immediately and hence the aggregate values will be returned as a single value.

Guozhang
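For reference, a minimal sketch of overriding the retention Guozhang describes, assuming the 0.10.0.x-style API where retention is set via until(...) on the window spec; this fragment would replace the TimeWindows argument in the aggregateByKey call quoted later in the thread, and the 30-day value is purely illustrative:

    // Same hopping windows as in the pipeline, but with an explicit retention
    // instead of the one-day default, so timestamps that span more than a day
    // do not cause old segments to be dropped immediately.
    TimeWindows.of("stream-table", 10 * 1000L)   // 10-second windows
        .advanceBy(5 * 1000L)                    // hopping by 5 seconds
        .until(30L * 24 * 60 * 60 * 1000L)       // retain windowed state for ~30 days (illustrative)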
On Fri, Dec 2, 2016 at 11:58 AM, Matthias J. Sax <matth...@confluent.io> wrote:

The extractor is used in org.apache.kafka.streams.processor.internals.RecordQueue#addRawRecords().

Let us know if you could resolve the problem or need more help.

-Matthias

On 12/2/16 11:46 AM, Sachin Mittal wrote:

https://github.com/SOHU-Co/kafka-node/ is the node.js client I am using; the version is 0.5.x. Can you please tell me what code in Streams calls the timestamp extractor? I can look there to see if there is any issue.

Again, the issue happens only when producing the messages using a producer that is compatible with Kafka version 0.8.x. I see that this producer does not send a record timestamp, as this was only introduced in version 0.10.

Thanks
Sachin

On 3 Dec 2016 1:03 a.m., "Matthias J. Sax" <matth...@confluent.io> wrote:

I am not sure what is happening. That's why it would be good to have a toy example to reproduce the issue.

What do you mean by "Kafka node version 0.5"?

-Matthias

On 12/2/16 11:30 AM, Sachin Mittal wrote:

I can provide the data, but the data does not seem to be the issue. If I submit the same data and use the same timestamp extractor with the Java client for Kafka version 0.10.0.1, aggregation works fine. I find the issue only when submitting the data with kafka-node version 0.5. It looks like the stream does not extract the time correctly in that case.

Thanks
Sachin

On 2 Dec 2016 11:41 p.m., "Matthias J. Sax" <matth...@confluent.io> wrote:

Can you provide example input data (including timestamps) and the result? What is the expected result (i.e., what aggregation do you apply)?

-Matthias

On 12/2/16 7:43 AM, Sachin Mittal wrote:

Hi,
After much debugging I found an issue with the timestamp extractor.

If I use a custom timestamp extractor with the following code, the aggregation does not work:

    public static class MessageTimestampExtractor implements TimestampExtractor {
        public long extract(ConsumerRecord<Object, Object> record) {
            if (record.value() instanceof Message) {
                return ((Message) record.value()).ts;
            } else {
                return record.timestamp();
            }
        }
    }

Here Message has a long field ts which stores the timestamp. Note that I have checked and ts has valid timestamp values.

However, if I replace it with, say, WallclockTimestampExtractor, aggregation works fine.

I do not understand what the issue could be here.

Also note I am using Kafka Streams version 0.10.0.1 and I am publishing messages via https://github.com/SOHU-Co/kafka-node/, whose version is quite old, 0.5.x.

Let me know if there is some bug in timestamp extraction.

Thanks
Sachin

On Mon, Nov 28, 2016 at 11:52 PM, Guozhang Wang <wangg...@gmail.com> wrote:

Sachin,

This is indeed a bit weird, and we'd like to try to reproduce your issue locally. Do you have sample input data for us to try out?

Guozhang

On Fri, Nov 25, 2016 at 10:12 PM, Sachin Mittal <sjmit...@gmail.com> wrote:

Hi,
I fixed that sorted-set issue, but I am facing a weird problem which I am not able to replicate.

Here is the sample problem that I could isolate.

My class is like this:

    public static class Message implements Comparable<Message> {
        public long ts;
        public String message;
        public String key;
        public Message() {}
        public Message(long ts, String message, String key) {
            this.ts = ts;
            this.key = key;
            this.message = message;
        }
        public int compareTo(Message paramT) {
            long ts1 = paramT.ts;
            return ts > ts1 ? 1 : -1;
        }
    }

The pipeline is like this:

    builder.stream(Serdes.String(), messageSerde, "test-window-stream")
        .map(new KeyValueMapper<String, Message, KeyValue<String, Message>>() {
            public KeyValue<String, Message> apply(String key, Message value) {
                return new KeyValue<String, Message>(value.key, value);
            }
        })
        .through(Serdes.String(), messageSerde, "test-window-key-stream")
        .aggregateByKey(new Initializer<SortedSet<Message>>() {
            public SortedSet<Message> apply() {
                return new TreeSet<Message>();
            }
        }, new Aggregator<String, Message, SortedSet<Message>>() {
            public SortedSet<Message> apply(String aggKey, Message value, SortedSet<Message> aggregate) {
                aggregate.add(value);
                return aggregate;
            }
        }, TimeWindows.of("stream-table", 10 * 1000L).advanceBy(5 * 1000L),
        Serdes.String(), messagesSerde)
        .foreach(new ForeachAction<Windowed<String>, SortedSet<Message>>() {
            public void apply(Windowed<String> key, SortedSet<Message> messages) {
                ...
            }
        });
So basically I rekey the original message into another topic and then aggregate it based on that key. What I have observed is that with windowed aggregation the aggregator does not use the previous aggregated value:

    public SortedSet<Message> apply(String aggKey, Message value, SortedSet<Message> aggregate) {
        aggregate.add(value);
        return aggregate;
    }

In the above function, aggregate is an empty set for every value entering the pipeline. When I remove the windowed aggregation, the aggregate set retains previously aggregated values.

I am just not able to wrap my head around it. When I ran this type of test locally on Windows it worked fine. However, a similar pipeline setup run against production on Linux behaves strangely and always gets an empty aggregate set. Any idea what the reason could be, and where I should look for the problem? Does the length of the key string matter here? I will later try to run the same simple setup on Linux and see what happens. But this is very strange behavior.

Thanks
Sachin

On Wed, Nov 23, 2016 at 12:04 AM, Guozhang Wang <wangg...@gmail.com> wrote:

Hello Sachin,

In the SortedSet implementation, if the objects implement the Comparable interface, that compareTo function is applied in "aggregate.add(value);", and hence if it returns 0 the element will not be added, since it is a Set.

Guozhang
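One way around this (a sketch, not something proposed in the thread): make compareTo break ties on the payload, so it only returns 0 for genuine duplicates. Field names follow the Message class quoted below; the tie-break rule itself is just an illustration.

    public int compareTo(Message other) {
        int byTs = Long.compare(this.ts, other.ts);
        if (byTs != 0) {
            return byTs;
        }
        // Same timestamp: fall back to the payload so distinct messages are kept,
        // while true duplicates still collapse to a single element in the TreeSet.
        return this.message.compareTo(other.message);
    }

As Sachin mentions below, replacing the Set with a List also avoids the deduplication; a TreeSet only keeps elements that its comparison treats as distinct.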
On Mon, Nov 21, 2016 at 10:06 PM, Sachin Mittal <sjmit...@gmail.com> wrote:

Hi,
What I find is that when I use a sorted set as the aggregation, it fails to aggregate the values for which compareTo returns 0.

My class is like this:

    public class Message implements Comparable<Message> {
        public long ts;
        public String message;
        public Message() {}
        public Message(long ts, String message) {
            this.ts = ts;
            this.message = message;
        }
        public int compareTo(Message paramT) {
            long ts1 = paramT.ts;
            return ts == ts1 ? 0 : ts > ts1 ? 1 : -1;
        }
    }

The pipeline is like this:

    builder.stream(Serdes.String(), messageSerde, "test-window-stream")
        .aggregateByKey(new Initializer<SortedSet<Message>>() {
            public SortedSet<Message> apply() {
                return new TreeSet<Message>();
            }
        }, new Aggregator<String, Message, SortedSet<Message>>() {
            public SortedSet<Message> apply(String aggKey, Message value, SortedSet<Message> aggregate) {
                aggregate.add(value);
                return aggregate;
            }
        }, TimeWindows.of("stream-table", 10 * 1000L).advanceBy(5 * 1000L),
        Serdes.String(), messagesSerde)
        .foreach(new ForeachAction<Windowed<String>, SortedSet<Message>>() {
            public void apply(Windowed<String> key, SortedSet<Message> messages) {
                ...
            }
        });

So any message published between 10 and 20 seconds gets aggregated in the 10 - 20 bucket, and I print the size of the set. However, the output I get is the following:

    Published: 14    Aggregated: 10 20 -> 1
    Published: 18    Aggregated: 10 20 -> 2
    Published: 11    Aggregated: 10 20 -> 3
    Published: 17    Aggregated: 10 20 -> 4
    Published: 14    Aggregated: 10 20 -> 4
    Published: 15    Aggregated: 10 20 -> 5
    Published: 12    Aggregated: key2 10 20 -> 6
    Published: 12    Aggregated: 10 20 -> 6

So any message that occurs again in the same second, where compareTo returns 0, fails to get aggregated in the pipeline. Notice the ones published at 14 and 12 seconds.

Now I am not sure if the problem is with Java (i.e., whether I should use the Comparator interface rather than Comparable for my Message object), or with Kafka Streams, or with serializing and de-serializing the set of messages. If I replace the Set with a List, everything works fine.

Anyway, any ideas here would be appreciated; meanwhile let me see what the best Java practice is here.

Thanks
Sachin

On Mon, Nov 21, 2016 at 8:29 PM, Michael Noll <mich...@confluent.io> wrote:

On Mon, Nov 21, 2016 at 1:06 PM, Sachin Mittal <sjmit...@gmail.com> wrote:
> I am using kafka_2.10-0.10.0.1.
> Say I am having a window of 60 minutes advanced by 15 minutes.
> If the stream app, using the timestamp extractor, puts the message in one or more bucket(s), it will get aggregated in those buckets.
> I assume this statement is correct.

Yes.

> Also say that when I restart the streams application, bucket aggregation will resume from the last point of halt.
> I hope this is also correct.
Yes.

> What I noticed is that once a message is placed in one bucket, that bucket was not getting new messages.

This should not happen...

> However, when I ran a small test case replicating that, it is working properly. There may be some issues in application reset.

...and apparently it works (as expected) in your small test case.

Do you have any further information that you could share with us so we can help you better? What's the difference, for example, between your "normal" use case and the small test case you have been referring to?

-Michael
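To make the 60-minute / 15-minute question above concrete, a small illustrative sketch of the hopping windows being discussed (the store name here is made up); with these settings each record timestamp falls into up to four overlapping windows, and the aggregator is invoked once per window the record belongs to:

    // 0.10.0.x-style hopping windows: 60-minute windows, a new one starting every 15 minutes.
    TimeWindows hopping = TimeWindows.of("sixty-by-fifteen", 60 * 60 * 1000L)  // window size: 60 minutes
        .advanceBy(15 * 60 * 1000L);                                           // advance interval: 15 minutes
    // A record with timestamp t is placed in the windows starting at the four
    // 15-minute boundaries in (t - 60 min, t], so it is aggregated in each of them.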