kkonstantine commented on a change in pull request #10822:
URL: https://github.com/apache/kafka/pull/10822#discussion_r660868474



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
##########
@@ -761,6 +803,44 @@ public void onCompletion(Throwable error, ConsumerRecord<String, byte[]> record)
 
     }
 
+    @SuppressWarnings("unchecked")
+    RestartRequest recordToRestartRequest(ConsumerRecord<String, byte[]> record, SchemaAndValue value) {
+        String connectorName = record.key().substring(RESTART_PREFIX.length());
+        if (value.value() == null) {
+            log.error("Ignoring restart request because it is unexpectedly null");
+            return null;
+        }
+        if (!(value.value() instanceof Map)) {
+            log.error("Ignoring restart request because the value is not a Map but is {}", value.value().getClass());
+            return null;
+        }
+
+        Map<String, Object> valueAsMap = (Map<String, Object>) value.value();
+
+        Object failed = valueAsMap.get(ONLY_FAILED_FIELD_NAME);
+        if (failed == null) {
+            log.warn("Invalid data for restart request '{}' field was missing, defaulting to {}", ONLY_FAILED_FIELD_NAME, ONLY_FAILED_DEFAULT);

Review comment:
       I didn't notice that. Thanks, it makes sense then.



