Good morning Marco,

Your theory is quite plausible:

  *   Kafka transactions are started at the beginning of a checkpoint period 
and contain all events collected during this period.
  *   At the end of the checkpoint period the associated transaction is 
committed, and concurrently the transaction for the next checkpoint period is 
started.
  *   In your case (checkpoint period + minimum distance) always lasts at least 
1 minute, and hence the transaction times out.
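
The timing argument above can be written down as a small check. This is only a rough sketch that follows the reasoning in this message (open time of a transaction estimated as checkpoint period + minimum distance); the class and method names are hypothetical and not part of Flink or Kafka.

```java
import java.time.Duration;

// Hypothetical helper for illustration only; not a Flink or Kafka API.
final class TransactionTimeoutCheck {

    // Rough estimate of how long one transaction may stay open,
    // following the reasoning above: checkpoint period + minimum distance.
    static Duration estimatedOpenTime(Duration checkpointInterval, Duration minPause) {
        return checkpointInterval.plus(minPause);
    }

    // True if the estimated open time reaches or exceeds the transaction timeout.
    static boolean atRiskOfTimeout(Duration checkpointInterval,
                                   Duration minPause,
                                   Duration transactionTimeout) {
        return estimatedOpenTime(checkpointInterval, minPause)
                .compareTo(transactionTimeout) >= 0;
    }
}
```

With one minute for each of the three values, as in the original setup, the check reports a risk; with a five minute timeout it does not.
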

Kafka transactions work a little differently from traditional RDBMS transactions:

  *   They are basically a pointer offset in each Kafka partition that marks a 
range of pending events to be committed.
  *   A Kafka reader in read-uncommitted mode sees these uncommitted events 
immediately.
  *   A reader in read-committed mode needs to wait for the committed record 
offset (per partition) to advance.
  *   If transactions don't commit, such a reader is effectively halted.
  *   The Kafka transaction timeout is a means to prevent consumers from being 
blocked for too long if one of the producers fails to commit (e.g. because it 
crashed).
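
For reference, the reader mode mentioned above is selected on the consumer side through the isolation.level setting. A minimal sketch, assuming a plain java.util.Properties based consumer configuration; the class name, method name and parameters are hypothetical:

```java
import java.util.Properties;

// Hypothetical helper for illustration only.
final class ReadCommittedConsumerConfig {

    static Properties build(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("group.id", groupId);
        // Only return records of committed transactions; the alternative
        // value "read_uncommitted" also returns records of open transactions.
        props.setProperty("isolation.level", "read_committed");
        return props;
    }
}
```

A consumer configured this way is the kind of reader that stalls while a producer keeps a transaction open.
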

Your fix of increasing the Kafka transaction timeout is sound in this respect.
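
As a sketch of what such a fix can look like on the producer side (the five minute value is the one from your mail; class and method names are hypothetical). Note that, as far as I know, transaction.max.timeout.ms is a broker-side setting that caps what a producer may request, so it would normally be raised in the broker configuration rather than in the producer properties.

```java
import java.time.Duration;
import java.util.Properties;

// Hypothetical helper for illustration only.
final class ProducerTimeoutConfig {

    // Builds producer properties with the given transaction timeout.
    static Properties withTransactionTimeout(Duration timeout) {
        Properties props = new Properties();
        // Producer setting, expressed in milliseconds.
        props.setProperty("transaction.timeout.ms", Long.toString(timeout.toMillis()));
        return props;
    }
}
```

Calling withTransactionTimeout(Duration.ofMinutes(5)) yields a timeout comfortably above a one minute checkpoint period plus a one minute minimum distance.
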

Documentation on the kafka page is very detailed …

Open questions? … get back to the community 😊

Cheers

Thias


From: Marco Villalobos <mvillalo...@kineteque.com>
Sent: Wednesday, 23 February 2022 19:11
To: Nicolaus Weidner <nicolaus.weid...@ververica.com>
Cc: user <user@flink.apache.org>
Subject: Re: Trouble sinking to Kafka

I fixed this, but I'm not 100% sure why.

Here is my theory:

My checkpoint interval is one minute, and the minimum pause interval is also 
one minute. My transaction timeout time is also one minute. I think the 
checkpoint causes Flink to hold the transaction open for one minute, and thus 
it times out.

After I changed transaction.max.timeout.ms to one hour and 
transaction.timeout.ms to five minutes, it all worked like a charm.

Is my theory correct?

The documentation kind of suggests this is the cause:  
https://nightlies.apache.org/flink/flink-docs-release-1.12/dev/connectors/kafka.html

However, I think the documentation could benefit from a few examples and 
scenarios that illustrate ill-considered configurations.

Thank you.

On Wed, Feb 23, 2022 at 9:29 AM Nicolaus Weidner 
<nicolaus.weid...@ververica.com> wrote:
Hi Marco,

I'm no expert on the Kafka producer, but I will try to help. [1] seems to have 
a decent explanation of possible error causes for the error you encountered.
Which leads me to two questions:


   if (druidProducerTransactionMaxTimeoutMs > 0) {
      properties.setProperty("transaction.max.timeout.ms",
            Integer.toString(druidProducerTransactionMaxTimeoutMs));
   }
   if (druidProducerTransactionTimeoutMs > 0) {
      properties.setProperty("transaction.timeout.ms",
            Integer.toString(druidProducerTransactionTimeoutMs));
   }

Have you tried increasing the timeout settings, to see if transactions timed 
out?


   properties.setProperty("transactional.id", "local.druid");

Do you use multiple producers (parallelism > 1)? It seems you always set the 
same transactional.id, which I expect causes problems when you have multiple 
producer instances (see "zombie fencing" in [2]). In that case, just make sure 
they are unique.
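
One way to get unique ids could look like the following. This is only a sketch under the assumption that each producer instance knows a distinct index (for example its parallel subtask index); the helper class and its method are hypothetical, not something provided by Flink or Kafka.

```java
import java.util.Properties;

// Hypothetical helper for illustration only.
final class TransactionalIds {

    // Derives a distinct transactional.id per producer instance
    // from a common prefix and the instance's index.
    static String forSubtask(String prefix, int subtaskIndex) {
        return prefix + "-" + subtaskIndex;
    }

    // Sets the derived id on the given producer properties and returns them.
    static Properties apply(Properties properties, String prefix, int subtaskIndex) {
        properties.setProperty("transactional.id", forSubtask(prefix, subtaskIndex));
        return properties;
    }
}
```

With the prefix "local.druid", two instances with index 0 and 1 would no longer share an id and so should not fence each other.
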

And one additional question: Does the error occur consistently, or only 
occasionally?

Best,
Nico

[1] 
https://stackoverflow.com/questions/53058715/what-is-reason-for-getting-producerfencedexception-during-producer-send
[2] https://stackoverflow.com/a/52304789

