C0urante commented on a change in pull request #11775:
URL: https://github.com/apache/kafka/pull/11775#discussion_r818848052
##########
File path:
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java
##########
@@ -396,13 +441,58 @@
ConfigDef.Importance.LOW,
INTER_WORKER_VERIFICATION_ALGORITHMS_DOC);
+ private final ExactlyOnceSourceSupport exactlyOnceSourceSupport;
+
@Override
public Integer getRebalanceTimeout() {
return getInt(DistributedConfig.REBALANCE_TIMEOUT_MS_CONFIG);
}
+ @Override
+ public boolean exactlyOnceSourceEnabled() {
+ return exactlyOnceSourceSupport == ExactlyOnceSourceSupport.ENABLED;
+ }
+
+ /**
+ * @return whether the Connect cluster's leader should use a transactional
producer to perform writes to the config
+ * topic, which is useful for ensuring that zombie leaders are fenced out
and unable to write to the topic after a
+ * new leader has been elected.
+ */
+ public boolean transactionalLeaderEnabled() {
+ return exactlyOnceSourceSupport.usesTransactionalLeader;
+ }
+
+ /**
+ * @return the {@link ProducerConfig#TRANSACTIONAL_ID_CONFIG transactional
ID} to use for the worker's producer if
+ * the worker is the leader of the cluster and is
+ * {@link #transactionalLeaderEnabled() configured to use a transactional
producer}.
Review comment:
Good call. I think rewording is probably safer just to avoid unexpected
NPEs or at least unnecessary refactoring if we end up leveraging this method
for more than just exactly-once support later on and don't want the return
value to be dependent on things like the `exactly.once.source.support` property.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]