Interesting, this does look like it could be a bug in Streams and I'm not aware of any known or already-fixed issues resembling this. Could you file a bug report over here <https://issues.apache.org/jira/issues/?jql=project%20%3D%20KAFKA> and include as much context/information as possible? Providing logs from around the time leading up to this exception in particular would greatly help in debugging this
On Tue, Nov 15, 2022 at 2:15 AM Tomasz Gac <tomasz....@empirica.io.invalid> wrote: > Hi group, > > We've encountered a problem during regular operation of the kafka-streams > application. While processing a record we received the following error. > There's very little documentation on this kind of problem but I've gathered > that it's a synchronization issue between kafka consumer and producer. Have > you ever encountered it before? > > My questions are: is this a matter of misconfiguration, or rather a bug? > Has it been fixed? > > We're using Java 8, kafka-client 2.8.1 with kafka streams version 2.8.1 > against the kafka broker version 2.1.1. We are running it as an OSGI bundle > with dependencies packaged within the bundle. > > Thank you. > > 2022-10-18T01:20:19,665 | ERROR | > TRADE-ENRICHER-35daba19-3907-4a55-8fce-606abf0e9b3a-StreamThread-1 | > TaskManager | 218 - org.apache.servicemix.bundles.kafka-clients - 2.8.1.1 | > stream-thread > [TRADE-ENRICHER-35daba19-3907-4a55-8fce-606abf0e9b3a-StreamThread-1] Failed > to process stream task 0_2 due to the following error: > org.apache.kafka.streams.errors.StreamsException: Exception caught in > process. taskId=0_2, processor=KSTREAM-SOURCE-0000000002, > topic=ORDER_BOOK_BEST, partition=2, offset=13452067, > stacktrace=java.lang.IllegalStateException: Cannot call send in state > COMMITTING_TRANSACTION at > > org.apache.kafka.clients.producer.internals.TransactionManager.failIfNotReadyForSend(TransactionManager.java:451) > at > > org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:948) > at > > org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:885) > at > > org.apache.kafka.streams.processor.internals.StreamsProducer.send(StreamsProducer.java:211) > at > > org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:182) > at > > org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:139) > at > > org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:87) > at > > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:281) > at > > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:260) > at > > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:219) > at > > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:172) > at > > org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:41) > at > > org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71) > at > > org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:181) > at > > org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884) > at > > org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:181) > at > > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:281) > at > > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:260) > at > > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:219) > at > > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:172) > at > > org.apache.kafka.streams.kstream.internals.KStreamFlatMap$KStreamFlatMapProcessor.process(KStreamFlatMap.java:42) > at > > org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71) > at > > org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:181) > at > > org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884) > at > > org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:181) > at > > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:281) > at > > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:260) > at > > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:219) > at > > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:172) > at > > org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:41) > at > > org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71) > at > > org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:181) > at > > org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884) > at > > org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:181) > at > > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:281) > at > > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:260) > at > > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:219) > at > > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:172) > at > > org.apache.kafka.streams.kstream.internals.PassThrough$PassThroughProcessor.process(PassThrough.java:33) > at > > org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71) > at > > org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:181) > at > > org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884) > at > > org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:181) > at > > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:281) > at > > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:260) > at > > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:219) > at > > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:172) > at > > org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:41) > at > > org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71) > at > > org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:181) > at > > org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884) > at > > org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:181) > at > > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:281) > at > > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:260) > at > > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:219) > at > > org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:86) > at > > org.apache.kafka.streams.processor.internals.StreamTask.lambda$process$1(StreamTask.java:731) > at > > org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884) > at > > org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:731) > at > > org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1177) > at > > org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:753) > at > > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:583) > at > > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:556) > at > > org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:758) > ~[!/:?] at > > org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1177) > [!/:?] at > > org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:753) > [!/:?] at > > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:583) > [!/:?] at > > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:556) > [!/:?] Caused by: java.lang.IllegalStateException: Cannot call send in > state COMMITTING_TRANSACTION at > > org.apache.kafka.clients.producer.internals.TransactionManager.failIfNotReadyForSend(TransactionManager.java:451) > ~[!/:?] at > > org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:948) > ~[!/:?] at > > org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:885) > ~[!/:?] at > > org.apache.kafka.streams.processor.internals.StreamsProducer.send(StreamsProducer.java:211) > ~[!/:?] at > > org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:182) > ~[!/:?] at > > org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:139) > ~[!/:?] at > > org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:87) > ~[!/:?] at > > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:281) > ~[!/:?] at > > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:260) > ~[!/:?] at > > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:219) > ~[!/:?] at > > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:172) > ~[!/:?] at > > org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:41) > ~[!/:?] at > > org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71) > ~[!/:?] at > > org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:181) > ~[!/:?] at > > org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884) > ~[!/:?] at > > org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:181) > ~[!/:?] at > > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:281) > ~[!/:?] at > > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:260) > ~[!/:?] at > > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:219) > ~[!/:?] at > > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:172) > ~[!/:?] at > > org.apache.kafka.streams.kstream.internals.KStreamFlatMap$KStreamFlatMapProcessor.process(KStreamFlatMap.java:42) > ~[!/:?] at > > org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71) > ~[!/:?] at > > org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:181) > ~[!/:?] at > > org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884) > ~[!/:?] at > > org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:181) > ~[!/:?] at > > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:281) > ~[!/:?] at > > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:260) > ~[!/:?] at > > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:219) > ~[!/:?] at > > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:172) > ~[!/:?] at > > org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:41) > ~[!/:?] at > > org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71) > ~[!/:?] at > > org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:181) > ~[!/:?] at > > org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884) > ~[!/:?] at > > org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:181) > ~[!/:?] at > > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:281) > ~[!/:?] at > > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:260) > ~[!/:?] at > > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:219) > ~[!/:?] at > > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:172) > ~[!/:?] at > > org.apache.kafka.streams.kstream.internals.PassThrough$PassThroughProcessor.process(PassThrough.java:33) > ~[!/:?] at > > org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71) > ~[!/:?] at > > org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:181) > ~[!/:?] at > > org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884) > ~[!/:?] at > > org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:181) > ~[!/:?] at > > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:281) > ~[!/:?] at > > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:260) > ~[!/:?] at > > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:219) > ~[!/:?] at > > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:172) > ~[!/:?] at > > org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:41) > ~[!/:?] at > > org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71) > ~[!/:?] at > > org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:181) > ~[!/:?] at > > org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884) > ~[!/:?] at > > org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:181) > ~[!/:?] at > > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:281) > ~[!/:?] at > > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:260) > ~[!/:?] at > > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:219) > ~[!/:?] at > > org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:86) > ~[!/:?] at > > org.apache.kafka.streams.processor.internals.StreamTask.lambda$process$1(StreamTask.java:731) > ~[!/:?] at > > org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884) > ~[!/:?] at > > org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:731) > ~[!/:?] ... 4 more >