[ https://issues.apache.org/jira/browse/BEAM-6466?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16746519#comment-16746519 ]
Raghu Angadi commented on BEAM-6466:
------------------------------------
Added this comment to the above thread:
Yeah, there is a timing issue. 'finalizeCheckpoint()' does not wait until the
checkpoint is committed by the IO thread. See the comment at
https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java#L613
This is best suited for the unbounded case, since we don't want to block on
each call to finalize a checkpoint. There could be lots of these calls per
second in a streaming pipeline, and we only need to commit the latest
checkpoint. But that does not work well when the reader is used in a bounded
context.
Fix: KafkaIO could store a flag indicating that it is being read by a bounded
wrapper (see expand(), where the bounded wrapper is added). When this flag is
set, 'finalizeCheckpoint()' could wake up the IO thread and wait for the
offsets to be committed.
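
A minimal sketch of the proposed fix, to make the idea concrete. This is not
the actual KafkaIO code: the class OffsetCommitter, the boundedMode flag, and
the in-memory "commit" are all hypothetical stand-ins, and the real reader
would commit through its Kafka consumer instead. It only illustrates the
pattern: in unbounded mode the call returns immediately and the IO thread
commits the latest checkpoint on its next cycle; in bounded mode the call wakes
the IO thread and blocks until the offset is committed.

```java
/** Hypothetical stand-in for the reader's commit logic; not the real KafkaIO code. */
class OffsetCommitter implements AutoCloseable {
  private final boolean boundedMode;   // assumed flag, set when a bounded wrapper is used
  private final Object lock = new Object();
  private long pendingOffset = -1;     // latest finalized checkpoint, not yet committed
  private long committedOffset = -1;   // last offset "committed" by the IO thread
  private boolean closed = false;

  OffsetCommitter(boolean boundedMode) {
    this.boundedMode = boundedMode;
    Thread ioThread = new Thread(this::commitLoop, "io-thread");
    ioThread.setDaemon(true);
    ioThread.start();
  }

  /** IO thread: commits the latest pending offset, then sleeps until woken or timed out. */
  private void commitLoop() {
    synchronized (lock) {
      while (!closed) {
        if (pendingOffset > committedOffset) {
          // Stand-in for a real commit call such as KafkaConsumer.commitSync(...).
          committedOffset = pendingOffset;
          lock.notifyAll();            // release any caller blocked in finalizeCheckpoint()
        } else {
          try {
            lock.wait(1000);           // simulated poll interval
          } catch (InterruptedException e) {
            return;
          }
        }
      }
    }
  }

  /** Records the checkpoint; blocks until it is committed only in bounded mode. */
  void finalizeCheckpoint(long offset) {
    synchronized (lock) {
      pendingOffset = Math.max(pendingOffset, offset);
      if (!boundedMode) {
        return;                        // unbounded: the IO thread commits on its next cycle
      }
      lock.notifyAll();                // bounded: wake the IO thread right away
      while (committedOffset < offset && !closed) {
        try {
          lock.wait();
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
          return;
        }
      }
    }
  }

  long committedOffset() {
    synchronized (lock) {
      return committedOffset;
    }
  }

  @Override
  public void close() {
    synchronized (lock) {
      closed = true;
      lock.notifyAll();
    }
  }
}
```

With new OffsetCommitter(true), a call to finalizeCheckpoint(42L) returns only
after committedOffset() reports 42. With new OffsetCommitter(false), the same
call returns immediately and the commit happens later on the IO thread.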
> KafkaIO doesn't commit offsets while being used as bounded source
> -----------------------------------------------------------------
>
> Key: BEAM-6466
> URL: https://issues.apache.org/jira/browse/BEAM-6466
> Project: Beam
> Issue Type: Bug
> Components: io-java-kafka
> Affects Versions: 2.9.0
> Reporter: Alexey Romanenko
> Priority: Major
>
> When KafkaIO is used as a bounded source (with {{withMaxReadTime()}} or
> {{withMaxNumRecords()}}), it seems that it doesn't always commit offsets.
> See the details in [the
> discussion|https://lists.apache.org/thread.html/bcec8a1fb166029a4adf3f3491c407d49843406020b20f203ec3c2d2@%3Cuser.beam.apache.org%3E]
> on the user@ list.