kpatelatwork commented on a change in pull request #10822: URL: https://github.com/apache/kafka/pull/10822#discussion_r660836387
########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java ########## @@ -761,6 +803,44 @@ public void onCompletion(Throwable error, ConsumerRecord<String, byte[]> record) } + @SuppressWarnings("unchecked") + RestartRequest recordToRestartRequest(ConsumerRecord<String, byte[]> record, SchemaAndValue value) { + String connectorName = record.key().substring(RESTART_PREFIX.length()); + if (value.value() == null) { + log.error("Ignoring restart request because it is unexpectedly null"); + return null; + } + if (!(value.value() instanceof Map)) { + log.error("Ignoring restart request because the value is not a Map but is {}", value.value().getClass()); + return null; + } + + Map<String, Object> valueAsMap = (Map<String, Object>) value.value(); + + Object failed = valueAsMap.get(ONLY_FAILED_FIELD_NAME); + if (failed == null) { + log.warn("Invalid data for restart request '{}' field was missing, defaulting to {}", ONLY_FAILED_FIELD_NAME, ONLY_FAILED_DEFAULT); Review comment: we are logging an error if we are rejecting the request due to an invalid type, but in this particular case as we are defaulting the missing fields, that's why the warn. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org