Happy Friday everyone,
We run a moderately large Kafka 4.1.0 Streams topology. Over time we've started
to receive bug reports from users about seemingly "impossible" aggregate values.
After investigation, we hypothesize this is due to state stores (particularly
aggregates) double-counting some messages, as discussed in KIP-892
(https://cwiki.apache.org/confluence/display/KAFKA/KIP-892%3A+Transactional+Semantics+for+StateStores).
So we're starting to experiment with exactly-once semantics. As a first step, I
tried to launch a single-node Streams app with EOSv2 enabled.
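For context, here is a minimal sketch of how the app is bootstrapped with EOSv2
(the real topology, serdes, and most settings are omitted, and the class name is
just illustrative; treat it as an approximation rather than our actual code):

import java.util.Properties;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;

public class SearchIndexingApp {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Values matching the StreamsConfig dump below.
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "search-indexing-prod-scs-tmp-145");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
                "kafka-broker-0.kafka-broker.default:9093");
        props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
        props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 8);
        // The change under test: switch the processing guarantee to EOSv2.
        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);

        StreamsBuilder builder = new StreamsBuilder();
        // ... real topology omitted: repartitions, aggregations, changelogged stores ...

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
        streams.start();
    }
}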
The full StreamsConfig values logged at startup:
acceptable.recovery.lag = 10000
application.id = search-indexing-prod-scs-tmp-145
application.server = localhost:8080
bootstrap.servers = [kafka-broker-0.kafka-broker.default:9093,
kafka-broker-1.kafka-broker.default:9093,
kafka-broker-2.kafka-broker.default:9093]
buffered.records.per.partition = 500
built.in.metrics.version = latest
cache.max.bytes.buffering = 10485760
client.id =
commit.interval.ms = 100
connections.max.idle.ms = 540000
default.client.supplier = class
org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier
default.deserialization.exception.handler = class
org.apache.kafka.streams.errors.LogAndFailExceptionHandler
default.dsl.store = rocksDB
default.key.serde = null
default.list.key.serde.inner = null
default.list.key.serde.type = null
default.list.value.serde.inner = null
default.list.value.serde.type = null
default.production.exception.handler = class
org.apache.kafka.streams.errors.DefaultProductionExceptionHandler
default.timestamp.extractor = class
org.apache.kafka.streams.processor.FailOnInvalidTimestamp
default.value.serde = null
deserialization.exception.handler = class
org.apache.kafka.streams.errors.LogAndFailExceptionHandler
dsl.store.suppliers.class = class
org.apache.kafka.streams.state.BuiltInDslStoreSuppliers$RocksDBDslStoreSuppliers
enable.metrics.push = true
ensure.explicit.internal.resource.naming = true
group.protocol = classic
log.summary.interval.ms = 120000
max.task.idle.ms = 0
max.warmup.replicas = 2
metadata.max.age.ms = 300000
metadata.recovery.rebootstrap.trigger.ms = 300000
metadata.recovery.strategy = rebootstrap
metric.reporters = [org.apache.kafka.common.metrics.JmxReporter]
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
num.standby.replicas = 0
num.stream.threads = 8
poll.ms = 100
probing.rebalance.interval.ms = 600000
processing.exception.handler = class
org.apache.kafka.streams.errors.LogAndFailProcessingExceptionHandler
processing.guarantee = exactly_once_v2
processor.wrapper.class = class
com.paywholesail.service.search.core.streams.NamedFrameProcessorWrapper
production.exception.handler = class
org.apache.kafka.streams.errors.DefaultProductionExceptionHandler
rack.aware.assignment.non_overlap_cost = null
rack.aware.assignment.strategy = balance_subtopology
rack.aware.assignment.tags = [zone]
rack.aware.assignment.traffic_cost = null
receive.buffer.bytes = 32768
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
repartition.purge.interval.ms = 30000
replication.factor = 2
request.timeout.ms = 40000
retry.backoff.ms = 100
rocksdb.config.setter = class
com.paywholesail.service.search.core.streams.SearchRocksDBConfigSetter
security.protocol = SASL_PLAINTEXT
send.buffer.bytes = 131072
state.cleanup.delay.ms = 600000
state.dir = /volumes/search-index/40-streams-data
statestore.cache.max.bytes = 21474836480
task.assignor.class = null
task.timeout.ms = 300000
topology.optimization = all
upgrade.from = null
window.size.ms = null
windowed.inner.class.serde = null
windowstore.changelog.additional.retention.ms = 86400000
Almost immediately, the application starts spamming logs like:
[Producer
clientId=search-indexing-prod-scs-tmp-145-e6fe9936-e45d-4838-8f5b-6e7d8b19c990-StreamThread-5-producer,
transactionalId=search-indexing-prod-scs-tmp-145-e6fe9936-e45d-4838-8f5b-6e7d8b19c990-5]
Got error produce response with correlation id 12 on topic-partition
search-indexing-prod-scs-tmp-145-current-date-per-tz-eoc-table-changelog-28,
retrying (2147483646 attempts left). Error: CONCURRENT_TRANSACTIONS
[Producer
clientId=search-indexing-prod-scs-tmp-145-e6fe9936-e45d-4838-8f5b-6e7d8b19c990-StreamThread-8-producer,
transactionalId=search-indexing-prod-scs-tmp-145-e6fe9936-e45d-4838-8f5b-6e7d8b19c990-8]
Got error produce response with correlation id 15 on topic-partition
search-indexing-prod-scs-tmp-145-indexed-tag-group-by-company-repartition-13,
retrying (2147483646 attempts left). Error: CONCURRENT_TRANSACTIONS
[Producer
clientId=search-indexing-prod-scs-tmp-145-e6fe9936-e45d-4838-8f5b-6e7d8b19c990-StreamThread-8-producer,
transactionalId=search-indexing-prod-scs-tmp-145-e6fe9936-e45d-4838-8f5b-6e7d8b19c990-8]
Got error produce response with correlation id 15 on topic-partition
search-indexing-prod-scs-tmp-145-indexed-tag-group-by-company-repartition-2,
retrying (2147483646 attempts left). Error: CONCURRENT_TRANSACTIONS
[Producer
clientId=search-indexing-prod-scs-tmp-145-e6fe9936-e45d-4838-8f5b-6e7d8b19c990-StreamThread-8-producer,
transactionalId=search-indexing-prod-scs-tmp-145-e6fe9936-e45d-4838-8f5b-6e7d8b19c990-8]
Got error produce response with correlation id 15 on topic-partition
search-indexing-prod-scs-tmp-145-indexed-tag-group-by-company-repartition-6,
retrying (2147483646 attempts left). Error: CONCURRENT_TRANSACTIONS
[Producer
clientId=search-indexing-prod-scs-tmp-145-e6fe9936-e45d-4838-8f5b-6e7d8b19c990-StreamThread-8-producer,
transactionalId=search-indexing-prod-scs-tmp-145-e6fe9936-e45d-4838-8f5b-6e7d8b19c990-8]
Got error produce response with correlation id 15 on topic-partition
search-indexing-prod-scs-tmp-145-user-access-group-by-company-repartition-13,
retrying (2147483646 attempts left). Error: CONCURRENT_TRANSACTIONS
[Producer
clientId=search-indexing-prod-scs-tmp-145-e6fe9936-e45d-4838-8f5b-6e7d8b19c990-StreamThread-7-producer,
transactionalId=search-indexing-prod-scs-tmp-145-e6fe9936-e45d-4838-8f5b-6e7d8b19c990-7]
Got error produce response with correlation id 10 on topic-partition
search-indexing-prod-scs-tmp-145-payment-account-share-materialized-changelog-14,
retrying (2147483646 attempts left). Error: CONCURRENT_TRANSACTIONS
(continues about 60,000 times)
After about 20 minutes of this, it gives up:
org.apache.kafka.streams.errors.StreamsException: Error encountered trying to
commit a transaction
at
org.apache.kafka.streams.processor.internals.StreamsProducer.commitTransaction(StreamsProducer.java:262)
at
org.apache.kafka.streams.processor.internals.TaskExecutor.commitOffsetsOrTransaction(TaskExecutor.java:184)
at
org.apache.kafka.streams.processor.internals.TaskExecutor.commitTasksAndMaybeUpdateCommittableOffsets(TaskExecutor.java:152)
at
org.apache.kafka.streams.processor.internals.TaskManager.commitTasksAndMaybeUpdateCommittableOffsets(TaskManager.java:1957)
at
org.apache.kafka.streams.processor.internals.TaskManager.commit(TaskManager.java:1924)
at
org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:1782)
at
org.apache.kafka.streams.processor.internals.StreamThread.runOnceWithoutProcessingThreads(StreamThread.java:1276)
at
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:926)
at
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:886)
Caused by: org.apache.kafka.common.errors.InvalidTxnStateException: The
producer attempted a transactional operation in an invalid state.
Around the same time, the three brokers (also 4.1.0) start logging lots of:
ERROR [ReplicaManager broker=102] Error processing append operation on
partition
x4ERcnG9RqmG2vdJvboRKw:search-indexing-prod-scs-tmp-145-notification-repartition-3
(kafka.server.ReplicaManager)
org.apache.kafka.common.errors.OutOfOrderSequenceException: Invalid sequence
number for producer 370010 at offset 2827 in partition
search-indexing-prod-scs-tmp-145-notification-repartition-29: 1 (incoming seq.
number). Expected sequence 0 for transactions v2 idempotent producer with no
existing state
[2025-11-08 00:06:56,628] ERROR [ReplicaManager broker=102] Error processing
append operation on partition
x4ERcnG9RqmG2vdJvboRKw:search-indexing-prod-scs-tmp-145-notification-repartition-17
(kafka.server.ReplicaManager).
org.apache.kafka.common.errors.OutOfOrderSequenceException: Invalid sequence
number for producer 370012 at offset 2742 in partition
search-indexing-prod-scs-tmp-145-notification-repartition-17: 1 (incoming seq.
number). Expected sequence 0 for transactions v2 idempotent producer with no
existing state.
[2025-11-08 00:06:57,750] ERROR [ReplicaManager broker=100] Error processing
append operation on partition
x4ERcnG9RqmG2vdJvboRKw:search-indexing-prod-scs-tmp-145-notification-repartition-20
(kafka.server.ReplicaManager)
org.apache.kafka.common.errors.OutOfOrderSequenceException: Out of order
sequence number for producer 368019 at offset 3196 in partition
search-indexing-prod-scs-tmp-145-notification-repartition-20: 2 (incoming seq.
number), 0 (current end sequence number)
This is our first experience with Streams EOSv2 and Kafka transactions in
general, so I hope I haven't configured something horribly badly, but I'm not
sure where we've gone wrong.
If anyone can offer any advice, it'd be much appreciated.
Thanks,
Steven