So overall, AdminClient covers what we need to retrieve up-to-date topic-partitions, whereas KIP-396 would cover what we need to retrieve offsets (EARLIEST, LATEST, or by timestamp) per partition.
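For reference, here is a minimal sketch of what the driver-side lookup could look like with AdminClient. The bootstrap address, pattern, and class name are just placeholders; the topic/partition part uses today's AdminClient API, while the offset lookup only exists as the API proposed in KIP-396, so I left that part as a comment:

import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.TopicDescription;

public class AdminMetadataSketch {
  public static void main(String[] args) throws Exception {
    Properties props = new Properties();
    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

    try (AdminClient admin = AdminClient.create(props)) {
      // AdminClient refreshes metadata only when asked, so the driver
      // controls exactly when this happens (e.g. once per plan phase).
      // The regex is applied client-side, same as the consumer does today.
      Pattern subscribePattern = Pattern.compile("topic-.*");
      Set<String> topics = admin.listTopics().names().get().stream()
          .filter(t -> subscribePattern.matcher(t).matches())
          .collect(Collectors.toSet());

      // Describe the matched topics to get up-to-date partitions.
      Map<String, TopicDescription> byTopic =
          admin.describeTopics(topics).all().get();
      byTopic.forEach((topic, desc) ->
          System.out.println(topic + ": " + desc.partitions().size() + " partitions"));

      // Offsets are the missing piece today. KIP-396 proposes roughly
      //   admin.listOffsets(Map<TopicPartition, OffsetSpec>)
      // with specs for earliest/latest/timestamp; getting the
      // (EARLIEST, LATEST) range would take two such calls per batch.
    }
  }
}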
Gabor, could you please chime in if I'm missing something? I'd like to double-check this. Assuming I'm not, what would be the preferred next action? Personally I'd keep this KIP as it is until KIP-396 passes its vote (the vote for KIP-396 opened in January and still hasn't passed after 7 months, which makes me a bit worried about whether it will pass at all), but I also respect the KIP lifecycle in the Kafka community.

On Tue, Aug 13, 2019 at 12:46 PM Jungtaek Lim <kabh...@gmail.com> wrote:

>
>
> On Tue, Aug 13, 2019 at 10:01 AM Colin McCabe <cmcc...@apache.org> wrote:
>
>> On Mon, Aug 12, 2019, at 14:54, Jungtaek Lim wrote:
>> > Thanks for the feedbacks Colin and Matthias.
>> >
>> > I agree with you regarding getting topics and partitions via
>> AdminClient,
>> > just curious how much the overhead would be. Would it be lighter, or
>> > heavier? We may not want to list topics in regular intervals - in plan
>> > phase we want to know up-to-date information so that the calculation
>> from
>> > Spark itself makes sense.
>>
>> It would be lighter. The consumer will periodically refresh metadata for
>> any topic you are subscribed to. AdminClient doesn’t have the concept of
>> subscriptions, and won’t refresh topic metadata until you request it.
>>
>
> Sounds great! Happy to hear about that.
>
>
>>
>> >
>> > On the other hands I'm not seeing any information regarding offset in
>> > current AdminClient, which is also one of reason we leverage consumer
>> and
>> > call poll(0). Colin, as you mentioned there're KIPs addressing this,
>> could
>> > you refer KIPs so that we can see whether it would work for our case?
>> > Without support of this we cannot replace our usage of consumer/poll
>> with
>> > AdminClient.
>>
>> KIP-396 is the one for listing offsets in AdminClient.
>>
>
> KIP-396 seems to fit to the needs on Spark's purpose to get offset
> information, even for timestamp. Thanks!
> I'd wish there's a way to get a range of (EARLIEST, LATEST) in one call,
> but not a big deal as it just requires two calls.
>
>
>
>> > ps. IMHO it seems to be helpful if there's overloaded `listTopics` which
>> > receives regex same as consumer subscription via pattern. We would like
>> to
>> > provide same behavior what Kafka is basically providing as a source.
>>
>> We don’t have a regex listTopics at the moment, though we could add this.
>> Currently, the regex is done on the client side anyway (although we’d
>> really like to change this in the future). So just listing everything and
>> filtering locally would be the same performance and behavior as the
>> Consumer.
>>
>
> I see. Good to know regex is done on the client side - I've just searched
> some code and it applies filter for all topics retrieved from metadata
> fetch. Then it would be mostly no difference on this. Thanks for confirming.
>
>
>>
>> best,
>> Colin
>>
>> >
>> > On Tue, Aug 13, 2019 at 1:03 AM Matthias J. Sax <matth...@confluent.io>
>> > wrote:
>> >
>> > > Thanks for the details Jungtaek!
>> > >
>> > > I tend to agree with Colin, that using the AdminClient seems to be the
>> > > better choice.
>> > >
>> > > You can get all topics via `listTopics()` (and you can refresh this
>> > > information on regular intervals) and match any pattern against the
>> list
>> > > of available topics in the driver.
>> > >
>> > > As you use `assignment()` and store offsets in the Spark checkpoint,
>> it
>> > > seems that using consumer group management is not a good fit for the
>> use
>> > > case.
>> > >
>> > >
>> > > Thoughts?
>> > >
>> > >
>> > > -Matthias
>> > >
>> > > On 8/12/19 8:22 AM, Colin McCabe wrote:
>> > > > Hi,
>> > > >
>> > > > If there’s no need to consume records in the Spark driver, then the
>> > > Consumer is probably the wrong thing to use. Instead, Spark should use
>> > > AdminClient to find out what partitions exist and where, manage their
>> > > offsets, and so on. There are some KIPs under discussion now that
>> would add
>> > > the necessary APIs for managing offsets.
>> > > >
>> > > > Best,
>> > > > Colin
>> > > >
>> > > > On Mon, Aug 12, 2019, at 07:39, Jungtaek Lim wrote:
>> > > >> My feeling is that I didn't explain the use case for Spark
>> properly and
>> > > >> hence fail to explain the needs. Sorry about this.
>> > > >>
>> > > >> Spark leverages the single instance of KafkaConsumer in the driver
>> > > which is
>> > > >> registered solely on the consumer group. This is used in the plan
>> phase
>> > > for
>> > > >> each micro-batch to calculate the overall topicpartitions with its
>> > > offset
>> > > >> ranges for this batch, and split and assign (topicpartition,
>> fromOffset,
>> > > >> untilOffset) to each input partition. After the planning is done
>> and
>> > > tasks
>> > > >> are being distributed to executors, consumer per each input
>> partition
>> > > will
>> > > >> be initialized from some executor (being assigned to the single
>> > > >> topicpartition), and pull the actual records. (Pooling consumers is
>> > > applied
>> > > >> for sure.) As plan phase is to determine the overall
>> topicpartitions and
>> > > >> offset ranges to process, Spark is never interested on pulling the
>> > > records
>> > > >> in driver side.
>> > > >>
>> > > >> Spark mainly leverages poll(0) to get the latest assigned
>> partitions and
>> > > >> adopt the changes or validate the expectation. That's not only use
>> case
>> > > for
>> > > >> poll(0). Spark is also seeking the offset per topicpartition to the
>> > > >> earliest or the latest, or specific one (either provided by end
>> user or
>> > > the
>> > > >> last committed offset) so that Spark can have actual offset or
>> validate
>> > > the
>> > > >> provided offset. According to the javadoc (if I understand
>> correctly),
>> > > to
>> > > >> get the offset immediately it seems to be required to call `poll`
>> or
>> > > >> `position`.
>> > > >>
>> > > >> The way Spark interacts with Kafka in this plan phase in driver is
>> > > >> synchronous, as the phase should finish ASAP to run the next phase.
>> > > >> Registering ConsumerRebalanceListener and tracking the change will
>> > > require
>> > > >> some asynchronous handling which sounds to add unnecessary
>> complexity.
>> > > >> Spark may be OK with deal with synchronous with timeout (that's
>> what
>> > > >> methods in KafkaConsumer have been providing - they're not
>> > > asynchronous, at
>> > > >> least for callers) but dealing with asynchronous is another level
>> of
>> > > >> interest. I can see the benefit where continuous thread runs and
>> the
>> > > >> consumer is busy with something continuously, relying on listener
>> to
>> > > hear
>> > > >> the news on reassignment. Unfortunately that's not the case.
>> > > >>
>> > > >> Unit tests in Spark have similar needs: looks like Kafka test code
>> also
>> > > >> leverages `updateAssignmentMetadataIfNeeded` and `poll(0)` in many
>> > > places
>> > > >> as it's appropriate to the place which blocking (+timeout) call is
>> > > >> preferred - so I can see the similar needs from here as well.
>> > > >>
>> > > >> On Mon, Aug 12, 2019 at 4:46 PM Gabor Somogyi <
>> > > gabor.g.somo...@gmail.com>
>> > > >> wrote:
>> > > >>
>> > > >>> Hi Guys,
>> > > >>>
>> > > >>> Please see the actual implementation, pretty sure it explains the
>> > > situation
>> > > >>> well:
>> > > >>>
>> > > >>>
>> > > >> https://github.com/apache/spark/blob/master/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala
>> > > >>>
>> > > >>> To answer one question/assumption which popped up from all of you
>> > > Spark not
>> > > >>> only uses KafkaConsumer#subscribe but pattern subscribe +
>> > > >>> KafkaConsumer#assign as well.
>> > > >>> Please see here:
>> > > >>>
>> > > >>>
>> > > >> https://github.com/apache/spark/blob/master/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/ConsumerStrategy.scala
>> > > >>>
>> > > >>> BR,
>> > > >>> G
>> > > >>>
>> > > >>>
>> > > >>> On Mon, Aug 12, 2019 at 6:38 AM Satish Duggana <
>> > > satish.dugg...@gmail.com>
>> > > >>> wrote:
>> > > >>>
>> > > >>>> Hi Jungtaek,
>> > > >>>> Thanks for the KIP. I have a couple of questions here.
>> > > >>>> Is not Spark using Kafka's consumer group management across
>> multiple
>> > > >>>> consumers?
>> > > >>>>
>> > > >>>> Is Spark using KafkaConsumer#subscribe(Pattern pattern,
>> > > >>>> ConsumerRebalanceListener listener) only to get all the topics
>> for a
>> > > >>>> pattern based subscription and Spark manually assigns those
>> > > >>>> topic-partitions across consumers on workers?
>> > > >>>>
>> > > >>>> Thanks,
>> > > >>>> Satish.
>> > > >>>>
>> > > >>>> On Mon, Aug 12, 2019 at 4:17 AM Matthias J. Sax <
>> > > matth...@confluent.io>
>> > > >>>> wrote:
>> > > >>>>
>> > > >>>>> If am not sure if I fully understand yet.
>> > > >>>>>
>> > > >>>>> The fact, that Spark does not stores offsets in Kafka but as
>> part of
>> > > >>> its
>> > > >>>>> own checkpoint mechanism seems to be orthogonal. Maybe I am
>> missing
>> > > >>>>> something here.
>> > > >>>>>
>> > > >>>>> As you are using subscribe(), you use Kafka consumer group
>> mechanism,
>> > > >>>>> that takes care of the assignment of partitions to clients
>> within the
>> > > >>>>> group. Therefore, I am not sure what you mean by:
>> > > >>>>>
>> > > >>>>>> which Spark needs to
>> > > >>>>>>> know to coordinate multiple consumers to pull correctly.
>> > > >>>>>
>> > > >>>>> Multiple thoughts that may help:
>> > > >>>>>
>> > > >>>>> - if Spark needs more control about the partition assignment,
>> you can
>> > > >>>>> provide a custom `ConsumerPartitionAssignor` (via the consumer
>> > > >>>>> configuration)
>> > > >>>>>
>> > > >>>>> - you may also want to register `ConsumerRebalanceListener` via
>> > > >>>>> `subscribe()` to get informed when the group rebalances
>> > > >>>>>
>> > > >>>>> As you pointed out, using pattern subscription metadata can
>> change if
>> > > >>>>> topic are added/deleted. However, each metadata change will
>> > > triggering
>> > > >>> a
>> > > >>>>> rebalance and thus you would get corresponding calls to you
>> rebalance
>> > > >>>>> listener to learn about it and react accordingly.
>> > > >>>>>
>> > > >>>>> Maybe you can explain why neither of both approaches works and
>> what
>> > > gap
>> > > >>>>> the new API would close?
>> > > >>>>>
>> > > >>>>>
>> > > >>>>> -Matthias
>> > > >>>>>
>> > > >>>>> On 8/11/19 5:11 AM, Jungtaek Lim wrote:
>> > > >>>>>> Let me elaborate my explanation a bit more. Here we say about
>> Apache
>> > >>>>> Spark,
>> > > >>>>>> but this will apply for everything which want to control
>> offset of
>> > >>>> Kafka
>> > > >>>>>> consumers.
>> > > >>>>>>
>> > > >>>>>> Spark is managing the committed offsets and the offsets which
>> should
>> > > >>> be
>> > > >>>>>> polled now. Topics and partitions as well. This is required as
>> Spark
>> > > >>>>> itself
>> > > >>>>>> has its own general checkpoint mechanism and Kafka is just a
>> one of
>> > > >>>>>> source/sink (though it's considered as very important).
>> > > >>>>>>
>> > > >>>>>> To pull records from Kafka, Spark provides to Kafka which
>> topics and
>> > > >>>>>> partitions it wants to subscribe(, and do seek and poll), but
>> as
>> > > >>> Spark
>> > > >>>>> can
>> > > >>>>>> also provide "patterns" of topics, as well as subscription can
>> be
>> > > >>>> changed
>> > > >>>>>> in Kafka side (topic added/dropped, partitions added) which
>> Spark
>> > > >>> needs
>> > > >>>>> to
>> > > >>>>>> know to coordinate multiple consumers to pull correctly.
>> > > >>>>>>
>> > > >>>>>> Looks like assignment() doesn't update the assignment
>> information in
>> > > >>>>>> consumer. It just returns known one. There's only one known
>> approach
>> > > >>>>> doing
>> > > >>>>>> this, calling `poll`, but Spark is not interested on returned
>> > > >>> records,
>> > > >>>> so
>> > > >>>>>> there's a need for a hack `poll(0)`, and Kafka deprecated the
>> API.
>> > > >>> This
>> > > >>>>> KIP
>> > > >>>>>> proposes to support this as official approach.
>> > > >>>>>>
>> > > >>>>>>
>> > > >>>>>> On Sun, Aug 11, 2019 at 8:18 PM Jungtaek Lim <
>> kabh...@gmail.com>
>> > > >>>> wrote:
>> > > >>>>>>
>> > > >>>>>>> Sorry I didn't recognize you're also asking it here as well.
>> I'm in
>> > > >>>>> favor
>> > > >>>>>>> of describing it in this discussion thread so the discussion
>> itself
>> > > >>>> can
>> > > >>>>> go
>> > > >>>>>>> forward. So copying my answer here:
>> > > >>>>>>>
>> > > >>>>>>> We have some use case which we don't just rely on everything
>> what
>> > > >>>> Kafka
>> > > >>>>>>> consumer provides. We want to know current assignment on this
>> > > >>>> consumer,
>> > > >>>>> and
>> > > >>>>>>> to get the latest assignment, we called the hack `poll(0)`.
>> > > >>>>>>>
>> > > >>>>>>> That said, we don't want to pull any records here, and if I'm
>> not
>> > > >>>>> missing
>> > > >>>>>>> here, there's no way to accomplish this. Please guide me if
>> I'm
>> > > >>>> missing
>> > > >>>>>>> something.
>> > > >>>>>>>
>> > > >>>>>>> Thanks,
>> > > >>>>>>> Jungtaek Lim (HeartSaVioR)
>> > > >>>>>>>
>> > > >>>>>>>
>> > > >>>>>>>
>> > > >>>>>>> On Sat, Aug 10, 2019 at 2:11 AM Matthias J. Sax <
>> > > >>>> matth...@confluent.io>
>> > > >>>>>>> wrote:
>> > > >>>>>>>
>> > > >>>>>>>> Thanks for the KIP.
>> > > >>>>>>>>
>> > > >>>>>>>> Can you elaborate a little bit more on the use case for this
>> > > >>> feature?
>> > > >>>>>>>> Why would a consumer need to update it's metadata explicitly?
>> > > >>>>>>>>
>> > > >>>>>>>>
>> > > >>>>>>>> -Matthias
>> > > >>>>>>>>
>> > > >>>>>>>> On 8/8/19 8:46 PM, Jungtaek Lim wrote:
>> > > >>>>>>>>> Hi devs,
>> > > >>>>>>>>>
>> > > >>>>>>>>> I'd like to initiate discussion around KIP-505, exposing new
>> > > >>> public
>> > > >>>>>>>> method
>> > > >>>>>>>>> to only update assignment metadata in consumer.
>> > > >>>>>>>>>
>> > > >>>>>>>>> `poll(0)` has been misused as according to Kafka doc it
>> doesn't
>> > > >>>>>>>> guarantee
>> > > >>>>>>>>> that it doesn't pull any records, and new method
>> `poll(Duration)`
>> > > >>>>>>>> doesn't
>> > > >>>>>>>>> have same semantic, so would like to propose new public API
>> which
>> > > >>>> only
>> > > >>>>>>>> does
>> > > >>>>>>>>> the desired behavior.
>> > > >>>>>>>>>
>> > > >>>>>>>>> KIP page: https://cwiki.apache.org/confluence/x/z5NiBw
>> > > >>>>>>>>>
>> > > >>>>>>>>> Please feel free to suggest any improvements on proposal,
>> as I'm
>> > > >>> new
>> > > >>>>> to
>> > > >>>>>>>>> Kafka community and may not catch preferences (like
>> > > >>> TimeoutException
>> > > >>>>> vs
>> > > >>>>>>>>> boolean, etc.) on Kafka project.
>> > > >>>>>>>>>
>> > > >>>>>>>>> Thanks in advance!
>> > > >>>>>>>>> Jungtaek Lim (HeartSaVioR)
>> > > >>>>>>>>>
>> > > >>>>>>>>
>> > > >>>>>>>>
>> > > >>>>>>>
>> > > >>>>>>> --
>> > > >>>>>>> Name : Jungtaek Lim
>> > > >>>>>>> Blog : http://medium.com/@heartsavior
>> > > >>>>>>> Twitter : http://twitter.com/heartsavior
>> > > >>>>>>> LinkedIn : http://www.linkedin.com/in/heartsavior
>> > > >>>>>>>
>> > > >>>>>>
>> > > >>>>>>
>> > > >>>>>
>> > > >>>>>
>> > > >>>>
>> > > >>>
>> > > >>
>> > > >>
>> > > >> --
>> > > >> Name : Jungtaek Lim
>> > > >> Blog : http://medium.com/@heartsavior
>> > > >> Twitter : http://twitter.com/heartsavior
>> > > >> LinkedIn : http://www.linkedin.com/in/heartsavior
>> > > >>
>> > > >
>> > >
>> > >
>> >
>> > --
>> > Name : Jungtaek Lim
>> > Blog : http://medium.com/@heartsavior
>> > Twitter : http://twitter.com/heartsavior
>> > LinkedIn : http://www.linkedin.com/in/heartsavior
>> >
>>
>
> --
> Name : Jungtaek Lim
> Blog : http://medium.com/@heartsavior
> Twitter : http://twitter.com/heartsavior
> LinkedIn : http://www.linkedin.com/in/heartsavior
>

--
Name : Jungtaek Lim
Blog : http://medium.com/@heartsavior
Twitter : http://twitter.com/heartsavior
LinkedIn : http://www.linkedin.com/in/heartsavior