Hi John,

Thank you for your quick and thorough reply, it's really helpful for me!

Regards,
Tomasz Zorawik 

------- Original Message -------
sobota, 8 października 2022 1:28 AM, John Roesler <vvcep...@apache.org> 
napisał(a):


> Hello Tomasz,
> 
> Thanks for the question!
> 
> Streams should always call init() before passing any records to 
> transform(...).
> 
> When we talk about "reprocessing", we just mean that some record was 
> processed, but then there was a failure before its offset was committed, and 
> therefore we have to process it again after recovery. There's no difference 
> in code paths between the first and second time the record is processed. In 
> all cases, Streams will initialize the Transformer (and other Processors) 
> before using them.
> 
> I think the last part of the answer is that, if we wind up observing the same 
> record twice in transform(...), it must be because there was a failure and 
> then a recovery, in which case, we will close and re-create the processor 
> (and call init() ).
> 
> One thing to be aware of, though, is that (under at-least-once), Streams does 
> not guarantee to clear out any dirty state from a state store. Therefore, 
> even though we re-initialize the processors (including Transformers), if 
> you're using a persistent state store, we'll just re-open it. One of the key 
> things that exactly-once mode does differently is to clear out all dirty 
> writes from the state store during failure recovery.
> 
> I hope this helps!
> 
> Thanks,
> John
> 
> On Fri, Oct 7, 2022, at 15:57, xardaso wrote:
> 
> > Hi Everyone,
> > 
> > I have a question related to messages auto reprocessing in kafka
> > streams and Transformer/Processor init()/close() methods.
> > 
> > Is possible that in some scenarios (failures, rebalance etc.) a message
> > is processed twice by Transformer.transform() without calling
> > Transformer.init() between the first and the second processing?
> > 
> > To illustrate my question with an example. Let's say I have kafka
> > streams application with a default at_least_once semantics setting. The
> > topology is the following - read messages from one topic, apply
> > Transformer and produce messages to another topic. Transformer:
> > 
> > new TransformerSupplier() {
> > Transformer get() {
> > return new Transformer() {
> > private ProcessorContext context;
> > 
> > void init(ProcessorContext context) {
> > logger.info("init called());
> > 
> > this.context = context;
> > }
> > 
> > KeyValue transform(K key, V value) {
> > logger.info("Transform offset: {}, partition: {},
> > context.offset(), context.partition());
> > return new KeyValue(key, value);
> > }
> > 
> > void close() {
> > logger.info("close() called.")
> > }
> > }
> > }
> > }
> > 
> > Is it possible that the app will log "Transform offset: x, partition:
> > y" twice with the same x and y values each time without logging "init()
> > called." between (chronologically)?
> > 
> > I tried to simulate some failure scenarios (for example when
> > CommitFailedException is thrown) but in my simple test cases init() was
> > always called between the first and the second reprocessing. Although I
> > could have easily missed something or a scenario where this is not true
> > which is why I'm asking this question here.
> > 
> > Sorry, if my question is not clear - please let me know in this case
> > and I'll try to clarify.
> > 
> > Thank you for any help you can provide.
> > 
> > Regards,
> > Tomasz

Reply via email to