Hi Everyone,

I have a question related to messages auto reprocessing in kafka streams and 
Transformer/Processor init()/close() methods.

Is possible that in some scenarios (failures, rebalance etc.) a message is 
processed twice by Transformer.transform() without calling Transformer.init() 
between the first and the second processing?

To illustrate my question with an example. Let's say I have kafka streams 
application with a default at_least_once semantics setting. The topology is the 
following - read messages from one topic, apply Transformer and produce 
messages to another topic. Transformer:

new TransformerSupplier() {
     Transformer get() {
         return new Transformer() {
             private ProcessorContext context;

             void init(ProcessorContext context) {
                 logger.info("init called());

this.context = context;

             KeyValue transform(K key, V value) {
                 logger.info("Transform offset: {}, partition: {}, 
context.offset(), context.partition());
                 return new KeyValue(key, value);

             void close() {
                 logger.info("close() called.")

Is it possible that the app will log "Transform offset: x, partition: y" twice 
with the same x and y values each time without logging "init() called." between 

I tried to simulate some failure scenarios (for example when 
CommitFailedException is thrown) but in my simple test cases init() was always 
called between the first and the second reprocessing. Although I could have 
easily missed something or a scenario where this is not true which is why I'm 
asking this question here.

Sorry, if my question is not clear - please let me know in this case and I'll 
try to clarify.

Thank you for any help you can provide.


