Hi,
I am curious about this comment:
if (offset < expected) { // -- (a)
  // this can happen when compression is enabled in Kafka (seems to be fixed in 0.10)
  // should we check if the offset is way off from consumedOffset (say > 1M)?
  LOG.warn(
      "{}: ignoring already consumed offset {} for {}",
      this,
      offset,
      pState.topicPartition);
  continue;
}
https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java#L167
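If I read it correctly, the check behaves roughly like the sketch below (a plain
KafkaConsumer loop; the bootstrap servers, group id, and topic are placeholders,
not our real config): records whose offset is below the next expected offset are
treated as already consumed and skipped rather than emitted again.

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class OffsetSkipSketch {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092"); // placeholder
    props.put("group.id", "offset-skip-demo");        // placeholder
    props.put("key.deserializer",
        "org.apache.kafka.common.serialization.ByteArrayDeserializer");
    props.put("value.deserializer",
        "org.apache.kafka.common.serialization.ByteArrayDeserializer");

    try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
      consumer.subscribe(Collections.singletonList("my-topic")); // placeholder topic
      long expected = -1; // next offset we expect, i.e. last consumed offset + 1
      while (true) {
        ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofSeconds(1));
        for (ConsumerRecord<byte[], byte[]> record : records) {
          if (expected >= 0 && record.offset() < expected) {
            // Mirrors the KafkaUnboundedReader warning above: the broker handed
            // back an offset we already consumed, so the record is dropped as a
            // duplicate instead of being processed twice.
            continue;
          }
          expected = record.offset() + 1;
          // process the record here
        }
      }
    }
  }
}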
Does it mean that Beam KafkaIO may skip processing some Kafka messages if the
consumer lag exceeds 1 million messages?
Why might Kafka compression cause this behavior?
Is there any way to prevent message loss and guarantee at-least-once delivery?
Context: we enable at-least-once delivery semantics in our Beam pipeline with the
following code:
input
    .getPipeline()
    .apply(
        "ReadFromKafka",
        KafkaIO.readBytes()
            .withBootstrapServers(getSource().getKafkaSourceConfig().getBootstrapServers())
            .withTopics(getTopics())
            .withConsumerConfigUpdates(
                ImmutableMap.of(
                    ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false,
                    ConsumerConfig.GROUP_ID_CONFIG, groupId))
            .withReadCommitted()
            .commitOffsetsInFinalize())
However, we notice that when we send more than 1 million Kafka messages and the
processing cannot keep up, Beam appears to process fewer messages than we sent.
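For reference, one way to verify the count on the consuming side would be a small
counter attached right after the read, something like the sketch below (the DoFn
and the counter name are just illustrative, not part of our pipeline):

import org.apache.beam.sdk.io.kafka.KafkaRecord;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;

// Counts every record emitted by the KafkaIO read so the total can be
// compared against the number of messages produced.
class CountKafkaRecords
    extends DoFn<KafkaRecord<byte[], byte[]>, KafkaRecord<byte[], byte[]>> {
  private final Counter consumed = Metrics.counter("kafka", "records-consumed");

  @ProcessElement
  public void processElement(ProcessContext c) {
    consumed.inc();
    c.output(c.element());
  }
}

// Attached right after the read, e.g.:
// records.apply("CountRecords", ParDo.of(new CountKafkaRecords()));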
Regards
Dinh