[ https://issues.apache.org/jira/browse/KAFKA-18673?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Danil Shkodin updated KAFKA-18673:
----------------------------------
    Description: 
Please consider adding a way to prevent a transactional _Producer_ instance 
from expiring.

Currently, anyone running a 24/7 service that writes transactional messages to 
Kafka only sparsely has several options for keeping the program highly 
available.

The first is what Spring 
[does|https://docs.spring.io/spring-kafka/reference/kafka/transactions.html#overview]:
 rotating transactional producers at intervals shorter than the expiration 
timeout.
{code:java}
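void fixTransactionalIdExpiration() {
  // close the current producer; a failure here is only logged
  try {
    producer.close(timeout); // timeout is a java.time.Duration
  } catch (Exception error) {
    logger.warn("...", error);
  }
  producer = null;
  // create a new producer with the same transactional.id (taken from settings)
  try {
    producer = new KafkaProducer<>(settings);
  } catch (Exception error) {
    logger.warn("...", error);
    // handle failure
    return;
  }
  // initTransactions() registers the transactional.id with the transaction
  // coordinator again, which restarts its expiration countdown
  try {
    producer.initTransactions();
  } catch (Exception error) {
    logger.warn("...", error);
    // close producer and clean up, handle failure
    return;
  }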
}{code}
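To make the rotation idea concrete without depending on kafka-clients, here is a minimal sketch of the same close, recreate and initialize cycle. _Rotator_ is a hypothetical helper, not Spring's implementation; _T_ stands in for _KafkaProducer_, the factory for _new KafkaProducer<>(settings)_ and the initializer for _initTransactions()_.
{code:java}
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical helper: holds one instance and replaces it on rotate().
// In real code T would be KafkaProducer, the factory would call
// new KafkaProducer<>(settings) and the initializer would call initTransactions().
final class Rotator<T extends AutoCloseable> {
    private final Supplier<T> factory;
    private final Consumer<T> initializer;
    private T current;

    Rotator(Supplier<T> factory, Consumer<T> initializer) {
        this.factory = factory;
        this.initializer = initializer;
        this.current = create();
    }

    // Creates and initializes a fresh instance; a half-initialized one is closed.
    private T create() {
        T fresh = factory.get();
        try {
            initializer.accept(fresh);
        } catch (RuntimeException error) {
            closeQuietly(fresh);
            throw error;
        }
        return fresh;
    }

    // Closes the current instance, then creates a replacement. If creation
    // fails, the exception propagates and get() returns null until the next
    // successful rotation.
    synchronized void rotate() {
        closeQuietly(current);
        current = null;
        current = create();
    }

    synchronized T get() {
        return current;
    }

    private static void closeQuietly(AutoCloseable resource) {
        if (resource == null) {
            return;
        }
        try {
            resource.close();
        } catch (Exception error) {
            // ignored here; real code would log it
        }
    }
}
{code}
_rotate()_ would then be triggered periodically at an interval shorter than the broker's _transactional.id.expiration.ms_ (604800000 ms, i.e. 7 days, by default). Since _get()_ can return null after a failed rotation, callers have to check for that.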
The second, similar option also acts periodically, but instead of reconnecting 
it simply writes an empty record transactionally.
{code:java}
void fixTransactionalIdExpiration() {
  try {
    producer.beginTransaction();
    var topic = "project_prefix.__dummy_topic";
    var message = new ProducerRecord<>(topic, (String) null, (String) null);
    producer.send(message);
    producer.abortTransaction();
    // or producer.commitTransaction(); does not matter
  } catch (Exception error) {
    logger.warn("...", error);
    // handle failure
  }
}{code}
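Both periodic options need something that runs them on a schedule. Below is a minimal sketch, assuming a plain _ScheduledExecutorService_ wrapped in a hypothetical _KeepAliveScheduler_; the _Runnable_ passed in would be either the rotation or the dummy-write variant of _fixTransactionalIdExpiration()_. The try/catch inside the task matters: once a run of a periodic task throws, the executor suppresses all of its subsequent runs.
{code:java}
import java.time.Duration;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical wrapper that runs a keep-alive action at a fixed rate.
// Exceptions are caught inside the task so that one failed run does not
// silently cancel all later runs.
final class KeepAliveScheduler implements AutoCloseable {
    private final ScheduledExecutorService executor =
        Executors.newSingleThreadScheduledExecutor();

    ScheduledFuture<?> start(Runnable keepAlive, Duration period) {
        long millis = period.toMillis();
        return executor.scheduleAtFixedRate(() -> {
            try {
                keepAlive.run();
            } catch (RuntimeException error) {
                // real code would log the failure; the schedule keeps running
            }
        }, millis, millis, TimeUnit.MILLISECONDS);
    }

    @Override
    public void close() {
        executor.shutdownNow();
    }
}
{code}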
Personally, I dislike having to introduce a service topic; to me, this 
inelegance outweighs the trouble of reconnecting.
Surprisingly, producing an empty transaction does not prevent expiration. 
[Probably|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java#L840]
 this is due to a guard in the transaction manager that prevents the 
transactional producer id mapping from actually being updated when there is 
nothing to write.
{code:java}
void fixTransactionalIdExpiration() {
  // does not work
  // producer will still get fenced upon transactional.id.expiration.ms
  try {
    producer.beginTransaction();
    producer.commitTransaction();
  } catch (Exception error) {}
}{code}
It is worth noting that client code may run one of these periodic fixes 
conditionally, only when there has been no activity, i.e. no successful 
_send()_ or _sendOffsetsToTransaction()_ call for, say, 6 to 24 hours.
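The conditional check can be sketched as a small idle tracker. _InactivityTracker_ and its methods are hypothetical; the threshold would be picked from the 6 to 24 hour range above and must stay well below _transactional.id.expiration.ms_.
{code:java}
import java.time.Duration;
import java.time.Instant;

// Hypothetical tracker: remembers the last successful transactional write and
// reports when a keep-alive fix (rotation or dummy write) is due.
final class InactivityTracker {
    private final Duration idleThreshold;
    private Instant lastActivity;

    InactivityTracker(Duration idleThreshold, Instant start) {
        if (idleThreshold.isNegative() || idleThreshold.isZero()) {
            throw new IllegalArgumentException("idleThreshold must be positive");
        }
        this.idleThreshold = idleThreshold;
        this.lastActivity = start;
    }

    // Call after each successful send() or sendOffsetsToTransaction().
    // Timestamps older than the last recorded one are ignored.
    synchronized void recordActivity(Instant when) {
        if (when.isAfter(lastActivity)) {
            lastActivity = when;
        }
    }

    // True once nothing has been recorded for at least idleThreshold.
    synchronized boolean keepAliveDue(Instant now) {
        return Duration.between(lastActivity, now).compareTo(idleThreshold) >= 0;
    }
}
{code}
A periodic task would call _keepAliveDue(Instant.now())_ and, when it returns true, run one of the fixes and then _recordActivity(Instant.now())_, so that the fix itself counts as activity.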

The last, and most obvious, option is to let the operation fail and react to the error.
{code:java}
void sendMessage(Message message) {
  try {
    producer.beginTransaction();
    producer.send(message.to());
    producer.commitTransaction();
  } catch (InvalidPidMappingException error) {
    // reconnect, retry
  } catch (Exception error) {
    // handle failure
  }
}{code}
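The "reconnect, retry" branch can be factored into a small wrapper that rebuilds the client and retries exactly once. _RetryOnExpiry_ is hypothetical and generic over the client type so that it compiles without kafka-clients; in real code _C_ would be the producer, _rebuild_ would close it and return a new, initialized one, and the predicate would match _InvalidPidMappingException_ (unchecked, like other Kafka client errors). The sketch assumes the failed transaction was not committed; otherwise the retry would write the message twice.
{code:java}
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.UnaryOperator;

// Hypothetical wrapper: runs an operation against the current client and, if
// it fails with an "expired" error, rebuilds the client once and retries.
final class RetryOnExpiry<C> {
    private final UnaryOperator<C> rebuild;            // old client -> new client
    private final Predicate<RuntimeException> isExpiry; // e.g. InvalidPidMappingException
    private C client;

    RetryOnExpiry(C initial, UnaryOperator<C> rebuild, Predicate<RuntimeException> isExpiry) {
        this.client = initial;
        this.rebuild = rebuild;
        this.isExpiry = isExpiry;
    }

    synchronized <R> R run(Function<C, R> operation) {
        try {
            return operation.apply(client);
        } catch (RuntimeException error) {
            if (!isExpiry.test(error)) {
                throw error; // unrelated failures are left to the caller
            }
            client = rebuild.apply(client);
            return operation.apply(client); // a second failure propagates
        }
    }
}
{code}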
A dedicated method that explicitly expresses the intent to refresh the 
producer's _transactional.id_ would be in line with the _Consumer_ polling 
mechanism, and would make it clear to new kafka-clients users that prolonged 
inactivity of a transactional _Producer_ needs to be addressed.
{code:java}
void fixTransactionalIdExpiration() {
  try {
    producer.refreshTransactions(); // proposed method, not part of KafkaProducer
  } catch (Exception error) {}
}{code}
Search keywords for this issue:
InvalidPidMappingException: The producer attempted to use a producer id which is not currently assigned to its transactional id
transactional.id.expiration.ms

 



> Provide means to gracefully update Producer transactional id mapping state in 
> case of lasting inactivity
> -------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-18673
>                 URL: https://issues.apache.org/jira/browse/KAFKA-18673
>             Project: Kafka
>          Issue Type: Improvement
>            Reporter: Danil Shkodin
>            Assignee: Alex Tran
>            Priority: Major
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
