
Danil Shkodin updated KAFKA-18673:
Consider adding some method of preventing a transactional _Producer_ instance 
from expiring, please.

Currently, those who run services 24/7 that write transactional messages to 
Kafka only sparsely have several options to keep the program highly 
available.

The first is what Spring 
[does|https://docs.spring.io/spring-kafka/reference/kafka/transactions.html#overview]: 
rotating transactional producers at intervals shorter than the expiration 
timeout.
{code:java}
void fixTransactionalIdExpiration() {
  try {
    producer.close(timeout);
  } catch (Exception error) {
    logger.warn("...", error);
  }
  producer = null;
  try {
    producer = new KafkaProducer<>(settings);
  } catch (Exception error) {
    logger.warn("...", error);
    // handle failure
    return;
  }
  try {
    producer.initTransactions();
  } catch (Exception error) {
    logger.warn("...", error);
    // close producer and clean up, handle failure
    return;
  }
}{code}
A similar option is to also act periodically, but to just write an empty 
record transactionally instead of reconnecting.
{code:java}
void fixTransactionalIdExpiration() {
  try {
    producer.beginTransaction();
    var topic = "project_prefix.__dummy_topic";
    var message = new ProducerRecord<>(topic, (String) null, (String) null);
    producer.send(message);
    producer.abortTransaction();
    // or producer.commitTransaction(); does not matter
  } catch (Exception error) {
    logger.warn("...", error);
    // handle failure
  }
}{code}
Personally, I do not like the necessity of introducing a service topic. This 
inelegance outweighs the reconnection troubles for me.
Surprisingly, producing an empty transaction does not prevent expiration. 
[Probably|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java#L840], 
there is a guard in the transaction manager that prevents actual 
updates to the transactional producer mappings if there is nothing to write.
{code:java}
void fixTransactionalIdExpiration() {
  // does not work
  // producer will still get fenced upon transactional.id.expiration.ms
  try {
    producer.beginTransaction();
    producer.commitTransaction();
  } catch (Exception error) {}
} {code}
Worth noting that client code may execute one of these periodic fixes 
conditionally, only if there was no activity, meaning there were no successful 
_send()_ or _sendOffsetsToTransaction()_ for, say, 6 - 24 hours.
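
The conditional check described above could be sketched roughly as follows. This is only an illustration: the _IdleTracker_ class and its method names are hypothetical and not part of kafka-clients, and the idle threshold is whatever the service chooses (6 hours in the usage note below).

```java
import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical helper: remembers the last successful transactional write. */
final class IdleTracker {
    private final Duration maxIdle;
    private final AtomicReference<Instant> lastActivity;

    IdleTracker(Duration maxIdle, Instant now) {
        this.maxIdle = maxIdle;
        this.lastActivity = new AtomicReference<>(now);
    }

    /** Call after every successful send() or sendOffsetsToTransaction(). */
    void recordActivity(Instant now) {
        lastActivity.set(now);
    }

    /** True when no activity has been recorded for at least maxIdle. */
    boolean isKeepAliveDue(Instant now) {
        return Duration.between(lastActivity.get(), now).compareTo(maxIdle) >= 0;
    }
}
```

A scheduled task would then create the tracker with, say, _Duration.ofHours(6)_, call _isKeepAliveDue(Instant.now())_ on each tick, and run one of the periodic fixes only when it returns true.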

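The last and most obvious one is to let it fail and react to the error.
{code:java}
void sendMessage(Message message) {
  try {
    producer.beginTransaction();
    producer.send(message.to());
    producer.commitTransaction();
  } catch (InvalidPidMappingException error) {
    // reconnect, retry
  } catch (Exception error) {
    // handle failure
  }
}{code}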
Having a dedicated method that explicitly reflects the intent to refresh the 
producer _transactional.id_ would line up with the _Consumer_ polling mechanism 
and signal to new kafka-clients users that prolonged transactional _Producer_ 
inactivity should be addressed.
{code:java}
void fixTransactionalIdExpiration() {
  try {
    producer.refreshTransactions();
  } catch (Exception error) {}
}{code}
Search keywords for this issue:
InvalidPidMappingException: The producer attempted to use a producer id which 
is not currently assigned to its transactional id
transactional.id.expiration.ms



> Provide means to gracefully update Producer transactional id mapping state in 
> case of lasting inactivity
> -------------------------------------------------------------------------------------------------------
>                 Key: KAFKA-18673
>                 URL: https://issues.apache.org/jira/browse/KAFKA-18673
>             Project: Kafka
>          Issue Type: Improvement
>            Reporter: Danil Shkodin
>            Assignee: Alex Tran
>            Priority: Major
> Consider adding some method of preventing a transactional _Producer_ instance 
> from expiring, please.
> Currently, those who run services 24/7 that write transactional messages 
> to Kafka only sparsely have several options to keep the program highly 
> available.
> The first is what Spring 
> [does|https://docs.spring.io/spring-kafka/reference/kafka/transactions.html#overview]:
>  rotating transactional producers at intervals lower than the expiration 
> timeout.
> {code:java}
> void fixTransactionalIdExpiration() {
>   try {
>     producer.close(timeout);
>   } catch (Exception error) {
>     logger.warn("...", error);
>   }
>   producer = null;
>   try {
>     producer = new KafkaProducer<>(settings);
>   } catch (Exception error) {
>     logger.warn("...", error);
>     // handle failure
>     return;
>   }
>   try {
>     producer.initTransactions();
>   } catch (Exception error) {
>     logger.warn("...", error);
>     // close producer and clean up, handle failure
>     return;
>   }
> }{code}
> A similar option is to also act periodically, but to just write an empty 
> record transactionally instead of reconnecting.
> {code:java}
> void fixTransactionalIdExpiration() {
>   try {
>     producer.beginTransaction();
>     var topic = "project_prefix.__dummy_topic";
>     var message = new ProducerRecord<>(topic, (String) null, (String) null);
>     producer.send(message);
>     producer.abortTransaction();
>     // or producer.commitTransaction(); does not matter
>   } catch (Exception error) {
>     logger.warn("...", error);
>     // handle failure
>   }
> }{code}
> Personally, I do not like the necessity of introducing a service topic. This 
> inelegance outweighs the reconnection troubles for me.
> Surprisingly, producing an empty transaction does not prevent expiration. 
> [Probably|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java#L840],
>  there is a guard in the transaction manager that prevents actual 
> updates to the transactional producer mappings if there is nothing to write.
> {code:java}
> void fixTransactionalIdExpiration() {
>   // does not work
>   // producer will still get fenced upon transactional.id.expiration.ms
>   try {
>     producer.beginTransaction();
>     producer.commitTransaction();
>   } catch (Exception error) {}
> } {code}
> Worth noting that client code may execute one of these periodic fixes 
> conditionally, only if there was no activity, meaning there were no 
> successful _send()_ or _sendOffsetsToTransaction()_ for, say, 6 - 24 hours.
> The last and most obvious one is to let it fail and react to the error.
> {code:java}
> void sendMessage(Message message) {
>   try {
>     producer.beginTransaction();
>     producer.send(message.to());
>     producer.commitTransaction();
>   } catch (InvalidPidMappingException error) {
>     // reconnect, retry
>   } catch (Exception error) {
>     // handle failure
>   }
> }{code}
> Having a dedicated method that explicitly reflects the intent to refresh the 
> producer _transactional.id_ would line up with the _Consumer_ polling 
> mechanism and signal to new kafka-clients users that prolonged transactional 
> _Producer_ inactivity should be addressed.
> {code:java}
> void fixTransactionalIdExpiration() {
>   try {
>     producer.refreshTransactions();
>   } catch (Exception error) {}
> }{code}
> Search keywords for this issue:
> InvalidPidMappingException: The producer attempted to use a producer id which 
> is not currently assigned to its transactional id
> transactional.id.expiration.ms

This message was sent by Atlassian Jira