> Also note that the processing order might slightly differ if you
> process the same data twice ...

Is this still a problem when the default partition grouper is used (with
1 partition per task)?

Thank you,
Stanislav.

2018-03-09 3:19 GMT+01:00 Matthias J. Sax <matth...@confluent.io>:

> Thanks for the explanation.
>
> Not sure if setting the metadata you want to get committed in
> punctuation() would be sufficient. But I would think about it in more
> detail if we get a KIP for this.
>
> It's correct that flushing and committing offsets are correlated. But
> it's not related to punctuation.
>
> Also note that the processing order might slightly differ if you
> process the same data twice (it depends on the order in which the
> brokers return data on poll(), and that is something Streams cannot
> fully control). Thus, your code would need to be "robust" against
> different processing orders (i.e., if there are multiple input
> partitions, you might get data first for partition 0 and then for
> partition 1, or the other way round -- the order per partition is
> guaranteed to be in offset order).
>
>
> -Matthias
>
>
>
> On 3/6/18 2:17 AM, Stas Chizhov wrote:
> > Thank you, Matthias!
> >
> > We currently do use a Kafka consumer and store event-time high
> > watermarks as offset metadata. This is used during a backup
> > procedure, which is to create a snapshot of the target storage with
> > all events up to a certain timestamp and no others.
> >
> > As for the API -- I guess being able to provide a
> > partition-to-metadata map in the context commit method would do it
> > (to be called from within the punctuate method). BTW, as far as I
> > understand, if the Processor API is used, flushing producers and
> > committing offsets are correlated, and both the output topic state
> > and the committed offsets correspond to the state at the moment of
> > some punctuation. Meaning that if I have a deterministic processing
> > topology, my output topic is going to be deterministic as well
> > (modulo duplicates, of course). Am I correct here?
> >
> > Best regards,
> > Stanislav.
> >
> >
> > 2018-03-05 2:31 GMT+01:00 Matthias J. Sax <matth...@confluent.io>:
> >
> >> You are correct. This is not possible atm.
> >>
> >> Note that commits happen "under the hood" and users cannot commit
> >> explicitly. Users can only "request" a commit -- this implies that
> >> Kafka Streams will commit as soon as possible -- but when
> >> `context#commit()` returns, the commit is not done yet (it only
> >> sets a flag).
> >>
> >> What is your use case for this? How would you want to use this from
> >> an API point of view?
> >>
> >> Feel free to open a feature request JIRA -- we don't have any plans
> >> to add this atm -- it's the first time anybody has asked for this
> >> feature. If there is a JIRA, maybe somebody picks it up :)
> >>
> >>
> >> -Matthias
> >>
> >> On 3/3/18 6:51 AM, Stas Chizhov wrote:
> >>> Hi,
> >>>
> >>> There seems to be no way to commit custom metadata along with
> >>> offsets from within Kafka Streams.
> >>> Are there any plans to expose this functionality or have I missed
> >>> something?
> >>>
> >>> Best regards,
> >>> Stanislav.
> >>>
> >>
> >>
> >
> >
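The workaround Stas describes -- using a plain KafkaConsumer and storing an event-time high watermark in the offset-commit metadata -- can be sketched as follows. The class and the encode/decode helpers are illustrative names, not any Kafka API; the actual commitSync call (which needs kafka-clients on the classpath and a running broker) is shown only in a comment.

```java
import java.time.Instant;

// Sketch: an event-time high watermark encoded into the string metadata
// that the plain consumer can commit alongside an offset.
public class WatermarkMetadata {

    // Encode the high watermark as the metadata string accepted by
    // OffsetAndMetadata(offset, metadata).
    static String encode(Instant highWatermark) {
        return Long.toString(highWatermark.toEpochMilli());
    }

    // Parse the metadata string back into the watermark (used by the
    // backup procedure when it reads the committed offsets).
    static Instant decode(String metadata) {
        return Instant.ofEpochMilli(Long.parseLong(metadata));
    }

    public static void main(String[] args) {
        Instant wm = Instant.parse("2018-03-01T12:00:00Z");
        String metadata = encode(wm);

        // With kafka-clients, the commit would look roughly like:
        //
        //   Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
        //   offsets.put(new TopicPartition("events", 0),
        //               new OffsetAndMetadata(lastOffset + 1, metadata));
        //   consumer.commitSync(offsets);
        //
        // and the snapshot code would read it back via
        // consumer.committed(partition).metadata().

        System.out.println(decode(metadata).equals(wm)); // prints "true"
    }
}
```

This is exactly the capability the thread says Kafka Streams does not expose: Streams commits offsets internally, so there is no place to attach the metadata string.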
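Matthias' point that `context#commit()` only sets a flag can be illustrated with a toy model -- plain Java, not Kafka code; all names are made up. The processor requests a commit, but the commit is only performed later, when the processing loop reaches its next commit point.

```java
// Toy model of the "commit request" semantics described above:
// commit() only raises a flag; maybeCommit() (the processing loop)
// performs the actual commit later.
public class CommitRequestModel {
    boolean commitRequested = false;
    long committedOffset = -1;  // -1: nothing committed yet
    long processedOffset = -1;

    // Analogous to ProcessorContext#commit(): request only.
    void commit() {
        commitRequested = true;
    }

    void processRecord(long offset) {
        processedOffset = offset;
    }

    // Called by the (simulated) processing loop between records.
    void maybeCommit() {
        if (commitRequested) {
            committedOffset = processedOffset;  // commit happens here
            commitRequested = false;
        }
    }

    public static void main(String[] args) {
        CommitRequestModel ctx = new CommitRequestModel();
        ctx.processRecord(41);
        ctx.commit();                             // request only
        System.out.println(ctx.committedOffset);  // prints "-1": not committed yet
        ctx.maybeCommit();                        // loop performs the commit
        System.out.println(ctx.committedOffset);  // prints "41"
    }
}
```

This is why, when `context#commit()` returns, the offsets (and any metadata one might want attached to them) are not yet committed.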