Great, thanks so much for the detailed answer.

Best,
________________________________
From: Sophie Blee-Goldman <sop...@confluent.io.INVALID>
Sent: Tuesday, March 16, 2021 5:58 AM
To: users@kafka.apache.org <users@kafka.apache.org>
Subject: Re: Slightly Modified Sticky Assignor.

Hey Mazen,

The easiest way to approach this is probably to pass in a reference to the
associated Consumer and
then just call one of the *Consumer#committed *methods which return the
OffsetAndMetadata.

But I'm guessing your underling question may be about how to get that
reference to the Consumer in
the first place. There really isn't very good support for this in the
ConsumerPartitionAssignor interface
since it relies on reflection to instantiate the assignor with the default
constructor. I would recommend
making your custom ConsumerPartitionAssignor implement the Configurable
interface, and then use
the configs you pass in to the Consumer to ultimately get a handle on it.
Since you have to provide the
configs to construct the Consumer in the first place, you might need a
layer of indirection: for example

class ConsumerProvider {
    private Consumer consumer;

    public void setConsumer(Consumer consumer);

    public Consumer getConsumer();
}

// main code
ConsumerProvider provider = new ConsumerProvider;
consumerConfig.put("MY_CONSUMER_PROVIDER", provider);
Consumer consumer = new KafkaConsumer(consumerConfig, ...);
provider.setConsumer(consumer);

class MyAssignor implements ConsumerPArtitionAssignor, Configurable {

    private ConsumerProvider;

    @Override
    public void configure(configs) {
        this.consumerProvider = configs.get("MY_CONSUMER_PROVIDER");
    }

    @Override
    ByteBuffer subscriptionUserData(topics) {
        offsets = consumerProvider.getConsumer().committed(...);
    }
}

Just a warning, I haven't actually tested this out, but the general idea of
using configs should work.

I know you said "seamless" and this is anything but :/ Maybe I'm tired and
missing something obvious, but
clearly there's room for improvement here. You can file a JIRA ticket to
improve the partition assignor
experience and make it easier to set up (and even work on this yourself if
you're interested)

Unfortunately you generally want to keep the subscriptionUserData() method
pretty light, and this
method will make a remote call to the server. To be honest, I'm not totally
sure why that is the case, since
the Consumer should know what offsets it's committed for which
partitions...maybe someone else can
jump in on the choice behind that. This has come up before, so it's worth
investigating whether we can
just return the cached offsets if they're available and only make a remote
call for *Consumer#committed* if
absolutely necessary. I'll try to follow up on that

On Tue, Mar 9, 2021 at 9:26 AM Mazen Ezzeddine <
mazen.ezzedd...@etu.univ-cotedazur.fr> wrote:

> Hi all,
>
> I am implementing a custom partition assignor slightly different than the
> sticky assignor  assignor. As known, in the sticky assignor each consumer
> sends the set of owned partitions as part of its subscription message. This
> happens in the subscriptionUserData by calling the
> serializeTopicPartitionAssignment method etc…
>
> Kindly, my question is that what is the most seamless way to get offset
> information (e.g., last committed offset) for each owned partition from
> within the subscriptionUserData method or generally from within the
> stickyAssignor class, preferably using public APIs.
>
> Thank you.
>

Reply via email to