[ https://issues.apache.org/jira/browse/KAFKA-18673?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Danil Shkodin updated KAFKA-18673:
----------------------------------
    Description:
Consider adding some method of preventing a transactional _Producer_ instance from expiring, please.

Currently, for those who run 24/7 services that write transactional messages to Kafka very sparsely, there are several options to keep the program highly available.

The first is what Spring [does|https://docs.spring.io/spring-kafka/reference/kafka/transactions.html#overview]: rotating transactional producers at intervals shorter than the expiration timeout.
{code:java}
void fixTransactionalIdExpiration() {
  try {
    producer.close(timeout);
  } catch (Exception error) {
    logger.warn("...", error);
  }
  producer = null;
  try {
    producer = new KafkaProducer<>(settings);
  } catch (Exception error) {
    logger.warn("...", error);
    // handle failure
    return;
  }
  try {
    producer.initTransactions();
  } catch (Exception error) {
    logger.warn("...", error);
    // close producer and clean up, handle failure
    return;
  }
}{code}
The other, similar option is to also act periodically, but to just write an empty record transactionally instead of reconnecting.
{code:java}
void fixTransactionalIdExpiration() {
  try {
    producer.beginTransaction();
    var topic = "project_prefix.__dummy_topic";
    var message = new ProducerRecord<>(topic, (String) null, (String) null);
    producer.send(message);
    producer.abortTransaction();
    // or producer.commitTransaction(); does not matter
  } catch (Exception error) {
    logger.warn("...", error);
    // handle failure
  }
}{code}
Personally, I do not like the necessity of introducing a service topic. For me, this inelegance outweighs the trouble of reconnecting.

Surprisingly, producing an empty transaction does not prevent expiration.
[Probably|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java#L840], there is a guard in the transaction manager that prevents actual updates to the transactional producer mappings if there is nothing to write.
{code:java}
void fixTransactionalIdExpiration() {
  // does not work
  // producer will still get fenced upon transactional.id.expiration.ms
  try {
    producer.beginTransaction();
    producer.commitTransaction();
  } catch (Exception error) {}
} {code}
Worth noting that client code may execute one of these periodic fixes conditionally, only if there was no activity, meaning there was no successful _send()_ or _sendOffsetsToTransaction()_ for, say, 6 - 24 hours.

The last and most obvious option is to let it fail and react to the error.
{code:java}
void sendMessage(Message message) {
  try {
    producer.beginTransaction();
    producer.send(message.to());
    producer.commitTransaction();
  } catch (InvalidPidMappingException error) {
    // reconnect, retry
  } catch (Exception error) {
    // handle failure
  }
}{code}
Having a dedicated method that explicitly reflects the intent to refresh the producer's _transactional.id_ would line up with the _Consumer_ polling mechanism and would signal to new kafka-clients users that prolonged transactional _Producer_ inactivity should be addressed.
{code:java}
void fixTransactionalIdExpiration() {
  try {
    // proposed method; not available in kafka-clients
    producer.refreshTransactions();
  } catch (Exception error) {}
}{code}
Search keywords for this issue:
InvalidPidMappingException: The producer attempted to use a producer id which is not currently assigned to its transactional id
transactional.id.expiration.ms

> Provide means to gracefully update Producer transactional id mapping state in case of lasting inactivity
> -------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-18673
>                 URL: https://issues.apache.org/jira/browse/KAFKA-18673
>             Project: Kafka
>          Issue Type: Improvement
>            Reporter: Danil Shkodin
>            Assignee: Alex Tran
>            Priority: Major
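The description notes that the periodic fixes may run conditionally, only after a stretch without a successful _send()_ or _sendOffsetsToTransaction()_. A minimal sketch of how that idle check might be tracked follows. The class `IdleRefreshPolicy` and its methods are hypothetical names introduced here for illustration; they are not part of kafka-clients, and the sketch deliberately leaves out the producer calls themselves.

```java
import java.time.Duration;
import java.time.Instant;

/**
 * Hypothetical helper (not a kafka-clients class): tracks the last successful
 * transactional activity and reports when a keep-alive action is due.
 */
final class IdleRefreshPolicy {
    private final Duration maxIdle;
    private Instant lastActivity;

    IdleRefreshPolicy(Duration maxIdle, Instant start) {
        this.maxIdle = maxIdle;
        this.lastActivity = start;
    }

    /** Call after a transaction with real writes has completed successfully. */
    synchronized void recordActivity(Instant now) {
        lastActivity = now;
    }

    /** True once the producer has been idle for at least maxIdle. */
    synchronized boolean isRefreshDue(Instant now) {
        return Duration.between(lastActivity, now).compareTo(maxIdle) >= 0;
    }
}
```

A scheduled task could call `isRefreshDue(Instant.now())` and, when it returns true, run one of the periodic fixes from the description and then call `recordActivity(...)`. The idle threshold is an assumption to tune; it only helps if it stays well below `transactional.id.expiration.ms`, for example the 6 - 24 hours mentioned above.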