As noted in the SO post, it's a bit confusing to me how the `checkpointing.mode` delivery guarantees interact with the ones for the different sinks, and in particular with the Kafka one.
Based on the error I had, I understand that if I use `EXACTLY_ONCE` for the checkpoints and I indicate nothing in the Kafka sink, the sink's default guarantee is overridden and/or transactions are used anyway (???). Does the `checkpointing.mode` guarantee really override the default one for Kafka? If so, would something like this

```
// setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE) // don't call this, so that the Kafka sink adapts automatically
setTransactionalIdPrefix("XYZ") // just in case transactions are required
```

make the Kafka sink automatically adapt to the `checkpointing.mode` (that is, use the same guarantee), or should I instead explicitly set both guarantees? E.g.,

```
execution.checkpointing.mode='EXACTLY_ONCE'
```

plus

```
setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
setTransactionalIdPrefix("XYZ")
```

Or, for `AT_LEAST_ONCE`:

```
execution.checkpointing.mode='AT_LEAST_ONCE'
```

plus

```
setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
// setTransactionalIdPrefix("XYZ") // not required in this case
```

Any clarifications on this would be highly appreciated. Maybe you can point me to the relevant code (or docs) if the interaction between those guarantees is already well documented.

Thanks in advance,

Salva

On Mon, Nov 7, 2022 at 8:06 AM Salva Alcántara <salcantara...@gmail.com> wrote:

> I had a Flink 1.15.1 job configured with
>
> ```
> execution.checkpointing.mode=EXACTLY_ONCE
> ```
>
> that was failing with the following error:
>
> ```
> Sink: Committer (2/2)#732 (36640a337c6ccdc733d176b18adab979) switched from INITIALIZING to FAILED with failure cause: java.lang.IllegalStateException: Failed to commit KafkaCommittable{producerId=4521984, epoch=0, transactionalId=}
> ...
> Caused by: org.apache.kafka.common.config.ConfigException: Invalid value for configuration transactional.id: String must be non-empty
> ```
>
> This happened after the first checkpoint was triggered.
> The strange thing about it is that the `KafkaSinkBuilder` was used without calling `setDeliverGuarantee`, and hence the default delivery guarantee was expected to be used, which is `NONE` [1].
>
> Is that even possible to begin with? Shouldn't Kafka transactions be involved only when one follows [this recipe] [2]:
>
> * <p>One can also configure different {@link DeliveryGuarantee} by using {@link
> * #setDeliverGuarantee(DeliveryGuarantee)} but keep in mind when using {@link
> * DeliveryGuarantee#EXACTLY_ONCE} one must set the transactionalIdPrefix {@link
> * #setTransactionalIdPrefix(String)}.
>
> So, in my case, without calling `setDeliverGuarantee` (or `setTransactionalIdPrefix`), I cannot understand why I was seeing these errors. To avoid the problem, I temporarily changed the checkpointing settings to
>
> ```
> execution.checkpointing.mode=AT_LEAST_ONCE
> ```
>
> but I'd like to understand what was happening.
>
> [1]: https://github.com/apache/flink/blob/f494be6956e850d4d1c9fd50b79e5a8dd5b53e47/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSinkBuilder.java#L66
> [2]: https://github.com/apache/flink/blob/f494be6956e850d4d1c9fd50b79e5a8dd5b53e47/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSinkBuilder.java#L51
>
> FYI I've also posted this on SO here:
> - https://stackoverflow.com/questions/74342971/transactional-id-errors-when-using-kafka-sink-with-exactly-once-checkpoints
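To make the "set both explicitly" option concrete, here is a small, self-contained Java sketch. It does not use Flink at all: `GuaranteeCheck`, `validate`, and the two local enums are hypothetical stand-ins that only mirror the constant names of Flink's `CheckpointingMode` and `DeliveryGuarantee`. It encodes the javadoc rule quoted above (an `EXACTLY_ONCE` sink needs a transactional ID prefix) plus an assumed policy that the sink guarantee should match the checkpointing mode; it says nothing about what Flink actually does internally.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical consistency check; NOT part of Flink. */
public class GuaranteeCheck {

    // Local stand-ins mirroring Flink's enum constant names; these are not the Flink types.
    enum CheckpointingMode { EXACTLY_ONCE, AT_LEAST_ONCE }
    enum DeliveryGuarantee { EXACTLY_ONCE, AT_LEAST_ONCE, NONE }

    /** Returns the configuration problems found; an empty list means the settings look consistent. */
    static List<String> validate(CheckpointingMode mode, DeliveryGuarantee sink, String txIdPrefix) {
        List<String> problems = new ArrayList<>();
        // Rule from the KafkaSinkBuilder javadoc: EXACTLY_ONCE requires a transactional ID prefix.
        if (sink == DeliveryGuarantee.EXACTLY_ONCE && (txIdPrefix == null || txIdPrefix.isEmpty())) {
            problems.add("EXACTLY_ONCE sink needs a non-empty transactionalIdPrefix");
        }
        // Assumed policy for the "set both explicitly" option: the two guarantees should match.
        if (!sink.name().equals(mode.name())) {
            problems.add("sink guarantee " + sink + " does not match checkpointing mode " + mode);
        }
        return problems;
    }

    public static void main(String[] args) {
        // Both set to EXACTLY_ONCE with a prefix.
        System.out.println(validate(CheckpointingMode.EXACTLY_ONCE, DeliveryGuarantee.EXACTLY_ONCE, "XYZ"));
        // Setup from the original report: EXACTLY_ONCE checkpoints, builder default NONE, no prefix.
        System.out.println(validate(CheckpointingMode.EXACTLY_ONCE, DeliveryGuarantee.NONE, null));
        // Both set to AT_LEAST_ONCE: no prefix needed.
        System.out.println(validate(CheckpointingMode.AT_LEAST_ONCE, DeliveryGuarantee.AT_LEAST_ONCE, null));
    }
}
```

Running `main` prints `[]` for the two matching configurations and a single mismatch message for the setup from the original report. Note that the check only flags the mismatch; it does not reproduce the `transactional.id` error itself.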