Thanks so much for replying.
I tried turning on DEBUG logging and tracking down transaction-specific details.
For about a minute and a half, there's a stream of logs like:
[2025-11-13 00:13:50,355] DEBUG TransactionalId
search-indexing-qa-scs-tmp-150-1ad3ea89-ebbf-4bec-af13-e8e664c41a9b-1 prepare
transition from EMPTY to TxnTransitMetadata[producerId=380008,
prevProducerId=380008, nextProducerId=-1, producerEpoch=0,
lastProducerEpoch=-1, txnTimeoutMs=10000, txnState=EMPTY, topicPartitions=[],
txnStartTimestamp=-1, txnLastUpdateTimestamp=1762992830073,
clientTransactionVersion=TV_0]
(kafka.coordinator.transaction.TransactionMetadata)
INFO [TransactionCoordinator id=102] Initialized transactionalId
search-indexing-qa-scs-tmp-150-1ad3ea89-ebbf-4bec-af13-e8e664c41a9b-1 with
producerId 380008 and producer epoch 0 on partition __transaction_state-34
(kafka.coordinator.transaction.TransactionCoordinator)
[2025-11-13 00:15:12,222] DEBUG [Transaction State Manager 102]: Updating
search-indexing-qa-scs-tmp-150-1ad3ea89-ebbf-4bec-af13-e8e664c41a9b-1's
transaction state to TxnTransitMetadata[producerId=380008,
prevProducerId=380008, nextProducerId=-1, producerEpoch=1, lastProducerEpoch=0,
txnTimeoutMs=10000, txnState=ONGOING,
topicPartitions=[search-indexing-qa-scs-tmp-150-grantsIn-table-repartition-21,
search-indexing-qa-scs-tmp-150-grantsIn-table-repartition-19,
search-indexing-qa-scs-tmp-150-grantsIn-table-repartition-15,
search-indexing-qa-scs-tmp-150-grantsIn-table-repartition-27,
search-indexing-qa-scs-tmp-150-grantsIn-table-repartition-26,
search-indexing-qa-scs-tmp-150-grantsIn-table-repartition-25,
search-indexing-qa-scs-tmp-150-indexable-user-table-repartition-29,
search-indexing-qa-scs-tmp-150-grantsIn-table-repartition-24,
search-indexing-qa-scs-tmp-150-grantsIn-table-repartition-23,
search-indexing-qa-scs-tmp-150-grantsIn-table-repartition-3,
search-indexing-qa-scs-tmp-150-indexable-user-table-repartition-7,
search-indexing-qa-scs-tmp-150-indexable-user-table-repartition-12,
search-indexing-qa-scs-tmp-150-grantsIn-table-repartition-9,
search-indexing-qa-scs-tmp-150-indexable-user-table-repartition-14],
txnStartTimestamp=1762992902983, txnLastUpdateTimestamp=1762992911740,
clientTransactionVersion=TV_2] with coordinator epoch 13 for
search-indexing-qa-scs-tmp-150-1ad3ea89-ebbf-4bec-af13-e8e664c41a9b-1 succeeded
(kafka.coordinator.transaction.TransactionStateManager)
(lots of repeated ADD_PARTITIONS)
[2025-11-13 00:15:20,102] DEBUG [Transaction State Manager 102]: Updating
search-indexing-qa-scs-tmp-150-1ad3ea89-ebbf-4bec-af13-e8e664c41a9b-1's
transaction state to TxnTransitMetadata[producerId=380008,
prevProducerId=380008, nextProducerId=-1, producerEpoch=2, lastProducerEpoch=1,
txnTimeoutMs=10000, txnState=PREPARE_ABORT,
topicPartitions=[search-indexing-qa-scs-tmp-150-grantsIn-table-repartition-22,
search-indexing-qa-scs-tmp-150-indexable-user-table-repartition-16,
search-indexing-qa-scs-tmp-150-grantsIn-table-repartition-21,
search-indexing-qa-scs-tmp-150-grantsIn-table-repartition-19,
search-indexing-qa-scs-tmp-150-grantsIn-table-repartition-15,
search-indexing-qa-scs-tmp-150-grantsIn-table-repartition-29,
search-indexing-qa-scs-tmp-150-grantsIn-table-repartition-28,
search-indexing-qa-scs-tmp-150-grantsIn-table-repartition-27,
search-indexing-qa-scs-tmp-150-grantsIn-table-repartition-26,
search-indexing-qa-scs-tmp-150-grantsIn-table-repartition-25,
search-indexing-qa-scs-tmp-150-indexable-user-table-repartition-29,
search-indexing-qa-scs-tmp-150-grantsIn-table-repartition-24,
search-indexing-qa-scs-tmp-150-grantsIn-table-repartition-23,
search-indexing-qa-scs-tmp-150-grantsIn-table-repartition-3,
search-indexing-qa-scs-tmp-150-grantsIn-table-repartition-0,
search-indexing-qa-scs-tmp-150-indexable-user-table-repartition-7,
search-indexing-qa-scs-tmp-150-grantsIn-table-repartition-13,
search-indexing-qa-scs-tmp-150-indexable-user-table-repartition-10,
search-indexing-qa-scs-tmp-150-indexable-user-table-repartition-12,
search-indexing-qa-scs-tmp-150-grantsIn-table-repartition-9,
search-indexing-qa-scs-tmp-150-grantsIn-table-repartition-8,
search-indexing-qa-scs-tmp-150-indexable-user-table-repartition-14],
txnStartTimestamp=1762992902983, txnLastUpdateTimestamp=1762992918599,
clientTransactionVersion=TV_2] with coordinator epoch 13 for
search-indexing-qa-scs-tmp-150-1ad3ea89-ebbf-4bec-af13-e8e664c41a9b-1 succeeded
(kafka.coordinator.transaction.TransactionStateManager)
[2025-11-13 00:15:20,102] DEBUG TransactionalId
search-indexing-qa-scs-tmp-150-1ad3ea89-ebbf-4bec-af13-e8e664c41a9b-1 prepare
transition from PREPARE_ABORT to TxnTransitMetadata[producerId=380008,
prevProducerId=380008, nextProducerId=-1, producerEpoch=2, lastProducerEpoch=1,
txnTimeoutMs=10000, txnState=COMPLETE_ABORT, topicPartitions=[],
txnStartTimestamp=1762992902983, txnLastUpdateTimestamp=1762992920102,
clientTransactionVersion=TV_2]
(kafka.coordinator.transaction.TransactionMetadata)
[2025-11-13 00:15:20,121] INFO [TransactionCoordinator id=102] Completed
rollback of ongoing transaction for transactionalId
search-indexing-qa-scs-tmp-150-1ad3ea89-ebbf-4bec-af13-e8e664c41a9b-1 due to
timeout (kafka.coordinator.transaction.TransactionCoordinator)
So, it seems all the transactions end up getting aborted due to timeout. I'm a little
surprised to see "txnTimeoutMs=10000" - the documented default for transaction.timeout.ms
is 60000, and we didn't reduce it ourselves. Yet the actual wall-clock time from
initialization to abort in the logs above looks closer to 90 seconds.
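For reference, if we do end up wanting to raise it, my understanding is that a producer-level
transaction.timeout.ms can be forwarded to the Streams-managed producers via the producer
prefix. A minimal sketch (the value and application id are just illustrative, and I haven't
confirmed yet whether Streams itself applies a lower EOS default that would explain the 10000):

import java.util.Properties;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.streams.StreamsConfig;

public class EosTxnTimeoutSketch {
    public static Properties streamsProps() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "search-indexing-qa-scs-tmp-150");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
                "kafka-broker-0.kafka-broker.default:9093");
        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);
        // Forward a producer-level override to the internally created producers.
        // 60000 ms matches the documented plain-producer default; purely illustrative.
        props.put(StreamsConfig.producerPrefix(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG), 60000);
        return props;
    }
}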
I'm not sure why our brokers would be struggling so badly to commit transactions.
In at-least-once mode we can push 50MB/s+ pretty easily, but in exactly-once mode
only a few KB/s actually gets through.
We tried upgrading the broker disks to a higher class (SSD-backed), but that didn't
seem to change much: the disks remain mostly idle while transactions time out left
and right. The network processor idle metric likewise shows the brokers mostly idle.
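For what it's worth, a JMX probe along these lines can confirm those idle readings
(a sketch only; the JMX port is hypothetical and assumes remote JMX is enabled on the brokers):

import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

public class BrokerIdleProbe {
    public static void main(String[] args) throws Exception {
        // Hypothetical endpoint; assumes the broker exposes remote JMX on port 9999.
        JMXServiceURL url = new JMXServiceURL(
                "service:jmx:rmi:///jndi/rmi://kafka-broker-0.kafka-broker.default:9999/jmxrmi");
        try (JMXConnector connector = JMXConnectorFactory.connect(url)) {
            MBeanServerConnection mbsc = connector.getMBeanServerConnection();
            // Request handler (I/O thread) idle ratio; close to 1.0 means the broker is barely working.
            Object handlerIdle = mbsc.getAttribute(
                    new ObjectName("kafka.server:type=KafkaRequestHandlerPool,name=RequestHandlerAvgIdlePercent"),
                    "OneMinuteRate");
            // Network processor idle ratio, averaged across processor threads.
            Object networkIdle = mbsc.getAttribute(
                    new ObjectName("kafka.network:type=SocketServer,name=NetworkProcessorAvgIdlePercent"),
                    "Value");
            System.out.println("request handler avg idle (1m rate): " + handlerIdle);
            System.out.println("network processor avg idle: " + networkIdle);
        }
    }
}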
I tried attaching a profiler to the Kafka broker to see if I could identify anywhere it
gets "stuck", but, perhaps due to the asynchronous nature of the system, stack-trace
profiling wasn't particularly helpful.
I'll keep playing with it... I've attached a broker log dump filtered to this
single transaction, in case there's anything helpful there.
Best,
Steven
On Nov 8, 2025, at 10:18 AM, Matthias J. Sax <[email protected]> wrote:
Hello Steven,
Hard to say.
To give some background around the "concurrent TX" error: when a client calls
commitTransaction(), a two-phase commit is executed, and only the first phase is sync, ie,
commitTransaction() blocks only until the first phase is completed (ie, after the commit
itself was successful). However, after a TX is committed, it is not yet completed; the
completion of the TX happens in the second phase, which runs async on the broker.
As long as the second phase is not completed, the same client cannot start a new TX, and
if it tries to do so, the broker returns the "concurrent TX" error you observe.
If the broker is healthy and completes the second phase quickly, the error
should be rare and transient; as you have noted, it's a retriable error. The
client just needs to wait until the broker has completed the TX before it can
resume.
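To make the sequencing concrete, here is a plain-producer sketch (not the Streams-internal
code path; topic name and ids are just placeholders) showing where the two phases fall:

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class TwoPhaseCommitSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "demo-txn-1");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.initTransactions();
            for (int i = 0; i < 3; i++) {
                producer.beginTransaction();
                producer.send(new ProducerRecord<>("demo-topic", "key-" + i, "value-" + i));
                // Phase 1: commitTransaction() blocks roughly until the coordinator has
                // durably recorded the commit decision. Phase 2 (writing the commit markers
                // and completing the TX) happens asynchronously on the broker.
                producer.commitTransaction();
                // If the next transaction reaches the broker before phase 2 finishes,
                // the broker answers CONCURRENT_TRANSACTIONS and the client retries internally.
            }
        }
    }
}

In Streams, the internal producers go through this cycle on every commit interval, which is
why a slow second phase on the broker shows up as a flood of retried CONCURRENT_TRANSACTIONS.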
Given the issue you describe, it seems the broker is slow (or not able at all)
to complete committed transactions. Why this is the case is hard to say from
the information you provided, but I would inspect the broker logs in more detail
(especially on the broker hosting the corresponding TX coordinator) to find out
more. Collecting DEBUG logs could also help.
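To find the right broker to look at: the coordinator for a given transactionalId is the
leader of the __transaction_state partition it hashes to. A small sketch of that lookup
(it mirrors, to my understanding, the broker's partitionFor logic; 50 is the default
transaction.state.log.num.partitions, adjust if you changed it):

import org.apache.kafka.common.utils.Utils;

public class TxnCoordinatorPartition {
    public static void main(String[] args) {
        String transactionalId =
                "search-indexing-prod-scs-tmp-145-e6fe9936-e45d-4838-8f5b-6e7d8b19c990-5";
        // transaction.state.log.num.partitions defaults to 50.
        int transactionTopicPartitionCount = 50;
        int partition = Utils.abs(transactionalId.hashCode()) % transactionTopicPartitionCount;
        // The TX coordinator is the current leader of this __transaction_state partition.
        System.out.println("__transaction_state-" + partition);
    }
}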
The only suspicious config I found is `replication.factor`, which you set to 2.
For EOS, the recommended configuration is a replication factor of 3, plus
min.insync.replicas of 2. I am not sure what your current min.insync.replicas config
is. It would also be good to verify the corresponding configs of the broker-side
internal __transaction_state topic.
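Something along these lines with the AdminClient should show the topic-level settings of
__transaction_state (a sketch; the bootstrap address is a placeholder and any SASL/security
settings need to match your cluster):

import java.util.Collections;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.common.config.ConfigResource;

public class TransactionStateTopicConfigs {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
                "kafka-broker-0.kafka-broker.default:9093");
        // plus whatever security settings your cluster needs

        try (Admin admin = Admin.create(props)) {
            ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC, "__transaction_state");
            Map<ConfigResource, Config> configs =
                    admin.describeConfigs(Collections.singleton(topic)).all().get();
            // Pay particular attention to min.insync.replicas; the replication factor
            // itself is visible via describeTopics rather than topic configs.
            configs.get(topic).entries().forEach(entry ->
                    System.out.println(entry.name() + " = " + entry.value()));
        }
    }
}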
HTH to get you started with troubleshooting. Let us know what you find.
-Matthias
On 11/7/25 8:51 PM, Steven Schlansker wrote:
Happy Friday everyone,
We run a moderately large Kafka 4.1.0 Streams topology. Over time we've started to
receive bug reports from users about seemingly "impossible" aggregate values.
After investigation, we hypothesize this is due to stores (particularly
aggregates) double-counting some messages
(https://cwiki.apache.org/confluence/display/KAFKA/KIP-892%3A+Transactional+Semantics+for+StateStores).
So, we're starting to experiment with exactly-once semantics. I tried launching a
single-node Streams app with the following configuration.
StreamsConfig values:
acceptable.recovery.lag = 10000
application.id = search-indexing-prod-scs-tmp-145
application.server = localhost:8080
bootstrap.servers = [kafka-broker-0.kafka-broker.default:9093,
kafka-broker-1.kafka-broker.default:9093,
kafka-broker-2.kafka-broker.default:9093]
buffered.records.per.partition = 500
built.in.metrics.version = latest
cache.max.bytes.buffering = 10485760
client.id =
commit.interval.ms = 100
connections.max.idle.ms = 540000
default.client.supplier = class
org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier
default.deserialization.exception.handler = class
org.apache.kafka.streams.errors.LogAndFailExceptionHandler
default.dsl.store = rocksDB
default.key.serde = null
default.list.key.serde.inner = null
default.list.key.serde.type = null
default.list.value.serde.inner = null
default.list.value.serde.type = null
default.production.exception.handler = class
org.apache.kafka.streams.errors.DefaultProductionExceptionHandler
default.timestamp.extractor = class
org.apache.kafka.streams.processor.FailOnInvalidTimestamp
default.value.serde = null
deserialization.exception.handler = class
org.apache.kafka.streams.errors.LogAndFailExceptionHandler
dsl.store.suppliers.class = class
org.apache.kafka.streams.state.BuiltInDslStoreSuppliers$RocksDBDslStoreSuppliers
enable.metrics.push = true
ensure.explicit.internal.resource.naming = true
group.protocol = classic
log.summary.interval.ms = 120000
max.task.idle.ms = 0
max.warmup.replicas = 2
metadata.max.age.ms = 300000
metadata.recovery.rebootstrap.trigger.ms = 300000
metadata.recovery.strategy = rebootstrap
metric.reporters = [org.apache.kafka.common.metrics.JmxReporter]
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
num.standby.replicas = 0
num.stream.threads = 8
poll.ms = 100
probing.rebalance.interval.ms = 600000
processing.exception.handler = class
org.apache.kafka.streams.errors.LogAndFailProcessingExceptionHandler
processing.guarantee = exactly_once_v2
processor.wrapper.class = class
com.paywholesail.service.search.core.streams.NamedFrameProcessorWrapper
production.exception.handler = class
org.apache.kafka.streams.errors.DefaultProductionExceptionHandler
rack.aware.assignment.non_overlap_cost = null
rack.aware.assignment.strategy = balance_subtopology
rack.aware.assignment.tags = [zone]
rack.aware.assignment.traffic_cost = null
receive.buffer.bytes = 32768
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
repartition.purge.interval.ms = 30000
replication.factor = 2
request.timeout.ms = 40000
retry.backoff.ms = 100
rocksdb.config.setter = class
com.paywholesail.service.search.core.streams.SearchRocksDBConfigSetter
security.protocol = SASL_PLAINTEXT
send.buffer.bytes = 131072
state.cleanup.delay.ms = 600000
state.dir = /volumes/search-index/40-streams-data
statestore.cache.max.bytes = 21474836480
task.assignor.class = null
task.timeout.ms = 300000
topology.optimization = all
upgrade.from = null
window.size.ms = null
windowed.inner.class.serde = null
windowstore.changelog.additional.retention.ms = 86400000
Almost immediately, the application starts spamming logs like:
[Producer
clientId=search-indexing-prod-scs-tmp-145-e6fe9936-e45d-4838-8f5b-6e7d8b19c990-StreamThread-5-producer,
transactionalId=search-indexing-prod-scs-tmp-145-e6fe9936-e45d-4838-8f5b-6e7d8b19c990-5]
Got error produce response with correlation id 12 on topic-partition
search-indexing-prod-scs-tmp-145-current-date-per-tz-eoc-table-changelog-28,
retrying (2147483646 attempts left). Error: CONCURRENT_TRANSACTIONS
[Producer
clientId=search-indexing-prod-scs-tmp-145-e6fe9936-e45d-4838-8f5b-6e7d8b19c990-StreamThread-8-producer,
transactionalId=search-indexing-prod-scs-tmp-145-e6fe9936-e45d-4838-8f5b-6e7d8b19c990-8]
Got error produce response with correlation id 15 on topic-partition
search-indexing-prod-scs-tmp-145-indexed-tag-group-by-company-repartition-13,
retrying (2147483646 attempts left). Error: CONCURRENT_TRANSACTIONS
[Producer
clientId=search-indexing-prod-scs-tmp-145-e6fe9936-e45d-4838-8f5b-6e7d8b19c990-StreamThread-8-producer,
transactionalId=search-indexing-prod-scs-tmp-145-e6fe9936-e45d-4838-8f5b-6e7d8b19c990-8]
Got error produce response with correlation id 15 on topic-partition
search-indexing-prod-scs-tmp-145-indexed-tag-group-by-company-repartition-2,
retrying (2147483646 attempts left). Error: CONCURRENT_TRANSACTIONS
[Producer
clientId=search-indexing-prod-scs-tmp-145-e6fe9936-e45d-4838-8f5b-6e7d8b19c990-StreamThread-8-producer,
transactionalId=search-indexing-prod-scs-tmp-145-e6fe9936-e45d-4838-8f5b-6e7d8b19c990-8]
Got error produce response with correlation id 15 on topic-partition
search-indexing-prod-scs-tmp-145-indexed-tag-group-by-company-repartition-6,
retrying (2147483646 attempts left). Error: CONCURRENT_TRANSACTIONS
[Producer
clientId=search-indexing-prod-scs-tmp-145-e6fe9936-e45d-4838-8f5b-6e7d8b19c990-StreamThread-8-producer,
transactionalId=search-indexing-prod-scs-tmp-145-e6fe9936-e45d-4838-8f5b-6e7d8b19c990-8]
Got error produce response with correlation id 15 on topic-partition
search-indexing-prod-scs-tmp-145-user-access-group-by-company-repartition-13,
retrying (2147483646 attempts left). Error: CONCURRENT_TRANSACTIONS
[Producer
clientId=search-indexing-prod-scs-tmp-145-e6fe9936-e45d-4838-8f5b-6e7d8b19c990-StreamThread-7-producer,
transactionalId=search-indexing-prod-scs-tmp-145-e6fe9936-e45d-4838-8f5b-6e7d8b19c990-7]
Got error produce response with correlation id 10 on topic-partition
search-indexing-prod-scs-tmp-145-payment-account-share-materialized-changelog-14,
retrying (2147483646 attempts left). Error: CONCURRENT_TRANSACTIONS
(continues about 60,000 times)
After about 20 minutes of this, it gives up:
org.apache.kafka.streams.errors.StreamsException: Error encountered trying to
commit a transaction
at
org.apache.kafka.streams.processor.internals.StreamsProducer.commitTransaction(StreamsProducer.java:262)
at
org.apache.kafka.streams.processor.internals.TaskExecutor.commitOffsetsOrTransaction(TaskExecutor.java:184)
at
org.apache.kafka.streams.processor.internals.TaskExecutor.commitTasksAndMaybeUpdateCommittableOffsets(TaskExecutor.java:152)
at
org.apache.kafka.streams.processor.internals.TaskManager.commitTasksAndMaybeUpdateCommittableOffsets(TaskManager.java:1957)
at
org.apache.kafka.streams.processor.internals.TaskManager.commit(TaskManager.java:1924)
at
org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:1782)
at
org.apache.kafka.streams.processor.internals.StreamThread.runOnceWithoutProcessingThreads(StreamThread.java:1276)
at
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:926)
at
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:886)
Caused by: org.apache.kafka.common.errors.InvalidTxnStateException: The
producer attempted a transactional operation in an invalid state.
Around the same time, the 3 brokers (also 4.1.0) start logging lots of:
ERROR [ReplicaManager broker=102] Error processing append operation on
partition
x4ERcnG9RqmG2vdJvboRKw:search-indexing-prod-scs-tmp-145-notification-repartition-3
(kafka.server.ReplicaManager)
org.apache.kafka.common.errors.OutOfOrderSequenceException: Invalid sequence
number for producer 370010 at offset 2827 in partition
search-indexing-prod-scs-tmp-145-notification-repartition-29: 1 (incoming seq.
number). Expected sequence 0 for transactions v2 idempotent producer with no
existing state
[2025-11-08 00:06:56,628] ERROR [ReplicaManager broker=102] Error processing
append operation on partition
x4ERcnG9RqmG2vdJvboRKw:search-indexing-prod-scs-tmp-145-notification-repartition-17
(kafka.server.ReplicaManager).
org.apache.kafka.common.errors.OutOfOrderSequenceException: Invalid sequence
number for producer 370012 at offset 2742 in partition
search-indexing-prod-scs-tmp-145-notification-repartition-17: 1 (incoming seq.
number). Expected sequence 0 for transactions v2 idempotent producer with no
existing state.
[2025-11-08 00:06:57,750] ERROR [ReplicaManager broker=100] Error processing
append operation on partition
x4ERcnG9RqmG2vdJvboRKw:search-indexing-prod-scs-tmp-145-notification-repartition-20
(kafka.server.ReplicaManager)
org.apache.kafka.common.errors.OutOfOrderSequenceException: Out of order
sequence number for producer 368019 at offset 3196 in partition
search-indexing-prod-scs-tmp-145-notification-repartition-20: 2 (incoming seq.
number), 0 (current end sequence number)
This is our first experience with Streams EOSv2 and Kafka transactions in
general, so I hope we haven't misconfigured something horribly, but I am not
sure where we've gone wrong.
If anyone can offer any advice that'd be much appreciated.
Thanks,
Steven