Hello All.

I'm trying to set up a Kafka cluster with transactions and authorization enabled.
I use the "spring-kafka" integration. Here is my Java code:

    @Bean
    public DefaultKafkaProducerFactory createDefaultKafkaProducerFactory(
            KafkaProperties kafkaProperties,
            @Value("${my.kafka.producer.transaction-prefix}") String transactionIdPrefix) {
        Map<String, Object> producerProperties = kafkaProperties.buildProducerProperties();
        producerProperties.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        producerProperties.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);
        producerProperties.put(ProducerConfig.RETRIES_CONFIG, 3);
        producerProperties.put(ProducerConfig.ACKS_CONFIG, "all");
        producerProperties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionIdPrefix);
        DefaultKafkaProducerFactory producerFactory = new DefaultKafkaProducerFactory(producerProperties);
        producerFactory.setTransactionIdPrefix(transactionIdPrefix);
        return producerFactory;
    }

    @Bean
    public KafkaTransactionManager kafkaTransactionManager(
            DefaultKafkaProducerFactory producerFactory) {
        KafkaTransactionManager ktm = new KafkaTransactionManager(producerFactory);
        ktm.setTransactionSynchronization(AbstractPlatformTransactionManager.SYNCHRONIZATION_ON_ACTUAL_TRANSACTION);
        return ktm;
    }

Everything works perfectly when my Kafka server does not have authorization enabled. But if I enable SASL_SSL with the PLAIN mechanism, I get an error at the transaction initialization step.

spring.kafka.properties.security.protocol=SASL_SSL
spring.kafka.properties.sasl.mechanism=PLAIN
spring.kafka.properties.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="${my.kafka.user}" password="${my.kafka.password}";

Caused by: org.springframework.transaction.CannotCreateTransactionException: Could not create Kafka transaction; nested exception is org.apache.kafka.common.errors.TransactionalIdAuthorizationException: Transactional Id authorization failed
    at org.springframework.kafka.transaction.KafkaTransactionManager.doBegin(KafkaTransactionManager.java:140)
    at org.springframework.transaction.support.AbstractPlatformTransactionManager.getTransaction(AbstractPlatformTransactionManager.java:377)
    at org.springframework.data.transaction.MultiTransactionStatus.registerTransactionManager(MultiTransactionStatus.java:68)
    at org.springframework.data.transaction.ChainedTransactionManager.getTransaction(ChainedTransactionManager.java:101)
    ... 96 common frames omitted
Caused by: org.apache.kafka.common.errors.TransactionalIdAuthorizationException: Transactional Id authorization failed


As I understand it, I must set "TransactionIdPrefix" in the producer so that my transaction can be authorized on the server. But I cannot find any information about how to set the corresponding property on the server side. For now the server doesn't know anything about my "TransactionIdPrefix" value, and I think this is the reason for the error.
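
To make my guess concrete, here is a minimal, self-contained sketch of how I think the client-side ID and a server-side rule would have to line up. It rests on assumptions I have not confirmed: that the producer factory appends a suffix to the prefix for each producer, and that the broker authorizes the resulting transactional.id against an ACL on the TransactionalId resource, which can match either literally or by prefix. The class name, the helper methods and the "my-tx-" value are hypothetical and only illustrate the matching; none of this is Kafka or spring-kafka API.

```java
import java.util.ArrayList;
import java.util.List;

public class TransactionalIdSketch {

    // Hypothetical helper: models the assumption that each producer created by
    // the factory gets a transactional.id of the form prefix + numeric suffix.
    static List<String> transactionalIds(String prefix, int producers) {
        List<String> ids = new ArrayList<>();
        for (int i = 0; i < producers; i++) {
            ids.add(prefix + i);
        }
        return ids;
    }

    // A literal rule matches only the exact resource name.
    static boolean literalMatch(String aclName, String transactionalId) {
        return aclName.equals(transactionalId);
    }

    // A prefixed rule matches every resource name that starts with the rule name.
    static boolean prefixedMatch(String aclName, String transactionalId) {
        return transactionalId.startsWith(aclName);
    }

    public static void main(String[] args) {
        String prefix = "my-tx-"; // hypothetical prefix value
        for (String id : transactionalIds(prefix, 3)) {
            System.out.println(id
                    + " literal=" + literalMatch(prefix, id)
                    + " prefixed=" + prefixedMatch(prefix, id));
        }
    }
}
```

If these assumptions hold, a rule naming exactly the prefix would never match the IDs the producers actually send, while a prefix-type rule for the same name would match all of them.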

Could you tell me what I should do to avoid this error? Which parameter should I set *on the server side*, and where?


Vadim.
