Hi John,

Thank you for your quick and thorough reply; it's really helpful!
Regards,
Tomasz Zorawik

------- Original Message -------
On Saturday, 8 October 2022 1:28 AM, John Roesler <vvcep...@apache.org> wrote:

> Hello Tomasz,
>
> Thanks for the question!
>
> Streams should always call init() before passing any records to
> transform(...).
>
> When we talk about "reprocessing", we just mean that some record was
> processed, but then there was a failure before its offset was committed,
> and therefore we have to process it again after recovery. There's no
> difference in code paths between the first and second time the record is
> processed. In all cases, Streams will initialize the Transformer (and
> other Processors) before using them.
>
> I think the last part of the answer is that, if we wind up observing the
> same record twice in transform(...), it must be because there was a
> failure and then a recovery, in which case we will close and re-create the
> processor (and call init()).
>
> One thing to be aware of, though, is that under at-least-once, Streams
> does not guarantee that dirty state is cleared out of a state store.
> Therefore, even though we re-initialize the processors (including
> Transformers), if you're using a persistent state store, we'll just
> re-open it. One of the key things that exactly-once mode does differently
> is to clear out all dirty writes from the state store during failure
> recovery.
>
> I hope this helps!
>
> Thanks,
> John
>
> On Fri, Oct 7, 2022, at 15:57, xardaso wrote:
>
> > Hi Everyone,
> >
> > I have a question about automatic message reprocessing in Kafka
> > Streams and the Transformer/Processor init()/close() methods.
> >
> > Is it possible that in some scenarios (failures, rebalances, etc.) a
> > message is processed twice by Transformer.transform() without
> > Transformer.init() being called between the first and the second
> > processing?
> >
> > To illustrate my question with an example: let's say I have a Kafka
> > Streams application with the default at_least_once semantics setting.
> > The topology is the following: read messages from one topic, apply a
> > Transformer, and produce messages to another topic. Transformer:
> >
> > // logger is a logger field of the enclosing class; K and V are the
> > // record key and value types.
> > new TransformerSupplier<K, V, KeyValue<K, V>>() {
> >     public Transformer<K, V, KeyValue<K, V>> get() {
> >         return new Transformer<K, V, KeyValue<K, V>>() {
> >             private ProcessorContext context;
> >
> >             public void init(ProcessorContext context) {
> >                 logger.info("init() called.");
> >                 this.context = context;
> >             }
> >
> >             public KeyValue<K, V> transform(K key, V value) {
> >                 logger.info("Transform offset: {}, partition: {}",
> >                         context.offset(), context.partition());
> >                 return new KeyValue<>(key, value);
> >             }
> >
> >             public void close() {
> >                 logger.info("close() called.");
> >             }
> >         };
> >     }
> > };
> >
> > Is it possible that the app will log "Transform offset: x, partition: y"
> > twice with the same x and y values without logging "init() called." in
> > between (chronologically)?
> >
> > I tried to simulate some failure scenarios (for example, when a
> > CommitFailedException is thrown), but in my simple test cases init()
> > was always called between the first processing and the reprocessing.
> > However, I could easily have missed a scenario where this is not true,
> > which is why I'm asking here.
> >
> > Sorry if my question is not clear; if so, please let me know and I'll
> > try to clarify.
> >
> > Thank you for any help you can provide.
> >
> > Regards,
> > Tomasz
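To make the lifecycle John describes concrete, here is a toy model in plain Java. It is not Kafka Streams code: CountingProcessor, run() and the in-memory persistentStore map are hypothetical stand-ins, and exactly-once recovery is modeled simply as discarding writes made since the last commit. The model shows that a duplicated transform() call only happens after a fresh init(), and that a dirty write survives recovery under at-least-once but not under exactly-once.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model only: NOT Kafka Streams internals. Every name here is hypothetical.
public class RecoveryModel {

    // Stands in for a persistent state store that is re-opened, not rebuilt, on recovery.
    static final Map<String, Integer> persistentStore = new HashMap<>();

    // Records the order of lifecycle calls so it can be inspected afterwards.
    static final List<String> events = new ArrayList<>();

    // Hypothetical processor with the same init/transform/close shape as a Transformer.
    static final class CountingProcessor {
        private boolean initialized = false;

        void init() {
            initialized = true;
            events.add("init");
        }

        void transform(long offset, String key) {
            if (!initialized) {
                throw new IllegalStateException("transform() called before init()");
            }
            events.add("transform@" + offset);
            persistentStore.merge(key, 1, Integer::sum); // count occurrences per key
        }

        void close() {
            events.add("close");
        }
    }

    // Processes record 0, fails before any offset is committed, then recovers by
    // creating and initializing a fresh processor and reprocessing from offset 0.
    static void run(List<String> records, boolean exactlyOnce) {
        persistentStore.clear();
        events.clear();

        CountingProcessor first = new CountingProcessor();
        first.init();
        first.transform(0, records.get(0)); // processed, but its offset is never committed
        first.close();                      // simulated failure: the processor is closed

        if (exactlyOnce) {
            // Modeled EOS recovery: roll the store back to the last commit
            // (nothing was committed yet, so it becomes empty).
            persistentStore.clear();
        }
        // Under at-least-once the dirty write from the failed attempt is kept.

        CountingProcessor recovered = new CountingProcessor();
        recovered.init(); // re-initialized before any record is reprocessed
        for (int offset = 0; offset < records.size(); offset++) {
            recovered.transform(offset, records.get(offset));
        }
    }

    public static void main(String[] args) {
        List<String> records = List.of("a", "b");

        run(records, false);
        System.out.println("at-least-once: " + events + " store=" + persistentStore);

        run(records, true);
        System.out.println("exactly-once:  " + events + " store=" + persistentStore);
    }
}
```

In both modes the record at offset 0 is transformed twice with an init between the two calls; only the store contents differ, with key "a" counted twice under the at-least-once model.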