Hi John,

Thanks for your

Jean Wisser
Data Engineer
Flow Traders B.V.
www.flowtraders.com

________________________________
From: John Casey <johnjca...@google.com>
Sent: Monday, June 6, 2022 8:19:22 PM
To: user
Subject: [EXTERNAL] Re: [Questions] KafkaIO SplittableDoFn offset managment

Hi Jean,

I recently resolved some bugs that caused offsets to be committed under incorrect group names, and those fixes should be available in the most recent version of Beam (2.38).

For your second question: a single partition can only be consumed by a single worker, so there are no issues with committing offsets in the correct order. This is because the unit of parallelism in Kafka is the partition, so KafkaIO is currently designed to work within that constraint.

John

On Mon, Jun 6, 2022 at 1:04 PM Jean Wisser <jwis...@flowtraders.com> wrote:

Hi,

I am trying to use KafkaIO with the new SplittableDoFn, but I am having trouble understanding how offset management is supposed to work (using Beam version 2.36.0).

- In the comments of ReadFromKafkaDoFn it is stated that the initial range for KafkaSourceDescriptor startOffset can take the value of the {@code last committed offset + 1} for the {@link Consumer#position(TopicPartition)}. But when looking at the implementation of initialRestriction(), the consumer that tries to get the committed offset is built using:

  consumerFactoryFn.apply(
      KafkaIOUtils.getOffsetConsumerConfig(
          "initialOffset", offsetConsumerConfig, updatedConsumerConfig))

  and getOffsetConsumerConfig() appends a random number to the consumer group name. In addition, KafkaCommitOffset, which is responsible for committing offsets, uses a different name for the consumer groupId.
  How is it supposed to get the last committed offset if the group name is not the same?

- I am also trying to commit the offsets manually at the end of my pipeline and get KafkaIO to resume from the last committed offset by using WatchKafkaTopicPartitionDoFn when building KafkaSourceDescriptor.of(). But since, if I understand correctly, a single (topic, partition) can be split across multiple workers, how can we make sure that we always commit offsets in the correct order?

Thanks a lot for your help,
Jean.
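
To make the first question concrete, here is a toy model of why the group name matters. It is only a sketch: Kafka tracks committed offsets per consumer group and topic partition, so a lookup under a different group ID finds nothing. The class OffsetStoreSketch, its methods, and the suffixed group name are hypothetical illustrations, not Beam or Kafka client code, and the suffix format is not the one getOffsetConsumerConfig() actually produces.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

public class OffsetStoreSketch {

    /** Toy stand-in for the committed-offset store (hypothetical, not a Kafka API). */
    public static final class CommittedOffsets {
        private final Map<String, Long> offsets = new HashMap<>();

        // Offsets are keyed by consumer group AND topic partition.
        private static String key(String groupId, String topic, int partition) {
            return groupId + "|" + topic + "|" + partition;
        }

        public void commit(String groupId, String topic, int partition, long offset) {
            offsets.put(key(groupId, topic, partition), offset);
        }

        public Optional<Long> committed(String groupId, String topic, int partition) {
            return Optional.ofNullable(offsets.get(key(groupId, topic, partition)));
        }
    }

    public static void main(String[] args) {
        CommittedOffsets store = new CommittedOffsets();

        // The pipeline commits under the configured group ID.
        store.commit("my-group", "events", 0, 42L);

        // A reader using the same group ID sees the commit.
        System.out.println(store.committed("my-group", "events", 0)); // Optional[42]

        // A reader whose group ID carries an extra suffix (made-up format) does not.
        System.out.println(store.committed("my-group_initialOffset_12345", "events", 0)); // Optional.empty
    }
}
```

In this model, resuming from the last commit only works when the reader and the committer agree on the group ID, which is the mismatch the question describes.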