Hi group,

We've encountered a problem during regular operation of the kafka-streams
application. While processing a record we received the following error.
There's very little documentation on this kind of problem but I've gathered
that it's a synchronization issue between kafka consumer and producer. Have
you ever encountered it before?

My questions are: is this a matter of misconfiguration, or rather a bug?
Has it been fixed?

We're using Java 8, kafka-client 2.8.1 with kafka streams version 2.8.1
against the kafka broker version 2.1.1. We are running it as an OSGI bundle
with dependencies packaged within the bundle.

Thank you.

2022-10-18T01:20:19,665 | ERROR |
TRADE-ENRICHER-35daba19-3907-4a55-8fce-606abf0e9b3a-StreamThread-1 |
TaskManager | 218 - org.apache.servicemix.bundles.kafka-clients - 2.8.1.1 |
stream-thread
[TRADE-ENRICHER-35daba19-3907-4a55-8fce-606abf0e9b3a-StreamThread-1] Failed
to process stream task 0_2 due to the following error:
org.apache.kafka.streams.errors.StreamsException: Exception caught in
process. taskId=0_2, processor=KSTREAM-SOURCE-0000000002,
topic=ORDER_BOOK_BEST, partition=2, offset=13452067,
stacktrace=java.lang.IllegalStateException: Cannot call send in state
COMMITTING_TRANSACTION at
org.apache.kafka.clients.producer.internals.TransactionManager.failIfNotReadyForSend(TransactionManager.java:451)
at
org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:948)
at
org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:885)
at
org.apache.kafka.streams.processor.internals.StreamsProducer.send(StreamsProducer.java:211)
at
org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:182)
at
org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:139)
at
org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:87)
at
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:281)
at
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:260)
at
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:219)
at
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:172)
at
org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:41)
at
org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71)
at
org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:181)
at
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884)
at
org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:181)
at
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:281)
at
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:260)
at
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:219)
at
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:172)
at
org.apache.kafka.streams.kstream.internals.KStreamFlatMap$KStreamFlatMapProcessor.process(KStreamFlatMap.java:42)
at
org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71)
at
org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:181)
at
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884)
at
org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:181)
at
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:281)
at
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:260)
at
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:219)
at
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:172)
at
org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:41)
at
org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71)
at
org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:181)
at
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884)
at
org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:181)
at
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:281)
at
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:260)
at
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:219)
at
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:172)
at
org.apache.kafka.streams.kstream.internals.PassThrough$PassThroughProcessor.process(PassThrough.java:33)
at
org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71)
at
org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:181)
at
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884)
at
org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:181)
at
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:281)
at
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:260)
at
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:219)
at
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:172)
at
org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:41)
at
org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71)
at
org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:181)
at
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884)
at
org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:181)
at
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:281)
at
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:260)
at
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:219)
at
org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:86)
at
org.apache.kafka.streams.processor.internals.StreamTask.lambda$process$1(StreamTask.java:731)
at
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884)
at
org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:731)
at
org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1177)
at
org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:753)
at
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:583)
at
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:556)
at
org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:758)
~[!/:?] at
org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1177)
[!/:?] at
org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:753)
[!/:?] at
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:583)
[!/:?] at
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:556)
[!/:?] Caused by: java.lang.IllegalStateException: Cannot call send in
state COMMITTING_TRANSACTION at
org.apache.kafka.clients.producer.internals.TransactionManager.failIfNotReadyForSend(TransactionManager.java:451)
~[!/:?] at
org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:948)
~[!/:?] at
org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:885)
~[!/:?] at
org.apache.kafka.streams.processor.internals.StreamsProducer.send(StreamsProducer.java:211)
~[!/:?] at
org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:182)
~[!/:?] at
org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:139)
~[!/:?] at
org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:87)
~[!/:?] at
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:281)
~[!/:?] at
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:260)
~[!/:?] at
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:219)
~[!/:?] at
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:172)
~[!/:?] at
org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:41)
~[!/:?] at
org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71)
~[!/:?] at
org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:181)
~[!/:?] at
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884)
~[!/:?] at
org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:181)
~[!/:?] at
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:281)
~[!/:?] at
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:260)
~[!/:?] at
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:219)
~[!/:?] at
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:172)
~[!/:?] at
org.apache.kafka.streams.kstream.internals.KStreamFlatMap$KStreamFlatMapProcessor.process(KStreamFlatMap.java:42)
~[!/:?] at
org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71)
~[!/:?] at
org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:181)
~[!/:?] at
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884)
~[!/:?] at
org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:181)
~[!/:?] at
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:281)
~[!/:?] at
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:260)
~[!/:?] at
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:219)
~[!/:?] at
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:172)
~[!/:?] at
org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:41)
~[!/:?] at
org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71)
~[!/:?] at
org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:181)
~[!/:?] at
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884)
~[!/:?] at
org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:181)
~[!/:?] at
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:281)
~[!/:?] at
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:260)
~[!/:?] at
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:219)
~[!/:?] at
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:172)
~[!/:?] at
org.apache.kafka.streams.kstream.internals.PassThrough$PassThroughProcessor.process(PassThrough.java:33)
~[!/:?] at
org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71)
~[!/:?] at
org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:181)
~[!/:?] at
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884)
~[!/:?] at
org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:181)
~[!/:?] at
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:281)
~[!/:?] at
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:260)
~[!/:?] at
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:219)
~[!/:?] at
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:172)
~[!/:?] at
org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:41)
~[!/:?] at
org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71)
~[!/:?] at
org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:181)
~[!/:?] at
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884)
~[!/:?] at
org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:181)
~[!/:?] at
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:281)
~[!/:?] at
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:260)
~[!/:?] at
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:219)
~[!/:?] at
org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:86)
~[!/:?] at
org.apache.kafka.streams.processor.internals.StreamTask.lambda$process$1(StreamTask.java:731)
~[!/:?] at
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884)
~[!/:?] at
org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:731)
~[!/:?] ... 4 more

Reply via email to