yashmayya commented on code in PR #12984:
URL: https://github.com/apache/kafka/pull/12984#discussion_r1081145849


##########
connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java:
##########
@@ -712,8 +733,16 @@ KafkaBasedLog<String, byte[]> setupAndCreateKafkaBasedLog(String topic, final Wo
     }
 
     private void sendPrivileged(String key, byte[] value) {
+        sendPrivileged(key, value, null);
+    }
+
+    private void sendPrivileged(String key, byte[] value, Callback<Void> callback) {
         if (!usesFencableWriter) {
-            configLog.send(key, value);

Review Comment:
   I've converted the `KafkaConfigBackingStore` APIs to be synchronous even 
when EOS is disabled, which brings the behavior in line with the EOS case. 
This simplifies things significantly and makes error handling consistent 
across all the `KafkaConfigBackingStore` APIs without invasive changes.



##########
connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java:
##########
@@ -723,7 +752,11 @@ private void sendPrivileged(String key, byte[] value) {
 
         try {
             fencableProducer.beginTransaction();
-            fencableProducer.send(new ProducerRecord<>(topic, key, value));
+            fencableProducer.send(new ProducerRecord<>(topic, key, value), (metadata, exception) -> {

Review Comment:
   I've removed the producer callback here, since the non-EOS case now also 
uses producer send synchronously (and, as noted earlier, it doesn't really 
make sense to handle errors via both a callback and `commitTransaction`). 
Both cases now surface exceptions synchronously: the non-EOS case by calling 
`get()` on the future returned from `Producer::send`, and the EOS case through 
`Producer::commitTransaction`.
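
   To illustrate the non-EOS path, here is a minimal sketch of surfacing an 
asynchronous send failure synchronously by blocking on the returned future. 
It is not the code in this PR: `SyncSendSketch`, `sendAndWait`, the timeout, 
and the error messages are hypothetical, and a plain `CompletableFuture` 
stands in for the future returned by `Producer::send` so the example runs 
with only the JDK.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class SyncSendSketch {

    // Hypothetical helper: block until the send finishes and rethrow any
    // failure to the caller instead of reporting it through a callback.
    static void sendAndWait(Future<?> sendResult, long timeoutMs) {
        try {
            sendResult.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (ExecutionException e) {
            // The underlying send failure is the cause of the ExecutionException.
            throw new RuntimeException("Failed to write to config topic", e.getCause());
        } catch (TimeoutException e) {
            throw new RuntimeException("Timed out waiting for write to config topic", e);
        } catch (InterruptedException e) {
            // Restore the interrupt flag before propagating.
            Thread.currentThread().interrupt();
            throw new RuntimeException("Interrupted while waiting for write to config topic", e);
        }
    }

    public static void main(String[] args) {
        // A send that succeeded: returns normally.
        sendAndWait(CompletableFuture.completedFuture(null), 1000);

        // A send that failed: the failure reaches the caller as an exception.
        CompletableFuture<Void> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("broker unavailable"));
        try {
            sendAndWait(failed, 1000);
        } catch (RuntimeException e) {
            System.out.println(e.getMessage() + ": " + e.getCause().getMessage());
        }
    }
}
```

   In the EOS path no such wait is needed in this sketch's terms, because 
`Producer::commitTransaction` is itself the blocking call that reports the 
failure.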
   
   > Another option for the above issue could be changing the exception mapper 
to concatenate all the exception messages from the exception chain.
   
   Yet another option would be to simply append "Check the worker logs for 
more details on the error" to the top-level exception's message in the REST 
API response (the worker logs will contain the entire exception chain). 
Thoughts?
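
   For comparison, the two options for the REST response message could look 
roughly like the sketch below. This is only an illustration under stated 
assumptions: `ExceptionMessageSketch`, both method names, the separator, and 
the depth limit are hypothetical and are not taken from the existing 
exception mapper.

```java
import java.util.ArrayList;
import java.util.List;

class ExceptionMessageSketch {

    // Option 1 (hypothetical): join the messages of every exception in the
    // cause chain. The depth limit guards against cyclic cause chains.
    static String concatenateChain(Throwable t) {
        List<String> parts = new ArrayList<>();
        int depth = 0;
        for (Throwable cur = t; cur != null && depth < 10; cur = cur.getCause(), depth++) {
            if (cur.getMessage() != null) {
                parts.add(cur.getMessage());
            }
        }
        return String.join("; caused by: ", parts);
    }

    // Option 2 (hypothetical): keep only the top-level message and point the
    // user at the worker logs, which contain the full chain.
    static String withLogHint(Throwable t) {
        return t.getMessage() + ". Check the worker logs for more details on the error";
    }

    public static void main(String[] args) {
        Throwable error = new RuntimeException(
                "Failed to write connector configuration",
                new IllegalStateException("Producer is fenced"));
        System.out.println(concatenateChain(error));
        System.out.println(withLogHint(error));
    }
}
```

   Option 1 puts the root cause directly in the response at the cost of a 
longer message; option 2 keeps the response short but requires access to the 
worker logs.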


