Hello all,

I'm trying to set up a Kafka cluster with transactions and
authorization enabled.
I use the "spring-kafka" integration. Here is my Java code:

@Bean
public DefaultKafkaProducerFactory createDefaultKafkaProducerFactory(
        KafkaProperties kafkaProperties,
        @Value("${my.kafka.producer.transaction-prefix}") String transactionIdPrefix) {
    Map<String, Object> producerProperties = kafkaProperties.buildProducerProperties();
    producerProperties.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
    producerProperties.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);
    producerProperties.put(ProducerConfig.RETRIES_CONFIG, 3);
    producerProperties.put(ProducerConfig.ACKS_CONFIG, "all");
    producerProperties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionIdPrefix);
    DefaultKafkaProducerFactory producerFactory =
            new DefaultKafkaProducerFactory(producerProperties);
    producerFactory.setTransactionIdPrefix(transactionIdPrefix);
    return producerFactory;
}

@Bean
public KafkaTransactionManager kafkaTransactionManager(
        DefaultKafkaProducerFactory producerFactory) {
    KafkaTransactionManager ktm = new KafkaTransactionManager(producerFactory);
    ktm.setTransactionSynchronization(
            AbstractPlatformTransactionManager.SYNCHRONIZATION_ON_ACTUAL_TRANSACTION);
    return ktm;
}

Everything works perfectly when my Kafka server does not have
authorization enabled. But if I enable SASL_SSL PLAIN authorization, I
get an error at the transaction initialization step. These are my client
settings, followed by the error:

spring.kafka.properties.security.protocol=SASL_SSL
spring.kafka.properties.sasl.mechanism=PLAIN
spring.kafka.properties.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="${my.kafka.user}" password="${my.kafka.password}";

Caused by: org.springframework.transaction.CannotCreateTransactionException: Could not create Kafka transaction; nested exception is org.apache.kafka.common.errors.TransactionalIdAuthorizationException: Transactional Id authorization failed
    at org.springframework.kafka.transaction.KafkaTransactionManager.doBegin(KafkaTransactionManager.java:140)
    at org.springframework.transaction.support.AbstractPlatformTransactionManager.getTransaction(AbstractPlatformTransactionManager.java:377)
    at org.springframework.data.transaction.MultiTransactionStatus.registerTransactionManager(MultiTransactionStatus.java:68)
    at org.springframework.data.transaction.ChainedTransactionManager.getTransaction(ChainedTransactionManager.java:101)
    ... 96 common frames omitted
Caused by: org.apache.kafka.common.errors.TransactionalIdAuthorizationException: Transactional Id authorization failed

As I understand it, I must set the "TransactionIdPrefix" in the producer
so that my transactions can be authorized on the server.
But I cannot find any information about how to set up the matching
property on the server side. For now the server doesn't know anything
about my "TransactionIdPrefix" value, and I think this is the reason for
the error.

Could you tell me what I should do to avoid this error? Which parameter
should I set, and where, *on the server side*?

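In case it helps, this is how I currently picture the problem. As far as
I can tell, the producer factory does not use the prefix as-is: it
appends a suffix to build each producer's transactional.id. So I assume
that whatever I configure on the server would have to cover every id
that starts with my prefix, not one exact string. A small sketch of that
assumption in plain Java (no Kafka classes; the class name, the method
names, the "my-tx-" value and the numeric suffix are all made up for
illustration):

```java
// Hypothetical model only: no Kafka classes are involved. It illustrates why a
// rule for one exact id would not cover ids that are built from a prefix.
final class TransactionalIdMatch {

    // Assumption: each producer's transactional.id is the configured prefix
    // plus some suffix; the numeric suffix used here is only illustrative.
    static String transactionalId(String prefix, int suffix) {
        return prefix + suffix;
    }

    // A rule that names exactly one id.
    static boolean matchesLiteral(String ruleName, String transactionalId) {
        return ruleName.equals(transactionalId);
    }

    // A rule that covers every id starting with the given prefix.
    static boolean matchesPrefixed(String rulePrefix, String transactionalId) {
        return transactionalId.startsWith(rulePrefix);
    }

    public static void main(String[] args) {
        // Made-up value standing in for my.kafka.producer.transaction-prefix.
        String prefix = "my-tx-";
        for (int i = 0; i < 3; i++) {
            String id = transactionalId(prefix, i);
            System.out.println(id
                    + " literal=" + matchesLiteral(prefix, id)
                    + " prefixed=" + matchesPrefixed(prefix, id));
        }
    }
}
```

If this picture is right, a server-side rule for the exact prefix string
would never match a real id, and only a prefix-style rule would.
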
Vadim.