Hi group, We've encountered a problem during regular operation of the kafka-streams application. While processing a record we received the following error. There's very little documentation on this kind of problem but I've gathered that it's a synchronization issue between kafka consumer and producer. Have you ever encountered it before?
My questions are: is this a matter of misconfiguration, or rather a bug? Has it been fixed? We're using Java 8, kafka-client 2.8.1 with kafka streams version 2.8.1 against the kafka broker version 2.1.1. We are running it as an OSGI bundle with dependencies packaged within the bundle. Thank you. 2022-10-18T01:20:19,665 | ERROR | TRADE-ENRICHER-35daba19-3907-4a55-8fce-606abf0e9b3a-StreamThread-1 | TaskManager | 218 - org.apache.servicemix.bundles.kafka-clients - 2.8.1.1 | stream-thread [TRADE-ENRICHER-35daba19-3907-4a55-8fce-606abf0e9b3a-StreamThread-1] Failed to process stream task 0_2 due to the following error: org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_2, processor=KSTREAM-SOURCE-0000000002, topic=ORDER_BOOK_BEST, partition=2, offset=13452067, stacktrace=java.lang.IllegalStateException: Cannot call send in state COMMITTING_TRANSACTION at org.apache.kafka.clients.producer.internals.TransactionManager.failIfNotReadyForSend(TransactionManager.java:451) at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:948) at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:885) at org.apache.kafka.streams.processor.internals.StreamsProducer.send(StreamsProducer.java:211) at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:182) at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:139) at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:87) at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:281) at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:260) at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:219) at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:172) at org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:41) at org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71) at org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:181) at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884) at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:181) at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:281) at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:260) at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:219) at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:172) at org.apache.kafka.streams.kstream.internals.KStreamFlatMap$KStreamFlatMapProcessor.process(KStreamFlatMap.java:42) at org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71) at org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:181) at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884) at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:181) at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:281) at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:260) at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:219) at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:172) at org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:41) at org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71) at org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:181) at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884) at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:181) at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:281) at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:260) at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:219) at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:172) at org.apache.kafka.streams.kstream.internals.PassThrough$PassThroughProcessor.process(PassThrough.java:33) at org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71) at org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:181) at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884) at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:181) at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:281) at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:260) at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:219) at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:172) at org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:41) at org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71) at org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:181) at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884) at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:181) at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:281) at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:260) at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:219) at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:86) at org.apache.kafka.streams.processor.internals.StreamTask.lambda$process$1(StreamTask.java:731) at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884) at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:731) at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1177) at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:753) at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:583) at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:556) at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:758) ~[!/:?] at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1177) [!/:?] at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:753) [!/:?] at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:583) [!/:?] at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:556) [!/:?] Caused by: java.lang.IllegalStateException: Cannot call send in state COMMITTING_TRANSACTION at org.apache.kafka.clients.producer.internals.TransactionManager.failIfNotReadyForSend(TransactionManager.java:451) ~[!/:?] at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:948) ~[!/:?] at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:885) ~[!/:?] at org.apache.kafka.streams.processor.internals.StreamsProducer.send(StreamsProducer.java:211) ~[!/:?] at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:182) ~[!/:?] at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:139) ~[!/:?] at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:87) ~[!/:?] at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:281) ~[!/:?] at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:260) ~[!/:?] at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:219) ~[!/:?] at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:172) ~[!/:?] at org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:41) ~[!/:?] at org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71) ~[!/:?] at org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:181) ~[!/:?] at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884) ~[!/:?] at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:181) ~[!/:?] at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:281) ~[!/:?] at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:260) ~[!/:?] at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:219) ~[!/:?] at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:172) ~[!/:?] at org.apache.kafka.streams.kstream.internals.KStreamFlatMap$KStreamFlatMapProcessor.process(KStreamFlatMap.java:42) ~[!/:?] at org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71) ~[!/:?] at org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:181) ~[!/:?] at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884) ~[!/:?] at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:181) ~[!/:?] at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:281) ~[!/:?] at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:260) ~[!/:?] at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:219) ~[!/:?] at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:172) ~[!/:?] at org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:41) ~[!/:?] at org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71) ~[!/:?] at org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:181) ~[!/:?] at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884) ~[!/:?] at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:181) ~[!/:?] at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:281) ~[!/:?] at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:260) ~[!/:?] at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:219) ~[!/:?] at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:172) ~[!/:?] at org.apache.kafka.streams.kstream.internals.PassThrough$PassThroughProcessor.process(PassThrough.java:33) ~[!/:?] at org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71) ~[!/:?] at org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:181) ~[!/:?] at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884) ~[!/:?] at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:181) ~[!/:?] at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:281) ~[!/:?] at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:260) ~[!/:?] at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:219) ~[!/:?] at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:172) ~[!/:?] at org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:41) ~[!/:?] at org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71) ~[!/:?] at org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:181) ~[!/:?] at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884) ~[!/:?] at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:181) ~[!/:?] at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:281) ~[!/:?] at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:260) ~[!/:?] at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:219) ~[!/:?] at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:86) ~[!/:?] at org.apache.kafka.streams.processor.internals.StreamTask.lambda$process$1(StreamTask.java:731) ~[!/:?] at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884) ~[!/:?] at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:731) ~[!/:?] ... 4 more