Sorry for being late. It seems I have found a case that requires a method to
update Consumer metadata: in short, kafka-console-consumer.sh has been working
differently since 2.1.0 because this functionality is missing.
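For context, the hack in question looks roughly like the following (a minimal,
self-contained sketch against the public consumer API; the bootstrap address,
group id, and topic pattern are made up):

    import java.util.Properties;
    import java.util.Set;
    import java.util.regex.Pattern;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.serialization.ByteArrayDeserializer;

    public class AssignmentMetadataHack {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "driver-group");
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                    ByteArrayDeserializer.class.getName());
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                    ByteArrayDeserializer.class.getName());

            try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
                consumer.subscribe(Pattern.compile("topic-.*"));

                // The hack: a zero-timeout poll used purely for its side effect
                // of joining the group and refreshing assignment metadata.
                // Deprecated since 2.0, and never guaranteed to return no records.
                consumer.poll(0L);

                // Note poll(java.time.Duration.ZERO) is NOT a replacement: it can
                // return before the metadata update completes, leaving this empty.
                Set<TopicPartition> assignment = consumer.assignment();
                System.out.println("Current assignment: " + assignment);
            }
        }
    }

The JIRA and PR: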
https://issues.apache.org/jira/browse/KAFKA-8789
https://github.com/apache/kafka/pull/7206

Thanks,
Dongjin

On Tue, Aug 13, 2019 at 9:58 PM Gabor Somogyi <gabor.g.somo...@gmail.com>
wrote:

> I've had a look at KIP-396 and until now only 1 binding vote has arrived.
> Hope others would consider it as a good solution...
>
> G
>
> On Tue, Aug 13, 2019 at 11:52 AM Gabor Somogyi <gabor.g.somo...@gmail.com>
> wrote:
>
> > I've had concerns about calling AdminClient.listTopics because on big
> > clusters I've seen OOMs because of too many TopicPartitions. On the
> > other hand, this problem already exists in the actual implementation
> > because, as Colin said, the Consumer is doing the same on the client
> > side. All in all this part is fine.
> >
> > I've checked all the actual use-cases on the Spark side which have to
> > be covered, and it looks doable.
> >
> > On Tue, Aug 13, 2019 at 6:01 AM Jungtaek Lim <kabh...@gmail.com> wrote:
> >
> >> So overall, AdminClient covers what is necessary to retrieve
> >> up-to-date topic-partitions, whereas KIP-396 will cover what is
> >> necessary to retrieve offsets (EARLIEST, LATEST, by timestamp) on a
> >> partition.
> >>
> >> Gabor, could you please add your input if I'm missing something? I'd
> >> like to double-check on this.
> >>
> >> Assuming I'm not missing anything, what would be the preferred next
> >> action? Personally I'd keep this as it is until KIP-396 passes the
> >> vote (the vote for KIP-396 opened in January and still hasn't passed
> >> - 7 months - which worries me a bit about whether it's going to pass
> >> or not), but I also respect the lifecycle of KIPs in the Kafka
> >> community.
> >>
> >> On Tue, Aug 13, 2019 at 12:46 PM Jungtaek Lim <kabh...@gmail.com>
> >> wrote:
> >>
> >> > On Tue, Aug 13, 2019 at 10:01 AM Colin McCabe <cmcc...@apache.org>
> >> > wrote:
> >> >
> >> >> On Mon, Aug 12, 2019, at 14:54, Jungtaek Lim wrote:
> >> >> > Thanks for the feedback, Colin and Matthias.
> >> >> >
> >> >> > I agree with you regarding getting topics and partitions via
> >> >> > AdminClient; I'm just curious how much the overhead would be.
> >> >> > Would it be lighter, or heavier? We may not want to list topics
> >> >> > at regular intervals - in the plan phase we want to know
> >> >> > up-to-date information so that the calculation from Spark itself
> >> >> > makes sense.
> >> >>
> >> >> It would be lighter. The consumer will periodically refresh
> >> >> metadata for any topic you are subscribed to. AdminClient doesn't
> >> >> have the concept of subscriptions, and won't refresh topic
> >> >> metadata until you request it.
> >> >
> >> > Sounds great! Happy to hear about that.
> >> >
> >> >> > On the other hand, I'm not seeing any information regarding
> >> >> > offsets in the current AdminClient, which is also one of the
> >> >> > reasons we leverage the consumer and call poll(0). Colin, as you
> >> >> > mentioned there are KIPs addressing this; could you point to
> >> >> > those KIPs so that we can see whether they would work for our
> >> >> > case? Without support for this we cannot replace our usage of
> >> >> > consumer/poll with AdminClient.
> >> >>
> >> >> KIP-396 is the one for listing offsets in AdminClient.
> >> >
> >> > KIP-396 seems to fit Spark's needs for getting offset information,
> >> > even by timestamp. Thanks!
> >> > I wish there were a way to get a range of (EARLIEST, LATEST) in one
> >> > call, but it's not a big deal as it just requires two calls.
> >> >
> >> >> > ps. IMHO it would be helpful if there were an overloaded
> >> >> > `listTopics` which receives a regex, the same as consumer
> >> >> > subscription via a pattern. We would like to provide the same
> >> >> > behavior that Kafka itself basically provides as a source.
> >> >>
> >> >> We don't have a regex listTopics at the moment, though we could
> >> >> add this. Currently, the regex is applied on the client side
> >> >> anyway (although we'd really like to change this in the future).
> >> >> So just listing everything and filtering locally would give the
> >> >> same performance and behavior as the Consumer.
> >> >
> >> > I see. Good to know the regex is done on the client side - I've
> >> > just searched some code and it applies the filter to all topics
> >> > retrieved from the metadata fetch. Then there would be mostly no
> >> > difference here. Thanks for confirming.
> >> >
> >> >> best,
> >> >> Colin
> >> >>
> >> >> > On Tue, Aug 13, 2019 at 1:03 AM Matthias J. Sax
> >> >> > <matth...@confluent.io> wrote:
> >> >> >
> >> >> > > Thanks for the details Jungtaek!
> >> >> > >
> >> >> > > I tend to agree with Colin that using the AdminClient seems to
> >> >> > > be the better choice.
> >> >> > >
> >> >> > > You can get all topics via `listTopics()` (and you can refresh
> >> >> > > this information at regular intervals) and match any pattern
> >> >> > > against the list of available topics in the driver.
> >> >> > >
> >> >> > > As you use `assignment()` and store offsets in the Spark
> >> >> > > checkpoint, it seems that using consumer group management is
> >> >> > > not a good fit for the use case.
> >> >> > >
> >> >> > > Thoughts?
> >> >> > >
> >> >> > > -Matthias
> >> >> > >
> >> >> > > On 8/12/19 8:22 AM, Colin McCabe wrote:
> >> >> > > > Hi,
> >> >> > > >
> >> >> > > > If there's no need to consume records in the Spark driver,
> >> >> > > > then the Consumer is probably the wrong thing to use.
> >> >> > > > Instead, Spark should use AdminClient to find out what
> >> >> > > > partitions exist and where, manage their offsets, and so on.
> >> >> > > > There are some KIPs under discussion now that would add the
> >> >> > > > necessary APIs for managing offsets.
> >> >> > > >
> >> >> > > > Best,
> >> >> > > > Colin
> >> >> > > >
> >> >> > > > On Mon, Aug 12, 2019, at 07:39, Jungtaek Lim wrote:
> >> >> > > >> My feeling is that I didn't explain the use case for Spark
> >> >> > > >> properly and hence failed to convey the needs. Sorry about
> >> >> > > >> this.
> >> >> > > >>
> >> >> > > >> Spark leverages a single instance of KafkaConsumer in the
> >> >> > > >> driver, which is the only consumer registered in the
> >> >> > > >> consumer group. This is used in the plan phase of each
> >> >> > > >> micro-batch to calculate the overall topicpartitions with
> >> >> > > >> their offset ranges for the batch, and to split and assign
> >> >> > > >> (topicpartition, fromOffset, untilOffset) to each input
> >> >> > > >> partition. After the planning is done and tasks are
> >> >> > > >> distributed to executors, a consumer for each input
> >> >> > > >> partition is initialized on some executor (assigned to that
> >> >> > > >> single topicpartition) and pulls the actual records.
> >> >> > > >> (Pooling of consumers is applied, for sure.)
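(For concreteness, the AdminClient-based plan-phase lookup that Colin and
Matthias suggest above could look roughly like the sketch below. The
`listOffsets`/`OffsetSpec` calls assume KIP-396 lands as proposed; the
bootstrap address and topic pattern are made up.)

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;
    import java.util.Properties;
    import java.util.Set;
    import java.util.function.Function;
    import java.util.regex.Pattern;
    import java.util.stream.Collectors;
    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.AdminClientConfig;
    import org.apache.kafka.clients.admin.ListOffsetsResult;
    import org.apache.kafka.clients.admin.OffsetSpec;
    import org.apache.kafka.common.TopicPartition;

    public class PlanPhaseLookup {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            Pattern subscribePattern = Pattern.compile("topic-.*");

            try (AdminClient admin = AdminClient.create(props)) {
                // 1) List all topics and apply the subscription regex locally;
                //    per Colin above, the consumer filters on the client side anyway.
                Set<String> topics = admin.listTopics().names().get().stream()
                        .filter(t -> subscribePattern.matcher(t).matches())
                        .collect(Collectors.toSet());

                // 2) Expand the matched topics into TopicPartitions.
                List<TopicPartition> partitions = new ArrayList<>();
                admin.describeTopics(topics).all().get().forEach((topic, desc) ->
                        desc.partitions().forEach(p ->
                                partitions.add(new TopicPartition(topic, p.partition()))));

                // 3) Fetch EARLIEST and LATEST offsets; as noted above, there is
                //    no single call returning the range, so it takes two calls.
                Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> earliest =
                        admin.listOffsets(partitions.stream().collect(
                                Collectors.toMap(Function.identity(),
                                        tp -> OffsetSpec.earliest()))).all().get();
                Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> latest =
                        admin.listOffsets(partitions.stream().collect(
                                Collectors.toMap(Function.identity(),
                                        tp -> OffsetSpec.latest()))).all().get();

                for (TopicPartition tp : partitions) {
                    System.out.printf("%s: [%d, %d)%n",
                            tp, earliest.get(tp).offset(), latest.get(tp).offset());
                }
            }
        }
    }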
> >> >> > > >> As the plan phase is to determine the overall
> >> >> > > >> topicpartitions and offset ranges to process, Spark is
> >> >> > > >> never interested in pulling the records on the driver side.
> >> >> > > >>
> >> >> > > >> Spark mainly leverages poll(0) to get the latest assigned
> >> >> > > >> partitions and adopt the changes or validate its
> >> >> > > >> expectation. That's not the only use case for poll(0):
> >> >> > > >> Spark also seeks the offset of each topicpartition to the
> >> >> > > >> earliest, the latest, or a specific one (either provided by
> >> >> > > >> the end user or the last committed offset), so that Spark
> >> >> > > >> can obtain the actual offset or validate the provided
> >> >> > > >> offset. According to the javadoc (if I understand
> >> >> > > >> correctly), to get the offset immediately it seems to be
> >> >> > > >> necessary to call `poll` or `position`.
> >> >> > > >>
> >> >> > > >> The way Spark interacts with Kafka in this plan phase in
> >> >> > > >> the driver is synchronous, as the phase should finish ASAP
> >> >> > > >> to run the next phase. Registering a
> >> >> > > >> ConsumerRebalanceListener and tracking the changes would
> >> >> > > >> require some asynchronous handling, which sounds like it
> >> >> > > >> adds unnecessary complexity. Spark may be OK dealing with
> >> >> > > >> synchronous calls with a timeout (that's what the methods
> >> >> > > >> in KafkaConsumer have been providing - they're not
> >> >> > > >> asynchronous, at least for callers), but dealing with
> >> >> > > >> asynchronicity is another matter. I can see the benefit
> >> >> > > >> where a thread runs continuously and the consumer is busy
> >> >> > > >> with something all the time, relying on the listener to
> >> >> > > >> hear the news about reassignment. Unfortunately that's not
> >> >> > > >> the case here.
> >> >> > > >>
> >> >> > > >> Unit tests in Spark have similar needs: it looks like Kafka
> >> >> > > >> test code also leverages `updateAssignmentMetadataIfNeeded`
> >> >> > > >> and `poll(0)` in many places where a blocking (+timeout)
> >> >> > > >> call is preferred - so I can see similar needs there as
> >> >> > > >> well.
> >> >> > > >>
> >> >> > > >> On Mon, Aug 12, 2019 at 4:46 PM Gabor Somogyi
> >> >> > > >> <gabor.g.somo...@gmail.com> wrote:
> >> >> > > >>
> >> >> > > >>> Hi Guys,
> >> >> > > >>>
> >> >> > > >>> Please see the actual implementation; pretty sure it
> >> >> > > >>> explains the situation well:
> >> >> > > >>>
> >> >> > > >>> https://github.com/apache/spark/blob/master/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala
> >> >> > > >>>
> >> >> > > >>> To answer one question/assumption which popped up from all
> >> >> > > >>> of you: Spark not only uses KafkaConsumer#subscribe but
> >> >> > > >>> pattern subscribe + KafkaConsumer#assign as well.
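(In consumer API terms, those strategies reduce to the following calls - a
minimal sketch with made-up topic names; the real implementations live in
the ConsumerStrategy.scala linked just below:)

    import java.util.Arrays;
    import java.util.regex.Pattern;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;

    final class ConsumerStrategies {
        // The three shapes are mutually exclusive on a single consumer:
        // mixing subscribe and assign throws IllegalStateException.

        static void subscribeFixed(KafkaConsumer<byte[], byte[]> c) {
            // Subscribe: fixed topic list, partitions assigned via group management.
            c.subscribe(Arrays.asList("topic-a", "topic-b"));
        }

        static void subscribePattern(KafkaConsumer<byte[], byte[]> c) {
            // SubscribePattern: regex subscription, also via group management;
            // topics appearing or disappearing later change the assignment.
            c.subscribe(Pattern.compile("topic-.*"));
        }

        static void assignFixed(KafkaConsumer<byte[], byte[]> c) {
            // Assign: explicit partitions, no group management involved;
            // assignment() is immediately accurate here, no poll needed.
            c.assign(Arrays.asList(new TopicPartition("topic-a", 0)));
        }
    }

For the two subscribe-based strategies, assignment() stays empty until some
poll() happens - which is exactly where the poll(0) hack comes in.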
> >> >> > > >>> Please see here:
> >> >> > > >>>
> >> >> > > >>> https://github.com/apache/spark/blob/master/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/ConsumerStrategy.scala
> >> >> > > >>>
> >> >> > > >>> BR,
> >> >> > > >>> G
> >> >> > > >>>
> >> >> > > >>> On Mon, Aug 12, 2019 at 6:38 AM Satish Duggana
> >> >> > > >>> <satish.dugg...@gmail.com> wrote:
> >> >> > > >>>
> >> >> > > >>>> Hi Jungtaek,
> >> >> > > >>>> Thanks for the KIP. I have a couple of questions here.
> >> >> > > >>>> Is Spark not using Kafka's consumer group management
> >> >> > > >>>> across multiple consumers?
> >> >> > > >>>>
> >> >> > > >>>> Is Spark using KafkaConsumer#subscribe(Pattern pattern,
> >> >> > > >>>> ConsumerRebalanceListener listener) only to get all the
> >> >> > > >>>> topics for a pattern-based subscription, with Spark
> >> >> > > >>>> manually assigning those topic-partitions across
> >> >> > > >>>> consumers on workers?
> >> >> > > >>>>
> >> >> > > >>>> Thanks,
> >> >> > > >>>> Satish.
> >> >> > > >>>>
> >> >> > > >>>> On Mon, Aug 12, 2019 at 4:17 AM Matthias J. Sax
> >> >> > > >>>> <matth...@confluent.io> wrote:
> >> >> > > >>>>
> >> >> > > >>>>> I am not sure if I fully understand yet.
> >> >> > > >>>>>
> >> >> > > >>>>> The fact that Spark does not store offsets in Kafka but
> >> >> > > >>>>> as part of its own checkpoint mechanism seems to be
> >> >> > > >>>>> orthogonal. Maybe I am missing something here.
> >> >> > > >>>>>
> >> >> > > >>>>> As you are using subscribe(), you use the Kafka consumer
> >> >> > > >>>>> group mechanism, which takes care of the assignment of
> >> >> > > >>>>> partitions to clients within the group. Therefore, I am
> >> >> > > >>>>> not sure what you mean by:
> >> >> > > >>>>>
> >> >> > > >>>>>> which Spark needs to know to coordinate multiple
> >> >> > > >>>>>> consumers to pull correctly.
> >> >> > > >>>>>
> >> >> > > >>>>> Multiple thoughts that may help:
> >> >> > > >>>>>
> >> >> > > >>>>> - if Spark needs more control over the partition
> >> >> > > >>>>> assignment, you can provide a custom
> >> >> > > >>>>> `ConsumerPartitionAssignor` (via the consumer
> >> >> > > >>>>> configuration)
> >> >> > > >>>>>
> >> >> > > >>>>> - you may also want to register a
> >> >> > > >>>>> `ConsumerRebalanceListener` via `subscribe()` to get
> >> >> > > >>>>> informed when the group rebalances
> >> >> > > >>>>>
> >> >> > > >>>>> As you pointed out, with pattern subscription the
> >> >> > > >>>>> metadata can change if topics are added/deleted.
> >> >> > > >>>>> However, each metadata change will trigger a rebalance,
> >> >> > > >>>>> and thus you would get corresponding calls to your
> >> >> > > >>>>> rebalance listener to learn about it and react
> >> >> > > >>>>> accordingly.
> >> >> > > >>>>>
> >> >> > > >>>>> Maybe you can explain why neither approach works and
> >> >> > > >>>>> what gap the new API would close?
> >> >> > > >>>>>
> >> >> > > >>>>> -Matthias
> >> >> > > >>>>>
> >> >> > > >>>>> On 8/11/19 5:11 AM, Jungtaek Lim wrote:
> >> >> > > >>>>>> Let me elaborate my explanation a bit more. Here we
> >> >> > > >>>>>> talk about Apache Spark, but this will apply to
> >> >> > > >>>>>> everything that wants to control the offsets of Kafka
> >> >> > > >>>>>> consumers.
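(For reference, Matthias's listener suggestion in code form - a minimal
sketch; the topic pattern is made up:)

    import java.util.Collection;
    import java.util.regex.Pattern;
    import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;

    final class RebalanceAwareSubscription {
        static void subscribe(KafkaConsumer<byte[], byte[]> consumer) {
            consumer.subscribe(Pattern.compile("topic-.*"),
                    new ConsumerRebalanceListener() {
                @Override
                public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                    // Invoked before a rebalance takes partitions away from this
                    // consumer: snapshot or commit per-partition state here.
                }

                @Override
                public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                    // Invoked after a rebalance completes: the up-to-date
                    // assignment arrives here instead of via a bare assignment().
                }
            });
        }
    }

Note that both callbacks are only ever invoked from within poll(), which is
the asynchronous-handling concern Jungtaek describes above.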
> >> >> > > >>>>>>
> >> >> > > >>>>>> Spark is managing the committed offsets and the offsets
> >> >> > > >>>>>> which should be polled next - topics and partitions as
> >> >> > > >>>>>> well. This is required because Spark has its own
> >> >> > > >>>>>> general checkpoint mechanism and Kafka is just one of
> >> >> > > >>>>>> its sources/sinks (though it's considered very
> >> >> > > >>>>>> important).
> >> >> > > >>>>>>
> >> >> > > >>>>>> To pull records from Kafka, Spark tells Kafka which
> >> >> > > >>>>>> topics and partitions it wants to subscribe to (and
> >> >> > > >>>>>> then does seek and poll), but Spark can also provide
> >> >> > > >>>>>> "patterns" of topics, and the subscription can change
> >> >> > > >>>>>> on the Kafka side (topics added/dropped, partitions
> >> >> > > >>>>>> added), which Spark needs to know about to coordinate
> >> >> > > >>>>>> multiple consumers to pull correctly.
> >> >> > > >>>>>>
> >> >> > > >>>>>> It looks like assignment() doesn't update the
> >> >> > > >>>>>> assignment information in the consumer; it just returns
> >> >> > > >>>>>> the known one. There's only one known approach to
> >> >> > > >>>>>> updating it - calling `poll` - but Spark is not
> >> >> > > >>>>>> interested in the returned records, hence the need for
> >> >> > > >>>>>> the `poll(0)` hack, and Kafka has deprecated that API.
> >> >> > > >>>>>> This KIP proposes to support this as an official
> >> >> > > >>>>>> approach.
> >> >> > > >>>>>>
> >> >> > > >>>>>> On Sun, Aug 11, 2019 at 8:18 PM Jungtaek Lim
> >> >> > > >>>>>> <kabh...@gmail.com> wrote:
> >> >> > > >>>>>>
> >> >> > > >>>>>>> Sorry, I didn't recognize you were also asking it here
> >> >> > > >>>>>>> as well. I'm in favor of describing it in this
> >> >> > > >>>>>>> discussion thread so the discussion itself can go
> >> >> > > >>>>>>> forward. So, copying my answer here:
> >> >> > > >>>>>>>
> >> >> > > >>>>>>> We have a use case where we don't just rely on
> >> >> > > >>>>>>> everything the Kafka consumer provides. We want to
> >> >> > > >>>>>>> know the current assignment of this consumer, and to
> >> >> > > >>>>>>> get the latest assignment we called the `poll(0)`
> >> >> > > >>>>>>> hack.
> >> >> > > >>>>>>>
> >> >> > > >>>>>>> That said, we don't want to pull any records here, and
> >> >> > > >>>>>>> if I'm not missing something, there's no way to
> >> >> > > >>>>>>> accomplish this. Please guide me if I am missing
> >> >> > > >>>>>>> something.
> >> >> > > >>>>>>>
> >> >> > > >>>>>>> Thanks,
> >> >> > > >>>>>>> Jungtaek Lim (HeartSaVioR)
> >> >> > > >>>>>>>
> >> >> > > >>>>>>> On Sat, Aug 10, 2019 at 2:11 AM Matthias J. Sax
> >> >> > > >>>>>>> <matth...@confluent.io> wrote:
> >> >> > > >>>>>>>
> >> >> > > >>>>>>>> Thanks for the KIP.
> >> >> > > >>>>>>>>
> >> >> > > >>>>>>>> Can you elaborate a little bit more on the use case
> >> >> > > >>>>>>>> for this feature? Why would a consumer need to update
> >> >> > > >>>>>>>> its metadata explicitly?
> >> >> > > >>>>>>>>
> >> >> > > >>>>>>>> -Matthias
> >> >> > > >>>>>>>>
> >> >> > > >>>>>>>> On 8/8/19 8:46 PM, Jungtaek Lim wrote:
> >> >> > > >>>>>>>>> Hi devs,
> >> >> > > >>>>>>>>>
> >> >> > > >>>>>>>>> I'd like to initiate discussion around KIP-505,
> >> >> > > >>>>>>>>> exposing a new public method to only update
> >> >> > > >>>>>>>>> assignment metadata in the consumer.
> >> >> > > >>>>>>>>>
> >> >> > > >>>>>>>>> `poll(0)` has been misused since, according to the
> >> >> > > >>>>>>>>> Kafka docs, it doesn't guarantee that it won't pull
> >> >> > > >>>>>>>>> any records, and the new method `poll(Duration)`
> >> >> > > >>>>>>>>> doesn't have the same semantics, so I would like to
> >> >> > > >>>>>>>>> propose a new public API which performs only the
> >> >> > > >>>>>>>>> desired behavior.
> >> >> > > >>>>>>>>>
> >> >> > > >>>>>>>>> KIP page: https://cwiki.apache.org/confluence/x/z5NiBw
> >> >> > > >>>>>>>>>
> >> >> > > >>>>>>>>> Please feel free to suggest any improvements on the
> >> >> > > >>>>>>>>> proposal, as I'm new to the Kafka community and may
> >> >> > > >>>>>>>>> not have caught the preferences (like
> >> >> > > >>>>>>>>> TimeoutException vs boolean, etc.) of the Kafka
> >> >> > > >>>>>>>>> project.
> >> >> > > >>>>>>>>>
> >> >> > > >>>>>>>>> Thanks in advance!
> >> >> > > >>>>>>>>> Jungtaek Lim (HeartSaVioR)
> >> >> > > >>>>>>>
> >> >> > > >>>>>>> --
> >> >> > > >>>>>>> Name : Jungtaek Lim
> >> >> > > >>>>>>> Blog : http://medium.com/@heartsavior
> >> >> > > >>>>>>> Twitter : http://twitter.com/heartsavior
> >> >> > > >>>>>>> LinkedIn : http://www.linkedin.com/in/heartsavior
> >> >> > > >>
> >> >> > > >> --
> >> >> > > >> Name : Jungtaek Lim
> >> >> > > >> Blog : http://medium.com/@heartsavior
> >> >> > > >> Twitter : http://twitter.com/heartsavior
> >> >> > > >> LinkedIn : http://www.linkedin.com/in/heartsavior
> >> >> >
> >> >> > --
> >> >> > Name : Jungtaek Lim
> >> >> > Blog : http://medium.com/@heartsavior
> >> >> > Twitter : http://twitter.com/heartsavior
> >> >> > LinkedIn : http://www.linkedin.com/in/heartsavior
> >> >
> >> > --
> >> > Name : Jungtaek Lim
> >> > Blog : http://medium.com/@heartsavior
> >> > Twitter : http://twitter.com/heartsavior
> >> > LinkedIn : http://www.linkedin.com/in/heartsavior
> >>
> >> --
> >> Name : Jungtaek Lim
> >> Blog : http://medium.com/@heartsavior
> >> Twitter : http://twitter.com/heartsavior
> >> LinkedIn : http://www.linkedin.com/in/heartsavior

--
Dongjin Lee

A hitchhiker in the mathematical world.

github: github.com/dongjinleekr
linkedin: kr.linkedin.com/in/dongjinleekr
speakerdeck: speakerdeck.com/dongjin