Hi Hamza,

You can create a windowed store in the Processor API via the Stores factory class:

org.apache.kafka.streams.state.Stores

More specifically, you can do something like:

Stores.create().withKeys().withValues().persistent().windowed(/* you can specify window size, retention period etc. here */)

which returns the RocksDBWindowStoreSupplier.

Guozhang

On Mon, Oct 24, 2016 at 2:23 AM, Hamza HACHANI <hamza.hach...@supcom.tn> wrote:

> And the start time and end time of the window.
>
> In other words, I need the notion of windows in the Processor API.
>
> Is this possible?
>
> ________________________________
> From: Hamza HACHANI <hamza.hach...@supcom.tn>
> Sent: Sunday, October 23, 2016 20:43:05
> To: users@kafka.apache.org
> Subject: RE: customised event time
>
> To be more specific:
>
> What I really need is the retention time property for the
> window in the Processor API.
>
> Because for the window itself, I think I can manage that.
>
> Hamza
>
> ________________________________
> From: Hamza HACHANI <hamza.hach...@supcom.tn>
> Sent: Sunday, October 23, 2016 20:30:13
> To: users@kafka.apache.org
> Subject: RE: customised event time
>
> Hi,
>
> I think that maybe I'm asking a lot.
>
> But I need windowing in the Processor API, not in the Streams
> DSL. Is this possible?
>
> The second question is: how can I get rid of the intermediate results?
> I'm only interested in the final result given by the window.
>
> Hamza
>
> ________________________________
> From: Matthias J. Sax <matth...@confluent.io>
> Sent: Saturday, October 22, 2016 16:12:45
> To: users@kafka.apache.org
> Subject: Re: customised event time
>
> Hi,
>
> you can set the window retention time via the method Windows#until() (this
> retention time is based on the timestamps returned from your custom
> timestamp extractor). This keeps all windows until the retention time
> passes, and thus all late-arriving records will be processed correctly.
>
> However, Kafka Streams does not close windows as other frameworks do, but
> rather gives you an (intermediate) result each time a window is
> updated with a new record (regardless of whether the record is in-order or
> late; you will get a result record in both cases).
>
> As of Kafka 0.10.1, those (intermediate) results get deduplicated, so
> you might not receive all (intermediate) results downstream. Of
> course, it is ensured that you will eventually get the latest/final
> result sent downstream.
>
> -Matthias
>
> On 10/21/16 7:42 AM, Hamza HACHANI wrote:
> > Hi,
> >
> > I would like to process data based on a customised event time (a
> > timestamp that I implement as part of the message).
> >
> > The data is processed in periodic windows of x time that are
> > parameterised via the method punctuate.
> >
> > What I need is a retention time for the window, to be able to handle
> > late-arriving messages.
> >
> > Can I do this: define/configure a retention time for windows? For
> > example, the window which handles data between 15:00 and 16:00 forwards
> > its result not at 16:00 but at 16:15.
> >
> > Thanks in advance for your help.
> >
> > Hamza

-- Guozhang
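
The behaviour asked about in the original question (forward the result of the 15:00 to 16:00 window at 16:15 rather than at 16:00, and forward only the final result) can be sketched independently of Kafka Streams. The following is a minimal plain-Java sketch, not Kafka Streams code: the class RetainedWindowSum and its methods add() and drainClosed() are hypothetical names invented for illustration, the aggregation (a sum) is an assumption, and "time" here means the largest event timestamp seen so far.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper (NOT part of Kafka Streams): sums values per tumbling
// event-time window and releases a window's final result only after the
// retention period has passed beyond the window end.
final class RetainedWindowSum {
    private final long windowSizeMs;
    private final long retentionMs;
    // window start timestamp -> running sum, ordered by window start
    private final TreeMap<Long, Long> open = new TreeMap<>();
    // largest event timestamp observed so far; MIN_VALUE means "none yet"
    private long maxEventTimeMs = Long.MIN_VALUE;

    RetainedWindowSum(long windowSizeMs, long retentionMs) {
        this.windowSizeMs = windowSizeMs;
        this.retentionMs = retentionMs;
    }

    // Adds a record to its window. Returns false (record dropped) if the
    // window's end plus retention is already at or before the observed time.
    boolean add(long eventTimeMs, long value) {
        long start = eventTimeMs - Math.floorMod(eventTimeMs, windowSizeMs);
        if (maxEventTimeMs != Long.MIN_VALUE
                && start + windowSizeMs + retentionMs <= maxEventTimeMs) {
            return false;
        }
        open.merge(start, value, Long::sum);
        maxEventTimeMs = Math.max(maxEventTimeMs, eventTimeMs);
        return true;
    }

    // Removes and returns {windowStart, sum} for every window whose end plus
    // retention has been reached. Each window is returned exactly once, so no
    // intermediate results are produced.
    List<long[]> drainClosed() {
        List<long[]> out = new ArrayList<>();
        while (!open.isEmpty()) {
            Map.Entry<Long, Long> first = open.firstEntry();
            if (first.getKey() + windowSizeMs + retentionMs > maxEventTimeMs) {
                break; // this window and all later ones are still retained
            }
            out.add(new long[] {first.getKey(), first.getValue()});
            open.pollFirstEntry();
        }
        return out;
    }
}
```

In a custom processor, one possible wiring (an assumption, not something confirmed in this thread) would be to call add() from process() with the extracted event timestamp and to call drainClosed() from punctuate(), forwarding whatever it returns. With a window size of one hour and a retention of 15 minutes, a record stamped 15:50 that arrives after a 16:05 record is still counted in the 15:00 window, and that window is released once a record stamped 16:15 or later has been seen.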