If you're talking about which store to use in your transform function, it should be a windowed store.
You can create such a store with the `Stores` factory. Supposing your old code has `windowedBy(TimeWindows.of(60000))`, you can do:

    TimeWindows windows = TimeWindows.of(60000);

    Stores.windowStoreBuilder(
        Stores.persistentWindowStore("Counts",
                                     windows.maintainMs(),
                                     windows.segments,
                                     windows.size(),
                                     true),
        keySerde,     // placeholder: the serde for your keys
        valueSerde);  // placeholder: the serde for your values

Guozhang

On Thu, Apr 26, 2018 at 4:39 AM, 杰 杨 <funk...@live.com> wrote:

> I'm back.
> Which StateStore could I use for this problem?
> Another idea: I could send a 'flush' message into this topic, and
> update the results in the db when that message is received.
> I don't know whether that would work.
>
> ________________________________
> funk...@live.com
>
> From: Guozhang Wang<mailto:wangg...@gmail.com>
> Date: 2018-03-12 03:58
> To: users<mailto:users@kafka.apache.org>
> Subject: Re: Re: kafka steams with TimeWindows ,incorrect result
> If you want to strictly "only have one output per window", then for now
> you'd probably implement that logic using a lower-level "transform"
> function in which you can schedule a punctuate function to send all the
> results at the end of a window.
>
> If you just want to reduce the amount of data to your sink, but your sink
> can still handle overwritten records of the same key, you can enlarge the
> cache size via the cache.max.bytes.buffering config.
>
> https://kafka.apache.org/documentation/#streamsconfigs
>
> On Fri, Mar 9, 2018 at 9:45 PM, 杰 杨 <funk...@live.com> wrote:
>
> > Thanks for your reply!
> > I see that it is designed to operate on an infinite, unbounded stream of
> > data.
> > Now I want to process an unbounded stream, but divided by time interval.
> > What can I do to achieve this?
> >
> > ________________________________
> > funk...@live.com
> >
> > From: Guozhang Wang<mailto:wangg...@gmail.com>
> > Date: 2018-03-10 02:50
> > To: users<mailto:users@kafka.apache.org>
> > Subject: Re: kafka steams with TimeWindows ,incorrect result
> > Hi Jie,
> >
> > This is by design of Kafka Streams, please read this doc for more details
> > (search for "outputs of the Wordcount application is actually a continuous
> > stream of updates"):
> >
> > https://kafka.apache.org/0110/documentation/streams/quickstart
> >
> > Note that these semantics apply to both windowed and un-windowed tables.
> >
> >
> > Guozhang
> >
> > On Fri, Mar 9, 2018 at 5:36 AM, 杰 杨 <funk...@live.com> wrote:
> >
> > > Hi:
> > > I used TimeWindows to aggregate data in Kafka.
> > >
> > > This is the code snippet:
> > >
> > > view.flatMap(new MultipleKeyValueMapper(client))
> > >     .groupByKey(Serialized.with(Serdes.String(),
> > >         Serdes.serdeFrom(new CountInfoSerializer(), new CountInfoDeserializer())))
> > >     .windowedBy(TimeWindows.of(60000))
> > >     .reduce(new Reducer<CountInfo>() {
> > >         @Override
> > >         public CountInfo apply(CountInfo value1, CountInfo value2) {
> > >             return new CountInfo(value1.start + value2.start,
> > >                 value1.active + value2.active, value1.fresh + value2.fresh);
> > >         }
> > >     })
> > >     .toStream(new KeyValueMapper<Windowed<String>, CountInfo, String>() {
> > >         @Override
> > >         public String apply(Windowed<String> key, CountInfo value) {
> > >             return key.key();
> > >         }
> > >     })
> > >     .print(Printed.toSysOut());
> > >
> > > KafkaStreams streams = new KafkaStreams(builder.build(),
> > >     KStreamReducer.getConf());
> > > streams.start();
> > >
> > > I tested with 30000 records in Kafka
> > > and printed the key and value.
> > >
> > >
> > > [KTABLE-TOSTREAM-0000000007]: [99999_99999_2018-03-09_hour_21@1520601300000/1520601360000], CountInfo{start=12179, active=12179, fresh=12179}
> > > [KTABLE-TOSTREAM-0000000007]: [99999_99999_2018-03-09@1520601300000/1520601360000], CountInfo{start=12179, active=12179, fresh=12179}
> > > [KTABLE-TOSTREAM-0000000007]: [99999_99999_2018-03-09_hour_21@1520601300000/1520601360000], CountInfo{start=30000, active=30000, fresh=30000}
> > > [KTABLE-TOSTREAM-0000000007]: [99999_99999_2018-03-09@1520601300000/1520601360000], CountInfo{start=30000, active=30000, fresh=30000}
> > >
> > > Why are two results printed within one window duration instead of one?
> > >
> > > ________________________________
> > > funk...@live.com
> > >
> >
> >
> > --
> > -- Guozhang
> >
>
>
> --
> -- Guozhang
>

--
-- Guozhang
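
For readers following the "only one output per window" discussion in this thread, here is a minimal plain-Java sketch of the underlying logic: keep a running total per key and window, and emit a result only once the window has ended. It deliberately does not use the Kafka Streams API. The class `WindowedTotals`, its methods, and the key "a" are hypothetical names made up for illustration; in a real "transform" implementation the map would be replaced by the windowed state store and `closeWindowsUpTo` would be driven by a scheduled punctuation.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper, not part of Kafka Streams: it only mimics the
// "buffer updates, emit once per closed window" logic in plain Java.
class WindowedTotals {
    private final long windowSizeMs;
    // window start -> (key -> running total); TreeMap keeps windows ordered by start time
    private final TreeMap<Long, Map<String, Long>> open = new TreeMap<>();

    WindowedTotals(long windowSizeMs) {
        this.windowSizeMs = windowSizeMs;
    }

    // Tumbling windows are assumed to be aligned to the epoch, so the window
    // start is the timestamp rounded down to a multiple of the window size.
    long windowStart(long timestampMs) {
        return timestampMs - (timestampMs % windowSizeMs);
    }

    // Called per record: update the running total, emit nothing yet.
    void add(String key, long timestampMs, long value) {
        open.computeIfAbsent(windowStart(timestampMs), s -> new TreeMap<>())
            .merge(key, value, Long::sum);
    }

    // Called periodically (the role a punctuation would play): emit and drop
    // every window whose end is at or before the given time.
    List<String> closeWindowsUpTo(long nowMs) {
        List<String> out = new ArrayList<>();
        Iterator<Map.Entry<Long, Map<String, Long>>> it = open.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, Map<String, Long>> w = it.next();
            long end = w.getKey() + windowSizeMs;
            if (end > nowMs) {
                break; // this window and all later ones are still open
            }
            for (Map.Entry<String, Long> e : w.getValue().entrySet()) {
                out.add(e.getKey() + "@" + w.getKey() + "/" + end + "=" + e.getValue());
            }
            it.remove();
        }
        return out;
    }

    public static void main(String[] args) {
        WindowedTotals totals = new WindowedTotals(60000L);
        totals.add("a", 1520601300000L, 12179L);
        totals.add("a", 1520601330000L, 17821L);
        System.out.println(totals.closeWindowsUpTo(1520601359999L)); // window still open
        System.out.println(totals.closeWindowsUpTo(1520601360000L)); // window has ended
    }
}
```

With this shape, intermediate updates (such as the 12179 seen in the original output) stay inside the buffer, and only the total at window end is handed downstream. The trade-off is that records arriving after the window has been closed are not folded into the emitted result.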