Hi Hamza,

You can create a windowed store in the processor API via the Stores factory
class: org.apache.kafka.streams.state.Stores

More specifically, you can do something like:

Stores.create("my-store" /* placeholder name */).withKeys(keySerde).withValues(valueSerde)
      .persistent()
      .windowed(/* you can specify window size, retention period etc. here */)
      .build()


This returns a RocksDBWindowStoreSupplier.
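
To make the retention idea concrete, here is a minimal sketch in plain Java (no Kafka dependency) of the bookkeeping you would do yourself in the Processor API: assign each record to a tumbling window by its event timestamp, keep the window open for an extra retention period, and forward only the final count once that period has passed. The class RetainedWindowCounter and its methods are hypothetical names made up for illustration, not part of Kafka Streams. In a real processor you would presumably call add() from process(), call closeExpired() from punctuate(), and keep the counts in the windowed store rather than in a TreeMap.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical helper (not a Kafka class): tumbling event-time windows with retention. */
final class RetainedWindowCounter {
    private final long windowSizeMs;
    private final long retentionMs;
    // window start -> running count; TreeMap keeps windows ordered by start time
    private final TreeMap<Long, Long> counts = new TreeMap<>();
    // highest event timestamp seen so far ("stream time")
    private long streamTimeMs = Long.MIN_VALUE;

    RetainedWindowCounter(long windowSizeMs, long retentionMs) {
        this.windowSizeMs = windowSizeMs;
        this.retentionMs = retentionMs;
    }

    /** Start of the tumbling window that contains the given event timestamp. */
    long windowStart(long timestampMs) {
        return timestampMs - Math.floorMod(timestampMs, windowSizeMs);
    }

    /** Counts one record; returns false if its window is already past retention. */
    boolean add(long timestampMs) {
        streamTimeMs = Math.max(streamTimeMs, timestampMs);
        long start = windowStart(timestampMs);
        if (start + windowSizeMs + retentionMs <= streamTimeMs) {
            return false; // too late: this window's result is already final
        }
        counts.merge(start, 1L, Long::sum);
        return true;
    }

    /** Removes and returns {windowStart, finalCount} for every window past retention. */
    List<long[]> closeExpired() {
        List<long[]> closed = new ArrayList<>();
        Iterator<Map.Entry<Long, Long>> it = counts.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, Long> e = it.next();
            if (e.getKey() + windowSizeMs + retentionMs > streamTimeMs) {
                break; // windows are ordered, so all later ones are still open
            }
            closed.add(new long[] {e.getKey(), e.getValue()});
            it.remove();
        }
        return closed;
    }
}
```

With a window size of 60 minutes and a retention of 15 minutes, the window covering 15:00 to 16:00 keeps accepting late records until a record with a timestamp of 16:15 or later has been seen, and only then is its single final result returned by closeExpired(). Note that this sketch advances time from record timestamps only, so a window is not closed while no new records arrive.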

Guozhang


On Mon, Oct 24, 2016 at 2:23 AM, Hamza HACHANI <hamza.hach...@supcom.tn>
wrote:

> I also need the start time and end time of the window.
>
> In other words, I need the notion of windows in the Processor API.
>
> Is this possible?
>
> ________________________________
> From: Hamza HACHANI <hamza.hach...@supcom.tn>
> Sent: Sunday, October 23, 2016 20:43:05
> To: users@kafka.apache.org
> Subject: RE: customised event time
>
> To be more specific:
>
> What I really need is the retention time property for the
> window in the Processor API.
>
> As for the window itself, I think I can manage to do that.
>
>
> Hamza
>
> ________________________________
> From: Hamza HACHANI <hamza.hach...@supcom.tn>
> Sent: Sunday, October 23, 2016 20:30:13
> To: users@kafka.apache.org
> Subject: RE: customised event time
>
> Hi,
>
> I think that maybe I'm asking a lot.
>
> But I need windowing in the Processor API, not in the Streams
> DSL. Is this possible?
>
> The second question is: how can I get rid of the intermediate results?
> I'm only interested in the final result given by the window.
>
> Hamza
>
> ________________________________
> From: Matthias J. Sax <matth...@confluent.io>
> Sent: Saturday, October 22, 2016 16:12:45
> To: users@kafka.apache.org
> Subject: Re: customised event time
>
> Hi,
>
> You can set the window retention time via the method Windows#until() (this
> retention time is based on the timestamps returned from your custom
> timestamp extractor). This keeps all windows until the retention time
> passes, and thus all late-arriving records will be processed correctly.
>
> However, Kafka Streams does not close windows as other frameworks do, but
> rather gives you an (intermediate) result each time a window is
> updated with a new record (regardless of whether the record is in-order or
> late; you will get a result record in both cases).
>
> As of Kafka 0.10.1, those (intermediate) results get deduplicated, so
> you might not receive all (intermediate) results downstream. Of
> course, it is ensured that you will eventually get the latest/final
> result sent downstream.
>
>
> -Matthias
>
> On 10/21/16 7:42 AM, Hamza HACHANI wrote:
> > Hi,
> >
> >
> > I would like to process data based on a customised event time (a
> > timestamp that I implement as part of the message).
> >
> > The data is processed in periodic windows of x time that are
> > parameterised via the method punctuate.
> >
> > What I need is a retention time for the window to be able to handle
> > late-arriving messages.
> >
> > Can I do this: define/configure a retention time for windows? For
> > example, the window which handles data between 15:00 and 16:00 forwards
> > its result not at 16:00 but at 16:15.
> >
> > Thanks in advance for your help.
> >
> >
> > Hamza
> >
> >
>



-- 
-- Guozhang
