Hi,

I'm currently trying to debug an error in some of our services that I do not 
understand. To give some context:

- We are using compacted topics for communication between multiple 
microservices with one topic per business entity.
- The services are spring boot based and we are using the 
spring-cloud-stream-kafka-binder.
- Since we want exactly-once semantics and also need to coordinate the 
Kafka transactions with database transactions, we are using the 
"ChainedTransactionManager". This should (in theory) take care of all the messy 
transaction handling; our wiring looks roughly like the sketch below.
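
This is a simplified sketch, not our actual code: the bean names and the JPA 
transaction manager are just placeholders for whatever is configured in the 
service.

    // Simplified sketch of our transaction manager wiring (bean names are placeholders).
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.data.transaction.ChainedTransactionManager;
    import org.springframework.kafka.transaction.KafkaTransactionManager;
    import org.springframework.orm.jpa.JpaTransactionManager;

    @Configuration
    public class TransactionConfig {

        @Bean
        public ChainedTransactionManager chainedTransactionManager(
                JpaTransactionManager jpaTransactionManager,
                KafkaTransactionManager<byte[], byte[]> kafkaTransactionManager) {
            // Transactions are started in the listed order and committed in reverse
            // order, i.e. the Kafka transaction commits before the database one here.
            return new ChainedTransactionManager(jpaTransactionManager, kafkaTransactionManager);
        }
    }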

Here is an excerpt of the logs from when the error happens. I will comment on it 
a bit to make it easier to understand (log lines are indented, my comments are at 
the top level).

    2019-10-10T08:59:19.945+00:00  DEBUG 
customer.remoteuser.event.UserEventAdapter : Received event user.updated for 
subject 9b3388e7-db34-47c6-b6ae-370e1899baef
    
The service received a new event. To my understanding it will write the read 
offset to __consumer_offsets. Since we configured it to be transactional, it will 
now create a transactional producer so that the write to __consumer_offsets is in 
the same transaction as any other messages that might be written out (e.g. to 
other topics).
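
In plain producer API terms, my mental model of what the listener/binder does per 
record is roughly the following. This is only a sketch of my understanding, not 
our actual code; "our-consumer-group" is a placeholder, and imports from 
kafka-clients and java.util are omitted.

    // Sketch of my mental model of what happens for each consumed record
    // (not our actual code; "our-consumer-group" is just a placeholder).
    void processInTransaction(KafkaProducer<byte[], byte[]> producer,
                              ConsumerRecord<byte[], byte[]> record) {
        producer.beginTransaction();
        try {
            // ... handle the event, possibly producer.send(...) to other topics ...

            // The read offset goes to __consumer_offsets as part of the same transaction:
            producer.sendOffsetsToTransaction(
                    Collections.singletonMap(
                            new TopicPartition(record.topic(), record.partition()),
                            new OffsetAndMetadata(record.offset() + 1)),
                    "our-consumer-group");
            producer.commitTransaction();
        } catch (ProducerFencedException e) {
            // The exception from the log below; the producer cannot be used any more.
            producer.close();
            throw e;
        }
    }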
    
    2019-10-10T08:59:20.104+00:00  INFO 
org.apache.kafka.clients.producer.ProducerConfig : ProducerConfig values: 
        acks = 1
        batch.size = 16384
        bootstrap.servers = [hidden]
        buffer.memory = 33554432
        client.id = 
        compression.type = none
        connections.max.idle.ms = 540000
        enable.idempotence = true
        interceptor.classes = []
        key.serializer = class 
org.apache.kafka.common.serialization.ByteArraySerializer
        linger.ms = 0
        max.block.ms = 60000
        max.in.flight.requests.per.connection = 5
        max.request.size = 1048576
        metadata.max.age.ms = 300000
        metric.reporters = []
        metrics.num.samples = 2
        metrics.recording.level = INFO
        metrics.sample.window.ms = 30000
        partitioner.class = class 
org.apache.kafka.clients.producer.internals.DefaultPartitioner
        receive.buffer.bytes = 32768
        reconnect.backoff.max.ms = 1000
        reconnect.backoff.ms = 50
        request.timeout.ms = 30000
        retries = 0
        retry.backoff.ms = 100
        sasl.client.callback.handler.class = null
        sasl.jaas.config = null
        sasl.kerberos.kinit.cmd = /usr/bin/kinit
        sasl.kerberos.min.time.before.relogin = 60000
        sasl.kerberos.service.name = null
        sasl.kerberos.ticket.renew.jitter = 0.05
        sasl.kerberos.ticket.renew.window.factor = 0.8
        sasl.login.callback.handler.class = null
        sasl.login.class = null
        sasl.login.refresh.buffer.seconds = 300
        sasl.login.refresh.min.period.seconds = 60
        sasl.login.refresh.window.factor = 0.8
        sasl.login.refresh.window.jitter = 0.05
        sasl.mechanism = GSSAPI
        security.protocol = SSL
        send.buffer.bytes = 131072
        ssl.cipher.suites = null
        ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
        ssl.endpoint.identification.algorithm = https
        ssl.key.password = [hidden]
        ssl.keymanager.algorithm = SunX509
        ssl.keystore.location = [hidden]
        ssl.keystore.password = [hidden]
        ssl.keystore.type = JKS
        ssl.protocol = SSL
        ssl.provider = null
        ssl.secure.random.implementation = null
        ssl.trustmanager.algorithm = PKIX
        ssl.truststore.location = [hidden]
        ssl.truststore.password = [hidden]
        ssl.truststore.type = JKS
        transaction.timeout.ms = 60000
        transactional.id = c-customerservice-0c-customerservice.dev-users-v1.0
        value.serializer = class 
org.apache.kafka.common.serialization.ByteArraySerializer
    
I noticed that transaction.timeout.ms is a full minute. Other than that the 
configuration does not look special to me.
    
    2019-10-10T08:59:20.106+00:00  INFO 
org.apache.kafka.clients.producer.KafkaProducer : [Producer 
clientId=producer-31, 
transactionalId=c-customerservice-0c-customerservice.dev-users-v1.0] 
Instantiated a transactional producer.
    2019-10-10T08:59:20.106+00:00  INFO 
org.apache.kafka.clients.producer.KafkaProducer : [Producer 
clientId=producer-31, 
transactionalId=c-customerservice-0c-customerservice.dev-users-v1.0] Overriding 
the default retries config to the recommended value of 2147483647 since the 
idempotent producer is enabled.
    2019-10-10T08:59:20.107+00:00  INFO 
org.apache.kafka.clients.producer.KafkaProducer : [Producer 
clientId=producer-31, 
transactionalId=c-customerservice-0c-customerservice.dev-users-v1.0] Overriding 
the default acks to all since idempotence is enabled.
    2019-10-10T08:59:20.842+00:00  INFO 
org.apache.kafka.common.utils.AppInfoParser : Kafka version : 2.0.1
    2019-10-10T08:59:20.842+00:00  INFO 
org.apache.kafka.common.utils.AppInfoParser : Kafka commitId : fa14705e51bd2ce5
    2019-10-10T08:59:20.843+00:00  INFO 
org.apache.kafka.clients.producer.internals.TransactionManager : [Producer 
clientId=producer-31, 
transactionalId=c-customerservice-0c-customerservice.dev-users-v1.0] ProducerId 
set to -1 with epoch -1
    2019-10-10T08:59:21.115+00:00  INFO 
org.apache.kafka.clients.producer.internals.TransactionManager : [Producer 
clientId=producer-31, 
transactionalId=c-customerservice-0c-customerservice.dev-users-v1.0] ProducerId 
set to 17778 with epoch 60
    
This is interesting to me: why the double output? I *suspect* the producer is 
created "empty" and then actually communicates with the broker, receives a proper 
ProducerId, and gets assigned an epoch. Is this assumption correct?
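
In plain producer API terms, I assume the sequence behind these two log lines is 
roughly the following (my assumption only, I have not checked the client code):

    // My assumption about the two "ProducerId set to ..." log lines (sketch only;
    // producerProps stands for the producer configuration shown above).
    KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(producerProps);
    // Right after construction the client only knows the defaults,
    // hence the first line: "ProducerId set to -1 with epoch -1".
    producer.initTransactions();
    // initTransactions() registers the transactional.id with the broker's transaction
    // coordinator, which assigns the real id and epoch, hence the second line:
    // "ProducerId set to 17778 with epoch 60".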
    
    2019-10-10T08:59:21.146+00:00  DEBUG customer.CustomerService : 
createOrUpdate updated existing customer 9b3388e7-db34-47c6-b6ae-370e1899baef
    2019-10-10T08:59:21.315+00:00  ERROR 
org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer
 : Send offsets to transaction failed
        org.apache.kafka.common.errors.ProducerFencedException: Producer 
attempted an operation with an old epoch. Either there is a newer producer with 
the same transactionalId, or the producer's transaction has been expired by the 
broker.
    
And this is where it ends (being Java there are more exceptions, but this is 
always the root cause) and I do not understand what is happening. There are two 
possible options listed:

A) "there is a newer producer with the same transactionalId" - I am certain 
that there is not. There is exactly one instance of the service running and the 
transaction id contains the service name. Since spring kafka also creates a 
transactionId per group->topic->partition there really should not be any other 
producer with this combination of attributes.
B) "transaction has been expired by the broker" - Well, the transaction timeout 
is 60000ms and this error happens only 1500ms after the producer creation. So 
this option is also out of the window.
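
For what it's worth, if I try to decompose the transactional.id from the log 
above (this is just my reading of it, I might be parsing it incorrectly), it 
looks like prefix + group.topic.partition:

    c-customerservice-0 + c-customerservice.dev-users-v1.0
    = <transactionIdPrefix> + <consumer group>.<topic>.<partition>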

Is there any other explanation possible? Do you need more information?

Regards
Martin


