So overall, AdminClient covers what is needed to retrieve up-to-date
topic-partitions, whereas KIP-396 will cover what is needed to retrieve
offsets (EARLIEST, LATEST, timestamp) per partition.

Gabor, could you please add your input if I'm missing something? I'd like to
double-check this.

Assuming I'm not missing anything, what would be the preferred next action?
Personally I'd keep this as it is until KIP-396 passes the vote (the vote
for KIP-396 opened in January and still hasn't passed after 7 months, which
makes me a bit worried about whether it will pass at all), but I also respect
the KIP lifecycle in the Kafka community.

On Tue, Aug 13, 2019 at 12:46 PM Jungtaek Lim <kabh...@gmail.com> wrote:

>
>
> On Tue, Aug 13, 2019 at 10:01 AM Colin McCabe <cmcc...@apache.org> wrote:
>
>> On Mon, Aug 12, 2019, at 14:54, Jungtaek Lim wrote:
>> > Thanks for the feedback, Colin and Matthias.
>> >
>> > I agree with you regarding getting topics and partitions via AdminClient,
>> > just curious how much the overhead would be. Would it be lighter, or
>> > heavier? We may not want to list topics at regular intervals - in the plan
>> > phase we want to know up-to-date information so that the calculation from
>> > Spark itself makes sense.
>>
>> It would be lighter. The consumer will periodically refresh metadata for
>> any topic you are subscribed to. AdminClient doesn’t have the concept of
>> subscriptions, and won’t refresh topic metadata until you request it.
>>
>
> Sounds great! Happy to hear about that.
>
>
>>
>> >
>> > On the other hand, I'm not seeing any information regarding offsets in the
>> > current AdminClient, which is also one of the reasons we leverage the
>> > consumer and call poll(0). Colin, as you mentioned there are KIPs addressing
>> > this, could you point to those KIPs so that we can see whether they would
>> > work for our case? Without support for this we cannot replace our usage of
>> > consumer/poll with AdminClient.
>>
>> KIP-396 is the one for listing offsets in AdminClient.
>>
>
> KIP-396 seems to fit Spark's need to get offset information, even by
> timestamp. Thanks!
> I wish there were a way to get a range of (EARLIEST, LATEST) in one call,
> but it's not a big deal as it just requires two calls.
>
> >
>> > ps. IMHO it would be helpful if there were an overloaded `listTopics` which
>> > accepts a regex, the same as consumer subscription via pattern. We would
>> > like to provide the same behavior that Kafka itself provides as a source.
>>
>> We don’t have a regex listTopics at the moment, though we could add this.
>> Currently, the regex is done on the client side anyway (although we’d
>> really like to change this in the future). So just listing everything and
>> filtering locally would be the same performance and behavior as the
>> Consumer.
>>
>
> I see. Good to know the regex is applied on the client side - I've just
> searched the code and it applies the filter to all topics retrieved from the
> metadata fetch. So there would be mostly no difference here. Thanks for
> confirming.
>
>
>>
>> best,
>> Colin
>>
>> >
>> > On Tue, Aug 13, 2019 at 1:03 AM Matthias J. Sax <matth...@confluent.io>
>> > wrote:
>> >
>> > > Thanks for the details Jungtaek!
>> > >
>> > > I tend to agree with Colin, that using the AdminClient seems to be the
>> > > better choice.
>> > >
>> > > You can get all topics via `listTopics()` (and you can refresh this
>> > > information at regular intervals) and match any pattern against the list
>> > > of available topics in the driver.
>> > >
>> > > As you use `assignment()` and store offsets in the Spark checkpoint, it
>> > > seems that using consumer group management is not a good fit for the use
>> > > case.
>> > >
>> > >
>> > > Thoughts?
>> > >
>> > >
>> > >
>> > > -Matthias
>> > >
>> > > On 8/12/19 8:22 AM, Colin McCabe wrote:
>> > > > Hi,
>> > > >
>> > > > If there’s no need to consume records in the Spark driver, then the
>> > > > Consumer is probably the wrong thing to use. Instead, Spark should use
>> > > > AdminClient to find out what partitions exist and where, manage their
>> > > > offsets, and so on. There are some KIPs under discussion now that would
>> > > > add the necessary APIs for managing offsets.
>> > > >
>> > > > Best,
>> > > > Colin
>> > > >
>> > > > On Mon, Aug 12, 2019, at 07:39, Jungtaek Lim wrote:
>> > > >> My feeling is that I didn't explain the use case for Spark properly and
>> > > >> hence failed to explain the needs. Sorry about this.
>> > > >>
>> > > >> Spark leverages a single instance of KafkaConsumer in the driver, which
>> > > >> is the only one registered in its consumer group. This is used in the
>> > > >> plan phase of each micro-batch to calculate the overall topicpartitions
>> > > >> with their offset ranges for this batch, and to split and assign
>> > > >> (topicpartition, fromOffset, untilOffset) to each input partition. After
>> > > >> the planning is done and tasks are distributed to executors, a consumer
>> > > >> per input partition will be initialized on some executor (being assigned
>> > > >> to the single topicpartition), and pull the actual records. (Pooling of
>> > > >> consumers is applied, for sure.) As the plan phase is to determine the
>> > > >> overall topicpartitions and offset ranges to process, Spark is never
>> > > >> interested in pulling the records on the driver side.
>> > > >>
>> > > >> Spark mainly leverages poll(0) to get the latest assigned partitions and
>> > > >> adopt the changes or validate the expectation. That's not the only use
>> > > >> case for poll(0). Spark also seeks the offset per topicpartition to the
>> > > >> earliest or the latest, or a specific one (either provided by the end
>> > > >> user or the last committed offset) so that Spark can have the actual
>> > > >> offset or validate the provided offset. According to the javadoc (if I
>> > > >> understand correctly), to get the offset immediately it seems to be
>> > > >> required to call `poll` or `position`.
>> > > >>
>> > > >> The way Spark interacts with Kafka in this plan phase in the driver is
>> > > >> synchronous, as the phase should finish ASAP to run the next phase.
>> > > >> Registering a ConsumerRebalanceListener and tracking the changes would
>> > > >> require some asynchronous handling, which seems to add unnecessary
>> > > >> complexity. Spark may be OK with dealing with synchronous calls with a
>> > > >> timeout (that's what the methods in KafkaConsumer have been providing -
>> > > >> they're not asynchronous, at least for callers) but dealing with
>> > > >> asynchronous calls is another matter. I can see the benefit where a
>> > > >> thread runs continuously and the consumer is continuously busy with
>> > > >> something, relying on the listener to hear the news of a reassignment.
>> > > >> Unfortunately that's not the case here.
>> > > >>
>> > > >> Unit tests in Spark have similar needs: it looks like Kafka test code
>> > > >> also leverages `updateAssignmentMetadataIfNeeded` and `poll(0)` in many
>> > > >> places where a blocking (+timeout) call is preferred - so I can see
>> > > >> similar needs there as well.
>> > > >>
>> > > >> On Mon, Aug 12, 2019 at 4:46 PM Gabor Somogyi <
>> > > gabor.g.somo...@gmail.com>
>> > > >> wrote:
>> > > >>
>> > > >>> Hi Guys,
>> > > >>>
>> > > >>> Please see the actual implementation, pretty sure it explains the
>> > > >>> situation well:
>> > > >>>
>> > > >>> https://github.com/apache/spark/blob/master/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala
>> > > >>>
>> > > >>> To answer one question/assumption which popped up from all of you:
>> > > >>> Spark not only uses KafkaConsumer#subscribe but pattern subscribe +
>> > > >>> KafkaConsumer#assign as well.
>> > > >>> Please see here:
>> > > >>>
>> > > >>> https://github.com/apache/spark/blob/master/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/ConsumerStrategy.scala
>> > > >>>
>> > > >>> BR,
>> > > >>> G
>> > > >>>
>> > > >>>
>> > > >>> On Mon, Aug 12, 2019 at 6:38 AM Satish Duggana <
>> > > satish.dugg...@gmail.com>
>> > > >>> wrote:
>> > > >>>
>> > > >>>> Hi Jungtaek,
>> > > >>>> Thanks for the KIP. I have a couple of questions here.
>> > > >>>> Isn't Spark using Kafka's consumer group management across multiple
>> > > >>>> consumers?
>> > > >>>>
>> > > >>>> Is Spark using KafkaConsumer#subscribe(Pattern pattern,
>> > > >>>> ConsumerRebalanceListener listener) only to get all the topics for a
>> > > >>>> pattern-based subscription, and does Spark manually assign those
>> > > >>>> topic-partitions across consumers on workers?
>> > > >>>>
>> > > >>>> Thanks,
>> > > >>>> Satish.
>> > > >>>>
>> > > >>>> On Mon, Aug 12, 2019 at 4:17 AM Matthias J. Sax <
>> > > matth...@confluent.io>
>> > > >>>> wrote:
>> > > >>>>
>> > > >>>>> I am not sure if I fully understand yet.
>> > > >>>>>
>> > > >>>>> The fact that Spark does not store offsets in Kafka but as part of
>> > > >>>>> its own checkpoint mechanism seems to be orthogonal. Maybe I am
>> > > >>>>> missing something here.
>> > > >>>>>
>> > > >>>>> As you are using subscribe(), you use the Kafka consumer group
>> > > >>>>> mechanism, which takes care of the assignment of partitions to
>> > > >>>>> clients within the group. Therefore, I am not sure what you mean by:
>> > > >>>>>
>> > > >>>>>> which Spark needs to
>> > > >>>>>> know to coordinate multiple consumers to pull correctly.
>> > > >>>>>
>> > > >>>>> Multiple thoughts that may help:
>> > > >>>>>
>> > > >>>>> - if Spark needs more control over the partition assignment, you can
>> > > >>>>> provide a custom `ConsumerPartitionAssignor` (via the consumer
>> > > >>>>> configuration)
>> > > >>>>>
>> > > >>>>> - you may also want to register a `ConsumerRebalanceListener` via
>> > > >>>>> `subscribe()` to get informed when the group rebalances
>> > > >>>>>
>> > > >>>>> As you pointed out, when using pattern subscription, metadata can
>> > > >>>>> change if topics are added/deleted. However, each metadata change
>> > > >>>>> will trigger a rebalance and thus you would get corresponding calls
>> > > >>>>> to your rebalance listener to learn about it and react accordingly.
>> > > >>>>>
>> > > >>>>> Maybe you can explain why neither approach works and what gap
>> > > >>>>> the new API would close?
>> > > >>>>>
>> > > >>>>>
>> > > >>>>> -Matthias
>> > > >>>>>
>> > > >>>>> On 8/11/19 5:11 AM, Jungtaek Lim wrote:
>> > > >>>>>> Let me elaborate on my explanation a bit more. Here we talk about
>> > > >>>>>> Apache Spark, but this applies to everything which wants to control
>> > > >>>>>> the offsets of Kafka consumers.
>> > > >>>>>>
>> > > >>>>>> Spark manages the committed offsets and the offsets which should be
>> > > >>>>>> polled now, as well as topics and partitions. This is required as
>> > > >>>>>> Spark itself has its own general checkpoint mechanism and Kafka is
>> > > >>>>>> just one source/sink (though it's considered very important).
>> > > >>>>>>
>> > > >>>>>> To pull records from Kafka, Spark tells Kafka which topics and
>> > > >>>>>> partitions it wants to subscribe to (and does seek and poll). But
>> > > >>>>>> Spark can also provide "patterns" of topics, and the subscription
>> > > >>>>>> can change on the Kafka side (topics added/dropped, partitions
>> > > >>>>>> added), which Spark needs to know about to coordinate multiple
>> > > >>>>>> consumers to pull correctly.
>> > > >>>>>>
>> > > >>>>>> It looks like assignment() doesn't update the assignment information
>> > > >>>>>> in the consumer. It just returns the known one. There's only one
>> > > >>>>>> known approach to doing this, calling `poll`, but Spark is not
>> > > >>>>>> interested in the returned records, so there's a need for the hack
>> > > >>>>>> `poll(0)`, and Kafka deprecated that API. This KIP proposes to
>> > > >>>>>> support this as an official approach.
>> > > >>>>>>
>> > > >>>>>>
>> > > >>>>>> On Sun, Aug 11, 2019 at 8:18 PM Jungtaek Lim <
>> kabh...@gmail.com>
>> > > >>>> wrote:
>> > > >>>>>>
>> > > >>>>>>> Sorry, I didn't realize you were also asking it here. I'm in favor
>> > > >>>>>>> of describing it in this discussion thread so the discussion
>> > > >>>>>>> itself can go forward. So copying my answer here:
>> > > >>>>>>>
>> > > >>>>>>> We have some use cases in which we don't just rely on everything
>> > > >>>>>>> the Kafka consumer provides. We want to know the current
>> > > >>>>>>> assignment of this consumer, and to get the latest assignment, we
>> > > >>>>>>> called the hack `poll(0)`.
>> > > >>>>>>>
>> > > >>>>>>> That said, we don't want to pull any records here, and if I'm not
>> > > >>>>>>> missing anything, there's no way to accomplish this. Please guide
>> > > >>>>>>> me if I'm missing something.
>> > > >>>>>>>
>> > > >>>>>>> Thanks,
>> > > >>>>>>> Jungtaek Lim (HeartSaVioR)
>> > > >>>>>>>
>> > > >>>>>>>
>> > > >>>>>>>
>> > > >>>>>>> On Sat, Aug 10, 2019 at 2:11 AM Matthias J. Sax <
>> > > >>>> matth...@confluent.io>
>> > > >>>>>>> wrote:
>> > > >>>>>>>
>> > > >>>>>>>> Thanks for the KIP.
>> > > >>>>>>>>
>> > > >>>>>>>> Can you elaborate a little bit more on the use case for this
>> > > >>>>>>>> feature? Why would a consumer need to update its metadata
>> > > >>>>>>>> explicitly?
>> > > >>>>>>>>
>> > > >>>>>>>>
>> > > >>>>>>>> -Matthias
>> > > >>>>>>>>
>> > > >>>>>>>> On 8/8/19 8:46 PM, Jungtaek Lim wrote:
>> > > >>>>>>>>> Hi devs,
>> > > >>>>>>>>>
>> > > >>>>>>>>> I'd like to initiate discussion around KIP-505, exposing a new
>> > > >>>>>>>>> public method to only update assignment metadata in the consumer.
>> > > >>>>>>>>>
>> > > >>>>>>>>> `poll(0)` has been misused: according to the Kafka doc it doesn't
>> > > >>>>>>>>> guarantee that it doesn't pull any records, and the new method
>> > > >>>>>>>>> `poll(Duration)` doesn't have the same semantics, so I would like
>> > > >>>>>>>>> to propose a new public API which only does the desired behavior.
>> > > >>>>>>>>>
>> > > >>>>>>>>> KIP page: https://cwiki.apache.org/confluence/x/z5NiBw
>> > > >>>>>>>>>
>> > > >>>>>>>>> Please feel free to suggest any improvements to the proposal, as
>> > > >>>>>>>>> I'm new to the Kafka community and may not catch preferences
>> > > >>>>>>>>> (like TimeoutException vs boolean, etc.) of the Kafka project.
>> > > >>>>>>>>>
>> > > >>>>>>>>> Thanks in advance!
>> > > >>>>>>>>> Jungtaek Lim (HeartSaVioR)
>> > > >>>>>>>>>
>> > > >>>>>>>>
>> > > >>>>>>>>
>> > > >>>>>>>
>> > > >>>>>>> --
>> > > >>>>>>> Name : Jungtaek Lim
>> > > >>>>>>> Blog : http://medium.com/@heartsavior
>> > > >>>>>>> Twitter : http://twitter.com/heartsavior
>> > > >>>>>>> LinkedIn : http://www.linkedin.com/in/heartsavior
>> > > >>>>>>>
>> > > >>>>>>
>> > > >>>>>>
>> > > >>>>>
>> > > >>>>>
>> > > >>>>
>> > > >>>
>> > > >>
>> > > >>
>> > > >> --
>> > > >> Name : Jungtaek Lim
>> > > >> Blog : http://medium.com/@heartsavior
>> > > >> Twitter : http://twitter.com/heartsavior
>> > > >> LinkedIn : http://www.linkedin.com/in/heartsavior
>> > > >>
>> > > >
>> > >
>> > >
>> >
>> > --
>> > Name : Jungtaek Lim
>> > Blog : http://medium.com/@heartsavior
>> > Twitter : http://twitter.com/heartsavior
>> > LinkedIn : http://www.linkedin.com/in/heartsavior
>> >
>>
>
>
> --
> Name : Jungtaek Lim
> Blog : http://medium.com/@heartsavior
> Twitter : http://twitter.com/heartsavior
> LinkedIn : http://www.linkedin.com/in/heartsavior
>


-- 
Name : Jungtaek Lim
Blog : http://medium.com/@heartsavior
Twitter : http://twitter.com/heartsavior
LinkedIn : http://www.linkedin.com/in/heartsavior
