kkonstantine commented on a change in pull request #10822: URL: https://github.com/apache/kafka/pull/10822#discussion_r660868474
########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java ########## @@ -761,6 +803,44 @@ public void onCompletion(Throwable error, ConsumerRecord<String, byte[]> record) } + @SuppressWarnings("unchecked") + RestartRequest recordToRestartRequest(ConsumerRecord<String, byte[]> record, SchemaAndValue value) { + String connectorName = record.key().substring(RESTART_PREFIX.length()); + if (value.value() == null) { + log.error("Ignoring restart request because it is unexpectedly null"); + return null; + } + if (!(value.value() instanceof Map)) { + log.error("Ignoring restart request because the value is not a Map but is {}", value.value().getClass()); + return null; + } + + Map<String, Object> valueAsMap = (Map<String, Object>) value.value(); + + Object failed = valueAsMap.get(ONLY_FAILED_FIELD_NAME); + if (failed == null) { + log.warn("Invalid data for restart request '{}' field was missing, defaulting to {}", ONLY_FAILED_FIELD_NAME, ONLY_FAILED_DEFAULT); Review comment: Didn't notice. Thanks. Makes sense then -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org