Great, thanks so much for the detailed answer. Best, ________________________________ From: Sophie Blee-Goldman <sop...@confluent.io.INVALID> Sent: Tuesday, March 16, 2021 5:58 AM To: users@kafka.apache.org <users@kafka.apache.org> Subject: Re: Slightly Modified Sticky Assignor.
Hey Mazen, The easiest way to approach this is probably to pass in a reference to the associated Consumer and then just call one of the *Consumer#committed *methods which return the OffsetAndMetadata. But I'm guessing your underling question may be about how to get that reference to the Consumer in the first place. There really isn't very good support for this in the ConsumerPartitionAssignor interface since it relies on reflection to instantiate the assignor with the default constructor. I would recommend making your custom ConsumerPartitionAssignor implement the Configurable interface, and then use the configs you pass in to the Consumer to ultimately get a handle on it. Since you have to provide the configs to construct the Consumer in the first place, you might need a layer of indirection: for example class ConsumerProvider { private Consumer consumer; public void setConsumer(Consumer consumer); public Consumer getConsumer(); } // main code ConsumerProvider provider = new ConsumerProvider; consumerConfig.put("MY_CONSUMER_PROVIDER", provider); Consumer consumer = new KafkaConsumer(consumerConfig, ...); provider.setConsumer(consumer); class MyAssignor implements ConsumerPArtitionAssignor, Configurable { private ConsumerProvider; @Override public void configure(configs) { this.consumerProvider = configs.get("MY_CONSUMER_PROVIDER"); } @Override ByteBuffer subscriptionUserData(topics) { offsets = consumerProvider.getConsumer().committed(...); } } Just a warning, I haven't actually tested this out, but the general idea of using configs should work. I know you said "seamless" and this is anything but :/ Maybe I'm tired and missing something obvious, but clearly there's room for improvement here. You can file a JIRA ticket to improve the partition assignor experience and make it easier to set up (and even work on this yourself if you're interested) Unfortunately you generally want to keep the subscriptionUserData() method pretty light, and this method will make a remote call to the server. To be honest, I'm not totally sure why that is the case, since the Consumer should know what offsets it's committed for which partitions...maybe someone else can jump in on the choice behind that. This has come up before, so it's worth investigating whether we can just return the cached offsets if they're available and only make a remote call for *Consumer#committed* if absolutely necessary. I'll try to follow up on that On Tue, Mar 9, 2021 at 9:26 AM Mazen Ezzeddine < mazen.ezzedd...@etu.univ-cotedazur.fr> wrote: > Hi all, > > I am implementing a custom partition assignor slightly different than the > sticky assignor assignor. As known, in the sticky assignor each consumer > sends the set of owned partitions as part of its subscription message. This > happens in the subscriptionUserData by calling the > serializeTopicPartitionAssignment method etc… > > Kindly, my question is that what is the most seamless way to get offset > information (e.g., last committed offset) for each owned partition from > within the subscriptionUserData method or generally from within the > stickyAssignor class, preferably using public APIs. > > Thank you. >