[ https://issues.apache.org/jira/browse/FLINK-4702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15529532#comment-15529532 ]

ASF GitHub Bot commented on FLINK-4702:
---------------------------------------

Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2559#discussion_r80909904
  
    --- Diff: flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java ---
    @@ -285,7 +293,14 @@ public void commitSpecificOffsetsToKafka(Map<KafkaTopicPartition, Long> offsets)
     
                if (this.consumer != null) {
                        synchronized (consumerLock) {
    -                           this.consumer.commitSync(offsetsToCommit);
    +                           if (!commitInProgress) {
    +                                   commitInProgress = true;
    +                                   this.consumer.commitAsync(offsetsToCommit, offsetCommitCallback);
    +                           }
    +                           else {
    +                                   LOG.warn("Committing previous checkpoint's offsets to Kafka not completed. " +
    --- End diff --
    
    Possibly yes. But on the other hand, this should be pretty visible if it happens.
    I would expect that with proper options to participate in group checkpoint committing, most Flink jobs run without committing to Kafka/ZooKeeper.


> Kafka consumer must commit offsets asynchronously
> -------------------------------------------------
>
>                 Key: FLINK-4702
>                 URL: https://issues.apache.org/jira/browse/FLINK-4702
>             Project: Flink
>          Issue Type: Bug
>          Components: Kafka Connector
>    Affects Versions: 1.1.2
>            Reporter: Stephan Ewen
>            Assignee: Stephan Ewen
>            Priority: Blocker
>             Fix For: 1.2.0, 1.1.3
>
>
> The offset commit calls to Kafka may occasionally take a very long time.
> In that case, the {{notifyCheckpointComplete()}} method blocks for a long time, and 
> the KafkaConsumer cannot make progress and cannot perform checkpoints.
> Kafka 0.9+ has methods to commit asynchronously.
> We should use those and make sure that no more than one commit is in 
> progress at a time, so that commit requests do not pile up.
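
The guard described in the issue (commit asynchronously, but allow at most one commit in flight) could be sketched roughly as follows. This is only an illustration under stated assumptions, not the code from the pull request: `AsyncCommitter` is a hypothetical stand-in for the Kafka consumer's asynchronous commit call, and `SingleInFlightCommitter` and `tryCommit` are made-up names. Offsets are keyed by a plain string instead of a real partition type to keep the example free of dependencies.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for a client that commits offsets asynchronously. */
interface AsyncCommitter {
    /** Starts a commit and invokes onComplete once it has finished or failed. */
    void commitAsync(Map<String, Long> offsets, Runnable onComplete);
}

/** Illustrative guard that lets at most one asynchronous commit be in flight. */
final class SingleInFlightCommitter {
    private final Object lock = new Object();
    private final AsyncCommitter committer;
    private boolean commitInProgress; // guarded by lock

    SingleInFlightCommitter(AsyncCommitter committer) {
        this.committer = committer;
    }

    /**
     * Issues a commit unless an earlier one is still pending.
     * Returns true if the commit was issued, false if it was skipped.
     */
    boolean tryCommit(Map<String, Long> offsets) {
        synchronized (lock) {
            if (commitInProgress) {
                // An earlier commit has not completed: skip instead of queueing.
                return false;
            }
            commitInProgress = true;
        }
        // Defensive copy so later changes by the caller do not affect the commit.
        Map<String, Long> copy = new HashMap<>(offsets);
        try {
            committer.commitAsync(copy, this::onCommitComplete);
        } catch (RuntimeException e) {
            // The commit never started, so release the guard before rethrowing.
            onCommitComplete();
            throw e;
        }
        return true;
    }

    /** Completion callback: clears the flag so the next commit can go out. */
    private void onCommitComplete() {
        synchronized (lock) {
            commitInProgress = false;
        }
    }
}
```

A skipped commit is not retried in this sketch; the caller would simply commit newer offsets at the next checkpoint notification, which is what keeps requests from piling up.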



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
