.withReadCommitted() doesn't commit messages when they are read. Instead, it
sets isolation.level to read_committed, which specifies that the Kafka
consumer should only read messages that have themselves been committed to
Kafka.

It is meant for exactly-once applications.
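
To make the distinction concrete, here is a minimal sketch in plain Java. It
uses an ordinary Map in place of the consumer config; the class and helper
names are hypothetical and are not part of Beam or Kafka. Only the config keys
"isolation.level" and "enable.auto.commit" are real Kafka consumer settings.

```java
import java.util.HashMap;
import java.util.Map;

public class IsolationVsCommit {
    // Hypothetical helper: true if the consumer is asked to return only
    // records from committed transactions (plus non-transactional records).
    static boolean readsOnlyCommitted(Map<String, Object> config) {
        return "read_committed".equals(config.get("isolation.level"));
    }

    // Hypothetical helper: true only if the Kafka client itself is asked
    // to commit consumed offsets periodically.
    static boolean autoCommitEnabled(Map<String, Object> config) {
        return Boolean.TRUE.equals(config.get("enable.auto.commit"));
    }

    public static void main(String[] args) {
        // Roughly what a reader configured only with read_committed looks like.
        Map<String, Object> config = new HashMap<>();
        config.put("isolation.level", "read_committed");

        System.out.println(readsOnlyCommitted(config)); // prints true
        System.out.println(autoCommitEnabled(config));  // prints false
    }
}
```

The two settings are independent: the first filters what the consumer reads,
and only the second (or an explicit commit such as commitOffsetsInFinalize())
causes consumed offsets to be committed.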



On Mon, Aug 8, 2022 at 3:16 PM Balázs Németh <balazs.nem...@aliz.ai> wrote:

> I have been reading from Kafka and trying to figure out which offset
> management approach would be best for my use case. While doing that, I
> noticed something odd.
>
>
> https://github.com/apache/beam/blob/c9c161d018d116504fd5f83b48853a7071ec9ce4/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L2359-L2362
>
>     private boolean configuredKafkaCommit() {
>       return getConsumerConfig().get("isolation.level") == "read_committed"
>           || Boolean.TRUE.equals(getConsumerConfig().get(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG));
>     }
>
> https://github.com/apache/beam/blob/c9c161d018d116504fd5f83b48853a7071ec9ce4/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L2292-L2298
>
> https://github.com/apache/beam/blob/c9c161d018d116504fd5f83b48853a7071ec9ce4/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L2321-L2334
>
> The name of the method, and how it's being used in the code, certainly
> suggest that using the read_committed isolation level handles and commits
> Kafka offsets. That seemed strange, but I'm not a Kafka pro, so I tested it.
> Well, it does not.
>
> - using ONLY ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG does commit it
> - using ONLY commitOffsetsInFinalize() does commit it
>
> - using ONLY withReadCommitted() does NOT commit it
>
> Dataflow, 2.40.0 Java SDK, without explicitly enabling SDF-read
>
> So is it a bug, or what am I missing here?
>
> If it is indeed a bug, where is it? Is it in read_committed (so it should
> commit offsets, although I found no explicit documentation about that
> anywhere), or should that isolation level not take precedence over the
> commit in finalize, in which case that method is wrong?
>
>
>
>
