Hi everyone,

I have a question about automatic message reprocessing in Kafka Streams and the Transformer/Processor init()/close() methods.
Is it possible that in some scenarios (failures, rebalances, etc.) a message is processed twice by Transformer.transform() without Transformer.init() being called between the first and the second processing?

To illustrate with an example: say I have a Kafka Streams application with the default at_least_once processing guarantee. The topology reads messages from one topic, applies a Transformer, and produces messages to another topic.

Transformer:

    new TransformerSupplier<K, V, KeyValue<K, V>>() {
        @Override
        public Transformer<K, V, KeyValue<K, V>> get() {
            return new Transformer<K, V, KeyValue<K, V>>() {
                private ProcessorContext context;

                @Override
                public void init(ProcessorContext context) {
                    logger.info("init() called.");
                    this.context = context;
                }

                @Override
                public KeyValue<K, V> transform(K key, V value) {
                    // Log the source offset and partition of the record being processed.
                    logger.info("Transform offset: {}, partition: {}", context.offset(), context.partition());
                    return new KeyValue<>(key, value);
                }

                @Override
                public void close() {
                    logger.info("close() called.");
                }
            };
        }
    }

Is it possible that the app will log "Transform offset: x, partition: y" twice with the same x and y values, without logging "init() called." in between (chronologically)?

I tried to simulate some failure scenarios (for example, a CommitFailedException being thrown), but in my simple test cases init() was always called between the first and the second processing of the same message. I could easily have missed a scenario where this does not hold, which is why I'm asking here.

Sorry if my question is not clear. Please let me know and I'll try to clarify.

Thank you for any help you can provide.

Regards,
Tomasz