Hi All,
I'm upgrading Camel from 3.16 to 3.17 and found a problem that OffsetRepository
didn't get update after manual commit.
It is a Spring boot project and MemoryStateRepository as follows.
@Bean
public MemoryStateRepository offsetRepo() {
MemoryStateRepository stateRepository = new
MemoryStateRepository();
stateRepository.setState(topic + "/0", "");
return stateRepository;
}
Here is the method I use when commit:
private void commitOffsetToKafka(Exchange exchange) {
KafkaManualCommit manual =
exchange.getIn().getHeader(KafkaConstants.MANUAL_COMMIT,
KafkaManualCommit.class);
manual.commit();
}
Here is my Kafka endpoint
kafka:integration_test
_topic?brokers=127.0.0.1:41478&autoCommitEnable=false&allowManualCommit=true&seekTo=beginning&maxPollRecords=2&groupId=ANE&autoOffsetReset=earliest&offsetRepository=#offsetRepo&kafkaManualCommitFactory=#class:org.apache.camel.component.kafka.consumer.DefaultKafkaManualCommitFactory
It is working fine in 3.16. When DefaultkafkaManualSyncCommit.commit() is
invoked. The offsetRepository.setState() get invoked so the offset increased.
However, in 3.17, DefaultkafkaManualSyncCommit.commit() doesn't have a
reference to offsetRepository so the offset in offsetRepository won't get
increased.
Is this a bug or there is a different way to get the offset after committing.
Could you please suggest what I should do to test if the offset get increased
after manual commit?
Best regards,
Yuttana Sangchoei
This e-mail and any attachments to it (the "Communication") is, unless
otherwise stated, confidential, may contain copyright material and is for the
use only of the intended recipient. If you receive the Communication in error,
please notify the sender immediately by return e-mail, delete the Communication
and the return e-mail, and do not read, copy, retransmit or otherwise deal with
it. Any views expressed in the Communication are those of the individual sender
only, unless expressly stated to be those of Australia and New Zealand Banking
Group Limited ABN 11 005 357 522, or any of its related entities including ANZ
Bank New Zealand Limited (together "ANZ"). ANZ does not accept liability in
connection with the integrity of or errors in the Communication, computer
virus, data corruption, interference or delay arising from or in respect of the
Communication.