I also checked the WindowStore interface and found that it has a put method but no get method. Within one second the stream contains the same key with different values, and I need to update the value stored for that key.
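On the read side: in Kafka Streams, `WindowStore` extends `ReadOnlyWindowStore`, whose `fetch(key, timeFrom, timeTo)` returns a `WindowStoreIterator` over the values stored for that key with a window start inside the range. The iterator should be closed after use. Updating a key inside its 1-second window inside a transform is therefore a read-modify-write: align the record timestamp to its window start, fetch the current aggregate for `(key, windowStart)`, add the new value, and write it back with `put(key, updated, windowStart)`. The sketch below models only that pattern in plain Java so it runs without Kafka on the classpath. `InMemoryWindowStore`, `upsert`, `read` and this `CountInfo` class are hypothetical stand-ins, not Kafka APIs.

```java
import java.util.HashMap;
import java.util.Map;

public class WindowedUpsertSketch {

    // Hypothetical stand-in for the CountInfo type from the thread.
    static final class CountInfo {
        final long start, active, fresh;

        CountInfo(long start, long active, long fresh) {
            this.start = start;
            this.active = active;
            this.fresh = fresh;
        }

        CountInfo plus(CountInfo o) {
            return new CountInfo(start + o.start, active + o.active, fresh + o.fresh);
        }
    }

    // Hypothetical in-memory stand-in for a window store: one slot per (key, windowStart).
    static final class InMemoryWindowStore {
        private final Map<String, CountInfo> slots = new HashMap<>();

        private static String slot(String key, long windowStart) {
            return key + "@" + windowStart;
        }

        // Read side; plays the role of fetch(key, windowStart, windowStart).
        CountInfo fetch(String key, long windowStart) {
            return slots.get(slot(key, windowStart));
        }

        // Write side; plays the role of put(key, value, windowStart) and replaces the old value.
        void put(String key, CountInfo value, long windowStart) {
            slots.put(slot(key, windowStart), value);
        }
    }

    static final long WINDOW_SIZE_MS = 1_000L; // 1-second windows, as asked in the thread

    private final InMemoryWindowStore store = new InMemoryWindowStore();

    // Aligns a record timestamp to the start of its tumbling window.
    static long windowStart(long timestampMs) {
        return timestampMs - Math.floorMod(timestampMs, WINDOW_SIZE_MS);
    }

    // Read-modify-write: combine the incoming value with the stored aggregate for the same key and window.
    CountInfo upsert(String key, CountInfo value, long timestampMs) {
        long start = windowStart(timestampMs);
        CountInfo current = store.fetch(key, start);
        CountInfo updated = (current == null) ? value : current.plus(value);
        store.put(key, updated, start);
        return updated;
    }

    // Returns the aggregate for the window containing timestampMs, or null if nothing was stored.
    CountInfo read(String key, long timestampMs) {
        return store.fetch(key, windowStart(timestampMs));
    }

    public static void main(String[] args) {
        WindowedUpsertSketch agg = new WindowedUpsertSketch();
        String key = "44_14_2018-04-27";
        agg.upsert(key, new CountInfo(1, 0, 0), 1_000L);
        agg.upsert(key, new CountInfo(1, 1, 0), 1_500L); // same key, same second: updated in place
        agg.upsert(key, new CountInfo(1, 0, 1), 2_100L); // next second: a new window slot
        System.out.println(agg.read(key, 1_200L).start); // prints 2
    }
}
```

If the real store is created with `retainDuplicates` set to `true`, each `put` adds another entry instead of replacing the previous one, so an update-in-place pattern like this one expects `retainDuplicates` to be `false`.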
________________________________
funk...@live.com

From: funk...@live.com<mailto:funk...@live.com>
Date: 2018-04-27 16:08
To: users<mailto:users@kafka.apache.org>
Subject: Re: Re: kafka steams with TimeWindows ,incorrect result

Hi,
I don't know how to use the transform function. The stream is already prepared, like the example below:
key: 44_14_2018-04-27
value: CountInfo(start=1, active=0, fresh=0)
There is a large amount of data like that. How can I aggregate it per 1 second using the transform function?
________________________________
funk...@live.com

From: Guozhang Wang<mailto:wangg...@gmail.com>
Date: 2018-04-27 01:50
To: users<mailto:users@kafka.apache.org>
Subject: Re: Re: kafka steams with TimeWindows ,incorrect result

Using a control message to flush results downstream (in your case, to the result DB) looks good to me as well.

On Thu, Apr 26, 2018 at 10:49 AM, Guozhang Wang <wangg...@gmail.com> wrote:

> If you're talking about which store to use in your transform function, it
> should be a windowed store.
>
> You can create such a store with the `Stores` factory. Assuming your old
> code has `windowedBy(TimeWindows.of(60000))`, you can do:
>
> `
> TimeWindows windows = TimeWindows.of(60000);
>
> StoreBuilder<WindowStore<String, CountInfo>> countsStoreBuilder =
>     Stores.windowStoreBuilder(
>         Stores.persistentWindowStore(
>             "Counts",
>             windows.maintainMs(),  // retention period
>             windows.segments,      // number of segments
>             windows.size(),        // window size
>             // retainDuplicates = false, so put() replaces the value per key and window
>             false),
>         Serdes.String(),
>         Serdes.serdeFrom(new CountInfoSerializer(), new CountInfoDeserializer()));
>
> builder.addStateStore(countsStoreBuilder);
> `
>
>
> Guozhang
>
>
>
> On Thu, Apr 26, 2018 at 4:39 AM, 杰 杨 <funk...@live.com> wrote:
>
>> I'm coming back to this question.
>> Which StateStore could I use for this problem?
>> Another idea: I could send a 'flush' message into this topic, and when
>> that message is received, write the results to the DB.
>> Would that work?
>>
>> ________________________________
>> funk...@live.com
>>
>> From: Guozhang Wang<mailto:wangg...@gmail.com>
>> Date: 2018-03-12 03:58
>> To: users<mailto:users@kafka.apache.org>
>> Subject: Re: Re: kafka steams with TimeWindows ,incorrect result
>>
>> If you strictly want "only one output per window", then for now you would
>> probably implement that logic with the lower-level "transform" function, in
>> which you can schedule a punctuate function to send all the results at the
>> end of a window.
>>
>> If you just want to reduce the amount of data sent to your sink, and your
>> sink can still handle overwritten records for the same key, you can enlarge
>> the cache size via the cache.max.bytes.buffering config.
>>
>> https://kafka.apache.org/documentation/#streamsconfigs
>>
>> On Fri, Mar 9, 2018 at 9:45 PM, 杰 杨 <funk...@live.com> wrote:
>>
>> > Thanks for your reply!
>> > I see that it is designed to operate on an infinite, unbounded stream of
>> > data.
>> > Now I want to process an unbounded stream, but divided into time
>> > intervals. What can I do to achieve this?
>> >
>> > ________________________________
>> > funk...@live.com
>> >
>> > From: Guozhang Wang<mailto:wangg...@gmail.com>
>> > Date: 2018-03-10 02:50
>> > To: users<mailto:users@kafka.apache.org>
>> > Subject: Re: kafka steams with TimeWindows ,incorrect result
>> >
>> > Hi Jie,
>> >
>> > This is by design in Kafka Streams. Please read this doc for more details
>> > (search for "outputs of the Wordcount application is actually a continuous
>> > stream of updates"):
>> >
>> > https://kafka.apache.org/0110/documentation/streams/quickstart
>> >
>> > Note that these semantics apply to both windowed and un-windowed tables.
>> >
>> >
>> > Guozhang
>> >
>> > On Fri, Mar 9, 2018 at 5:36 AM, 杰 杨 <funk...@live.com> wrote:
>> >
>> > > Hi,
>> > > I used TimeWindows to aggregate data in Kafka.
>> > >
>> > > Here is the code snippet:
>> > >
>> > > view.flatMap(new MultipleKeyValueMapper(client))
>> > >     .groupByKey(Serialized.with(Serdes.String(),
>> > >         Serdes.serdeFrom(new CountInfoSerializer(), new CountInfoDeserializer())))
>> > >     .windowedBy(TimeWindows.of(60000))
>> > >     .reduce(new Reducer<CountInfo>() {
>> > >         @Override
>> > >         public CountInfo apply(CountInfo value1, CountInfo value2) {
>> > >             return new CountInfo(value1.start + value2.start,
>> > >                 value1.active + value2.active, value1.fresh + value2.fresh);
>> > >         }
>> > >     })
>> > >     .toStream(new KeyValueMapper<Windowed<String>, CountInfo, String>() {
>> > >         @Override
>> > >         public String apply(Windowed<String> key, CountInfo value) {
>> > >             return key.key();
>> > >         }
>> > >     })
>> > >     .print(Printed.toSysOut());
>> > >
>> > > KafkaStreams streams = new KafkaStreams(builder.build(),
>> > >     KStreamReducer.getConf());
>> > > streams.start();
>> > >
>> > > I tested with 30000 records in Kafka and printed the key-value pairs:
>> > >
>> > > [KTABLE-TOSTREAM-0000000007]: [99999_99999_2018-03-09_hour_21@1520601300000/1520601360000], CountInfo{start=12179, active=12179, fresh=12179}
>> > > [KTABLE-TOSTREAM-0000000007]: [99999_99999_2018-03-09@1520601300000/1520601360000], CountInfo{start=12179, active=12179, fresh=12179}
>> > > [KTABLE-TOSTREAM-0000000007]: [99999_99999_2018-03-09_hour_21@1520601300000/1520601360000], CountInfo{start=30000, active=30000, fresh=30000}
>> > > [KTABLE-TOSTREAM-0000000007]: [99999_99999_2018-03-09@1520601300000/1520601360000], CountInfo{start=30000, active=30000, fresh=30000}
>> > >
>> > > Why are two results printed for one window duration instead of one?
>> > >
>> > > ________________________________
>> > > funk...@live.com
>> > >
>> >
>> > --
>> > -- Guozhang
>> >
>>
>> --
>> -- Guozhang
>>
>
> --
> -- Guozhang
>
--
-- Guozhang
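Guozhang's punctuate suggestion and the 'flush' control-message idea discussed in this thread both come down to the same thing: keep a running aggregate per key and window, and emit only when a window is known to be complete, instead of forwarding every update the way the `reduce()` topology did (printing 12179 and then 30000 for the same window). In Kafka Streams this logic would live in a Transformer whose `init()` registers a callback through `ProcessorContext#schedule` with `PunctuationType.STREAM_TIME`, with its state held in a window store. The sketch below only simulates that logic in plain Java. `process`, `punctuate`, `flushAll` and `Result` are hypothetical names, and the sketch ignores late-arriving records and does not persist state, both of which a production Transformer would need to address.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class EmitOnWindowCloseSketch {

    // Hypothetical stand-in for the CountInfo type from the thread (same three counters).
    static final class CountInfo {
        final long start, active, fresh;

        CountInfo(long start, long active, long fresh) {
            this.start = start;
            this.active = active;
            this.fresh = fresh;
        }

        CountInfo plus(CountInfo o) {
            return new CountInfo(start + o.start, active + o.active, fresh + o.fresh);
        }
    }

    // One final result per key and window.
    static final class Result {
        final String key;
        final long windowStart;
        final long windowEnd;
        final CountInfo value;

        Result(String key, long windowStart, long windowEnd, CountInfo value) {
            this.key = key;
            this.windowStart = windowStart;
            this.windowEnd = windowEnd;
            this.value = value;
        }
    }

    private final long windowSizeMs;
    // windowStart -> (key -> running aggregate); TreeMap iterates windows oldest first.
    private final TreeMap<Long, Map<String, CountInfo>> openWindows = new TreeMap<>();
    private final List<Result> emitted = new ArrayList<>();

    EmitOnWindowCloseSketch(long windowSizeMs) {
        this.windowSizeMs = windowSizeMs;
    }

    // Per record: fold the value into its window's aggregate; nothing is emitted here.
    void process(String key, CountInfo value, long timestampMs) {
        long windowStart = timestampMs - Math.floorMod(timestampMs, windowSizeMs);
        openWindows.computeIfAbsent(windowStart, s -> new HashMap<>())
                   .merge(key, value, CountInfo::plus);
    }

    // Analogue of a stream-time punctuation: emit and evict every window whose end has passed.
    void punctuate(long streamTimeMs) {
        Iterator<Map.Entry<Long, Map<String, CountInfo>>> it = openWindows.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, Map<String, CountInfo>> window = it.next();
            long windowEnd = window.getKey() + windowSizeMs;
            if (windowEnd > streamTimeMs) {
                break; // this window and all later ones are still open
            }
            for (Map.Entry<String, CountInfo> e : window.getValue().entrySet()) {
                emitted.add(new Result(e.getKey(), window.getKey(), windowEnd, e.getValue()));
            }
            it.remove();
        }
    }

    // Analogue of the 'flush' control-message idea: emit everything buffered, even open windows.
    void flushAll() {
        punctuate(Long.MAX_VALUE);
    }

    List<Result> emitted() {
        return emitted;
    }

    public static void main(String[] args) {
        EmitOnWindowCloseSketch agg = new EmitOnWindowCloseSketch(60_000L);
        long t0 = 1_520_601_300_000L; // window start taken from the printed output in the thread
        for (int i = 0; i < 30_000; i++) {
            agg.process("99999_99999_2018-03-09", new CountInfo(1, 1, 1), t0 + i);
        }
        agg.punctuate(t0 + 30_000L); // window still open: nothing emitted
        agg.punctuate(t0 + 60_000L); // window closed: exactly one result
        for (Result r : agg.emitted()) {
            System.out.println(r.key + "@" + r.windowStart + "/" + r.windowEnd + " start=" + r.value.start);
        }
    }
}
```

Later Kafka Streams releases (2.1 onward) added `KTable#suppress` with `Suppressed.untilWindowCloses(...)`, which provides this emit-once-per-window behaviour for windowed aggregations without a hand-written Transformer.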