> Also note, that the processing order might slightly differ if you
process the same data twice ....

Is this still a problem when default partition grouper is used (with 1
partition per task)?

Thank you,
Stanislav.



2018-03-09 3:19 GMT+01:00 Matthias J. Sax <matth...@confluent.io>:

> Thanks for the explanation.
>
> Not sure if setting the metadata you want to get committed in
> punctuation() would be sufficient. But I would think about it in more
> details if we get a KIP for this.
>
> It's correct that flushing and committing offsets is correlated. But
> it's not related to punctuation.
>
> Also note, that the processing order might slightly differ if you
> process the same data twice (it depends in which order the brokers
> return data on poll() and that it something Streams cannot fully
> control). Thus, you code would need to be "robust" against different
> processing orders (ie, if there are multiple input partitions, you might
> get data first for partition 0 and there for partition 1 or the other
> way round -- the order per partitions is guaranteed to be in offset order).
>
>
> -Matthias
>
>
>
> On 3/6/18 2:17 AM, Stas Chizhov wrote:
> > Thank you, Matthias!
> >
> > We currently do use kafka consumer and store event time highwatermarks as
> > offset metadata. This is used during backup procedure, which is to
> create a
> > snapshot of the target storage with all events up to certain timestamp
> and
> > no other.
> >
> > As for the API - I guess being able to provide partition-to-metadata map
> in
> > the context commit method would do it (to be called from within punctuate
> > method). BTW as far as I understand if Processor API is used flushing
> > producers and committing offsets is correlated and both output topic
> state
> > and committed offsets do correspond to a state at the moment of some
> > punctuation. Meaning that if I do have a deterministic processing
> topology
> > my output topic is going to be deterministic as well (modulo duplicates
> of
> > course).  Am I correct here?
> >
> > Best regards,
> > Stanislav.
> >
> >
> > 2018-03-05 2:31 GMT+01:00 Matthias J. Sax <matth...@confluent.io>:
> >
> >> You are correct. This is not possible atm.
> >>
> >> Note, that commits happen "under the hood" and users cannot commit
> >> explicitly. Users can only "request" as commit -- this implies that
> >> Kafka Streams will commit as soon as possible -- but when
> >> `context#commit()` returns, the commit is not done yet (it only sets a
> >> flag).
> >>
> >> What is your use case for this? How would you want to use this from an
> >> API point of view?
> >>
> >> Feel free to open a feature request JIRA -- we don't have any plans to
> >> add this atm -- it's the first time anybody asks for this feature. If
> >> there is a JIRA, maybe somebody picks it up :)
> >>
> >>
> >> -Matthias
> >>
> >> On 3/3/18 6:51 AM, Stas Chizhov wrote:
> >>> Hi,
> >>>
> >>> There seems to be no way to commit custom metadata along with offsets
> >> from
> >>> within Kafka Streams.
> >>> Are there any plans to expose this functionality or have I missed
> >> something?
> >>>
> >>> Best regards,
> >>> Stanislav.
> >>>
> >>
> >>
> >
>
>

Reply via email to