[ https://issues.apache.org/jira/browse/FLINK-6998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16104767#comment-16104767 ]

ASF GitHub Bot commented on FLINK-6998:
---------------------------------------

Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4187#discussion_r130058545

    --- Diff: flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java ---
    @@ -482,6 +501,12 @@ public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception
     			if (ex != null) {
     				log.warn("Committing offsets to Kafka failed. This does not compromise Flink's checkpoints.", ex);
    +				if (callerCommitCallback != null) {
    +					callerCommitCallback.onException(ex);
    +				}
    +			}
    +			else if (callerCommitCallback != null) {
    --- End diff --

    See my comment above. Would like to remove these null checks.


> Kafka connector needs to expose metrics for failed/successful offset commits in the Kafka Consumer callback
> -----------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-6998
>                 URL: https://issues.apache.org/jira/browse/FLINK-6998
>             Project: Flink
>          Issue Type: Improvement
>          Components: Kafka Connector
>            Reporter: Zhenzhong Xu
>            Assignee: Zhenzhong Xu
>
> Propose to add "kafkaCommitsSucceeded" and "kafkaCommitsFailed" metrics in KafkaConsumerThread class.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
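
For illustration only, here is a minimal Java sketch of one way the null checks in the diff could be dropped while still counting succeeded and failed commits. The reviewer's earlier comment is not part of this message, so the approach actually agreed on in the pull request may differ. Every type below (`CommitCallback`, `CountingCommitCallback`, `CommitCompletionHandler`) is hypothetical and is not a Flink or Kafka class; only the `onException` call appears in the diff, and `onSuccess` is an assumed counterpart. `AtomicLong` stands in for whatever metric counter the connector would register.

```java
import java.util.Objects;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical sketch; none of these types exist in Flink or Kafka. */
final class CommitMetricsSketch {

    /** Assumed shape of the caller-supplied callback; only onException is visible in the diff. */
    interface CommitCallback {
        void onSuccess();
        void onException(Throwable cause);
    }

    /** Counts commit outcomes; AtomicLong is a stand-in for a real metric counter. */
    static final class CountingCommitCallback implements CommitCallback {
        private final AtomicLong succeeded = new AtomicLong();
        private final AtomicLong failed = new AtomicLong();

        @Override
        public void onSuccess() {
            succeeded.incrementAndGet();
        }

        @Override
        public void onException(Throwable cause) {
            failed.incrementAndGet();
        }

        long succeeded() {
            return succeeded.get();
        }

        long failed() {
            return failed.get();
        }
    }

    /** Mirrors the branch structure of onComplete in the diff, without null checks on the callback. */
    static final class CommitCompletionHandler {
        private final CommitCallback callback;

        CommitCompletionHandler(CommitCallback callback) {
            // Rejecting null once here removes the need to check it on every completion.
            this.callback = Objects.requireNonNull(callback, "callback");
        }

        /** A non-null exception means the commit failed; null means it succeeded. */
        void onComplete(Exception ex) {
            if (ex != null) {
                callback.onException(ex);
            } else {
                callback.onSuccess();
            }
        }
    }
}
```

In this sketch the callback is validated once at construction, so the completion path has a single branch on the exception. A caller that does not need metrics would pass a callback with empty method bodies rather than null.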