Good morning Marco,
Your fix is pretty plausible:

* Kafka transactions are started at the beginning of a checkpoint period and contain all events collected during that period.
* At the end of the checkpoint period the associated transaction is committed and, concurrently, the transaction for the next checkpoint period is started.
* In your case, the checkpoint period plus the minimum pause always lasts at least 1 minute, and hence the transaction times out.

Kafka transactions work a little differently from traditional RDBMS transactions:

* They are basically an offset pointer in each Kafka partition that marks a range of pending events to be committed.
* A Kafka reader can either use read-uncommitted mode and see these uncommitted events immediately, or
* in read-committed mode, it needs to wait for the committed record offset (per partition) to advance.
* If transactions don't commit, such a reader is effectively halted.
* The Kafka transaction timeout is a means of preventing consumers from being blocked for too long if one of the producers fails to commit (e.g. because it crashed).

Your fix of increasing the Kafka transaction timeout is sound in this respect.

Documentation on the kafka page is very detailed …

Open questions? … get back to the community 😊

Cheers

Thias

From: Marco Villalobos <mvillalo...@kineteque.com>
Sent: Wednesday, February 23, 2022 19:11
To: Nicolaus Weidner <nicolaus.weid...@ververica.com>
Cc: user <user@flink.apache.org>
Subject: Re: Trouble sinking to Kafka

I fixed this, but I'm not 100% sure why.

Here is my theory:

My checkpoint interval is one minute, and the minimum pause interval is also one minute. My transaction timeout is also one minute. I think the checkpoint causes Flink to hold the transaction open for one minute, and thus it times out.

After I changed transaction.max.timeout.ms to one hour and transaction.timeout.ms to five minutes, it all worked like a charm.

Is my theory correct?
The documentation kind of suggests this is the cause:

https://nightlies.apache.org/flink/flink-docs-release-1.12/dev/connectors/kafka.html

However, I think the documentation could benefit from a few examples and scenarios that illustrate ill-considered configurations.

Thank you.

On Wed, Feb 23, 2022 at 9:29 AM Nicolaus Weidner <nicolaus.weid...@ververica.com> wrote:

Hi Marco,

I'm no expert on the Kafka producer, but I will try to help. [1] seems to have a decent explanation of possible causes for the error you encountered, which leads me to two questions:

    if (druidProducerTransactionMaxTimeoutMs > 0) {
        properties.setProperty("transaction.max.timeout.ms", Integer.toString(druidProducerTransactionMaxTimeoutMs));
    }
    if (druidProducerTransactionTimeoutMs > 0) {
        properties.setProperty("transaction.timeout.ms", Integer.toString(druidProducerTransactionTimeoutMs));
    }

Have you tried increasing the timeout settings, to see whether transactions timed out?

    properties.setProperty("transactional.id", "local.druid");

Do you use multiple producers (parallelism > 1)? It seems you always set the same transactional.id, which I expect causes problems when you have multiple producer instances (see "zombie fencing" in [2]). In that case, just make sure the IDs are unique.

And one additional question: Does the error occur consistently, or only occasionally?

Best,
Nico

[1] https://stackoverflow.com/questions/53058715/what-is-reason-for-getting-producerfencedexception-during-producer-send
[2] https://stackoverflow.com/a/52304789
This message is intended only for the named recipient and may contain confidential or privileged information. As the confidentiality of email communication cannot be guaranteed, we do not accept any responsibility for the confidentiality and the intactness of this message. If you have received it in error, please advise the sender by return e-mail and delete this message and any attachments. Any unauthorised use or dissemination of this information is strictly prohibited.
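
As a footnote to the timing argument in this thread, here is a minimal Java sketch of the sanity check it implies: the producer's transaction.timeout.ms should exceed the longest time a transaction can stay open, and must not exceed the broker-side transaction.max.timeout.ms. The class name, method names, and the "checkpoint interval + minimum pause + slack" bound are assumptions made for illustration; they are not part of Flink or Kafka, and only the two property names come from the thread. The sketch uses plain java.util.Properties, so it runs without Flink or Kafka on the classpath.

```java
import java.time.Duration;
import java.util.Properties;

public class TransactionTimeoutCheck {

    // Rough upper bound on how long one transaction stays open. This bound is
    // an assumption of the sketch: checkpoint interval + minimum pause between
    // checkpoints + slack for the time a checkpoint itself takes.
    public static Duration longestOpenTransaction(
            Duration checkpointInterval, Duration minPause, Duration slack) {
        return checkpointInterval.plus(minPause).plus(slack);
    }

    // Builds producer properties only if the timeout is plausible.
    // brokerMaxTimeout mirrors the broker setting transaction.max.timeout.ms;
    // it is checked here but not written into the producer properties.
    public static Properties producerProperties(
            Duration transactionTimeout, Duration brokerMaxTimeout, Duration longestOpen) {
        if (transactionTimeout.compareTo(longestOpen) <= 0) {
            throw new IllegalArgumentException(
                    "transaction.timeout.ms (" + transactionTimeout.toMillis()
                    + " ms) should exceed the longest open transaction ("
                    + longestOpen.toMillis() + " ms)");
        }
        if (transactionTimeout.compareTo(brokerMaxTimeout) > 0) {
            throw new IllegalArgumentException(
                    "transaction.timeout.ms must not exceed the broker's "
                    + "transaction.max.timeout.ms (" + brokerMaxTimeout.toMillis() + " ms)");
        }
        Properties properties = new Properties();
        properties.setProperty(
                "transaction.timeout.ms", Long.toString(transactionTimeout.toMillis()));
        return properties;
    }

    public static void main(String[] args) {
        // Values from the thread: 1 minute interval, 1 minute minimum pause,
        // 5 minute transaction timeout, 1 hour maximum. The 30 s slack is made up.
        Duration longestOpen = longestOpenTransaction(
                Duration.ofMinutes(1), Duration.ofMinutes(1), Duration.ofSeconds(30));
        Properties properties = producerProperties(
                Duration.ofMinutes(5), Duration.ofHours(1), longestOpen);
        // prints "transaction.timeout.ms=300000"
        System.out.println(
                "transaction.timeout.ms=" + properties.getProperty("transaction.timeout.ms"));
    }
}
```

With the original one-minute transaction timeout, the same call would throw, which matches the failure described above. The returned properties would still need to be passed to whatever Kafka sink is in use.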