Thanks for the details, Jungtaek! I tend to agree with Colin that using the AdminClient seems to be the better choice.
You can get all topics via `listTopics()` (and you can refresh this information at regular intervals) and match any pattern against the list of available topics in the driver. As you use `assignment()` and store offsets in the Spark checkpoint, it seems that using consumer group management is not a good fit for the use case. Thoughts?

-Matthias

On 8/12/19 8:22 AM, Colin McCabe wrote:
> Hi,
>
> If there's no need to consume records in the Spark driver, then the Consumer is probably the wrong thing to use. Instead, Spark should use AdminClient to find out what partitions exist and where, manage their offsets, and so on. There are some KIPs under discussion now that would add the necessary APIs for managing offsets.
>
> Best,
> Colin
>
> On Mon, Aug 12, 2019, at 07:39, Jungtaek Lim wrote:
>> My feeling is that I didn't explain the use case for Spark properly and hence failed to explain the needs. Sorry about this.
>>
>> Spark leverages a single instance of KafkaConsumer in the driver, which is the sole member of its consumer group. It is used in the plan phase of each micro-batch to calculate the overall topic-partitions with their offset ranges for the batch, and to split and assign (topicpartition, fromOffset, untilOffset) tuples to each input partition. After planning is done and tasks are distributed to executors, a consumer per input partition is initialized on some executor (assigned to a single topic-partition) and pulls the actual records. (Consumer pooling is applied, for sure.) As the plan phase only determines the overall topic-partitions and offset ranges to process, Spark is never interested in pulling records on the driver side.
>>
>> Spark mainly leverages poll(0) to get the latest assigned partitions and adopt the changes or validate its expectations. That's not the only use case for poll(0), though. Spark also seeks each topic-partition to the earliest, the latest, or a specific offset (either provided by the end user or the last committed offset) so that Spark can obtain the actual offset or validate the provided one. According to the javadoc (if I understand correctly), to get the offset immediately it seems to be required to call `poll` or `position`.
>>
>> The way Spark interacts with Kafka in this plan phase in the driver is synchronous, as the phase should finish ASAP so the next phase can run. Registering a ConsumerRebalanceListener and tracking the changes would require some asynchronous handling, which sounds like it adds unnecessary complexity. Spark may be OK dealing with synchronous calls with a timeout (that's what the methods in KafkaConsumer have been providing - they're not asynchronous, at least for callers), but dealing with asynchrony is another level of concern. I can see the benefit where a continuous thread runs and the consumer is busy with something continuously, relying on the listener to hear the news on reassignment. Unfortunately that's not the case here.
>>
>> Unit tests in Spark have similar needs: it looks like Kafka test code also leverages `updateAssignmentMetadataIfNeeded` and `poll(0)` in many places, as they fit spots where a blocking (+timeout) call is preferred - so I can see similar needs there as well.
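[For illustration, a minimal sketch of the `poll(0)`-then-seek pattern described above, written against the plain Java KafkaConsumer API; the broker address, group id, and topic pattern are hypothetical placeholders, not Spark's actual configuration:]

```java
import java.util.Properties;
import java.util.Set;
import java.util.regex.Pattern;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class DriverAssignmentProbe {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // hypothetical broker
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "driver-probe");            // hypothetical group id
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Pattern.compile("topic-.*")); // hypothetical pattern

            // The hack under discussion: poll(0) updates metadata and triggers
            // partition assignment, but it is deprecated and not guaranteed to
            // return zero records; poll(Duration.ZERO) may return before the
            // assignment has happened.
            consumer.poll(0);

            Set<TopicPartition> assignment = consumer.assignment();

            // Seek to the earliest offsets and read back the actual positions,
            // without ever being interested in the records themselves.
            consumer.seekToBeginning(assignment);
            for (TopicPartition tp : assignment) {
                System.out.printf("%s starts at %d%n", tp, consumer.position(tp));
            }
        }
    }
}
```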
>>
>> On Mon, Aug 12, 2019 at 4:46 PM Gabor Somogyi <gabor.g.somo...@gmail.com> wrote:
>>
>>> Hi Guys,
>>>
>>> Please see the actual implementation; I'm pretty sure it explains the situation well:
>>>
>>> https://github.com/apache/spark/blob/master/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala
>>>
>>> To answer one question/assumption which popped up from all of you: Spark uses not only KafkaConsumer#subscribe but also pattern subscribe + KafkaConsumer#assign. Please see here:
>>>
>>> https://github.com/apache/spark/blob/master/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/ConsumerStrategy.scala
>>>
>>> BR,
>>> G
>>>
>>> On Mon, Aug 12, 2019 at 6:38 AM Satish Duggana <satish.dugg...@gmail.com> wrote:
>>>
>>>> Hi Jungtaek,
>>>> Thanks for the KIP. I have a couple of questions here.
>>>>
>>>> Is Spark not using Kafka's consumer group management across multiple consumers?
>>>>
>>>> Is Spark using KafkaConsumer#subscribe(Pattern pattern, ConsumerRebalanceListener listener) only to get all the topics for a pattern-based subscription, with Spark manually assigning those topic-partitions across consumers on workers?
>>>>
>>>> Thanks,
>>>> Satish.
>>>>
>>>> On Mon, Aug 12, 2019 at 4:17 AM Matthias J. Sax <matth...@confluent.io> wrote:
>>>>
>>>>> I am not sure if I fully understand yet.
>>>>>
>>>>> The fact that Spark does not store offsets in Kafka but as part of its own checkpoint mechanism seems to be orthogonal. Maybe I am missing something here.
>>>>>
>>>>> As you are using subscribe(), you use the Kafka consumer group mechanism, which takes care of the assignment of partitions to clients within the group. Therefore, I am not sure what you mean by:
>>>>>
>>>>>> which Spark needs to know to coordinate multiple consumers to pull correctly.
>>>>>
>>>>> Multiple thoughts that may help:
>>>>>
>>>>> - if Spark needs more control over the partition assignment, you can provide a custom `ConsumerPartitionAssignor` (via the consumer configuration)
>>>>>
>>>>> - you may also want to register a `ConsumerRebalanceListener` via `subscribe()` to get informed when the group rebalances (see the sketch after this mail)
>>>>>
>>>>> As you pointed out, with pattern subscription the metadata can change if topics are added/deleted. However, each metadata change will trigger a rebalance, and thus you would get corresponding calls to your rebalance listener to learn about it and react accordingly.
>>>>>
>>>>> Maybe you can explain why neither of the two approaches works and what gap the new API would close?
>>>>>
>>>>> -Matthias
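[A minimal sketch of the rebalance-listener alternative suggested above, assuming pattern subscription; the tracker class and pattern are illustrative, not Spark code:]

```java
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.common.TopicPartition;

// Tracks the current assignment as the group rebalances, e.g. when topics
// matching the subscribed pattern are created or deleted.
class AssignmentTracker implements ConsumerRebalanceListener {
    private final Set<TopicPartition> current = ConcurrentHashMap.newKeySet();

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        current.removeAll(partitions);
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        current.addAll(partitions);
    }

    Set<TopicPartition> snapshot() {
        return new HashSet<>(current); // defensive copy for callers
    }
}

// Registration (consumer configured elsewhere):
// consumer.subscribe(Pattern.compile("topic-.*"), tracker); // hypothetical pattern
```

[Note that these callbacks are only invoked from within a `poll()` call on the polling thread, so a caller that wants an up-to-date assignment synchronously still ends up polling - which is the concern Jungtaek raises in his reply above.]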
>>>>>
>>>>> On 8/11/19 5:11 AM, Jungtaek Lim wrote:
>>>>>> Let me elaborate my explanation a bit more. Here we talk about Apache Spark, but this applies to everything that wants to control the offsets of Kafka consumers.
>>>>>>
>>>>>> Spark is managing the committed offsets and the offsets which should be polled next, as well as the topics and partitions. This is required as Spark has its own general checkpoint mechanism and Kafka is just one of its sources/sinks (though it's considered a very important one).
>>>>>>
>>>>>> To pull records from Kafka, Spark tells Kafka which topics and partitions it wants to subscribe to (and then seeks and polls), but Spark can also provide "patterns" of topics, and the subscription can change on the Kafka side (topics added/dropped, partitions added), which Spark needs to know about to coordinate multiple consumers to pull correctly.
>>>>>>
>>>>>> It looks like assignment() doesn't update the assignment information in the consumer; it just returns the known one. There's only one known approach to updating it, calling `poll`, but Spark is not interested in the returned records, so there's a need for the hack `poll(0)` - and Kafka deprecated that API. This KIP proposes to support this as an official approach.
>>>>>>
>>>>>> On Sun, Aug 11, 2019 at 8:18 PM Jungtaek Lim <kabh...@gmail.com> wrote:
>>>>>>
>>>>>>> Sorry, I didn't recognize you're also asking it here as well. I'm in favor of describing it in this discussion thread so the discussion itself can move forward. So, copying my answer here:
>>>>>>>
>>>>>>> We have some use cases where we don't just rely on everything the Kafka consumer provides. We want to know the current assignment on the consumer, and to get the latest assignment, we called the hack `poll(0)`.
>>>>>>>
>>>>>>> That said, we don't want to pull any records here, and if I'm not missing something, there's no other way to accomplish this. Please guide me if I'm missing something.
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Jungtaek Lim (HeartSaVioR)
>>>>>>>
>>>>>>> On Sat, Aug 10, 2019 at 2:11 AM Matthias J. Sax <matth...@confluent.io> wrote:
>>>>>>>
>>>>>>>> Thanks for the KIP.
>>>>>>>>
>>>>>>>> Can you elaborate a little bit more on the use case for this feature? Why would a consumer need to update its metadata explicitly?
>>>>>>>>
>>>>>>>> -Matthias
>>>>>>>>
>>>>>>>> On 8/8/19 8:46 PM, Jungtaek Lim wrote:
>>>>>>>>> Hi devs,
>>>>>>>>>
>>>>>>>>> I'd like to initiate discussion around KIP-505, exposing a new public method to only update assignment metadata in the consumer.
>>>>>>>>>
>>>>>>>>> `poll(0)` has been misused, as according to the Kafka docs it doesn't guarantee that it won't pull any records, and the new method `poll(Duration)` doesn't have the same semantics, so I would like to propose a new public API which only does the desired behavior.
>>>>>>>>>
>>>>>>>>> KIP page: https://cwiki.apache.org/confluence/x/z5NiBw
>>>>>>>>>
>>>>>>>>> Please feel free to suggest any improvements on the proposal, as I'm new to the Kafka community and may not catch the preferences (like TimeoutException vs boolean, etc.) of the Kafka project.
>>>>>>>>>
>>>>>>>>> Thanks in advance!
>>>>>>>>> Jungtaek Lim (HeartSaVioR)
>>>>>>>
>>>>>>> --
>>>>>>> Name : Jungtaek Lim
>>>>>>> Blog : http://medium.com/@heartsavior
>>>>>>> Twitter : http://twitter.com/heartsavior
>>>>>>> LinkedIn : http://www.linkedin.com/in/heartsavior
>>
>> --
>> Name : Jungtaek Lim
>> Blog : http://medium.com/@heartsavior
>> Twitter : http://twitter.com/heartsavior
>> LinkedIn : http://www.linkedin.com/in/heartsavior
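[For completeness, a minimal sketch of the AdminClient approach Colin and Matthias suggest: list all topics, match the subscription pattern in the driver, and expand matches into partitions. The broker address and pattern are hypothetical, and offset management would still need the in-flight KIPs Colin mentions:]

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.regex.Pattern;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.TopicPartition;

public class DriverTopicLister {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // hypothetical broker

        Pattern pattern = Pattern.compile("topic-.*"); // hypothetical subscription pattern
        try (AdminClient admin = AdminClient.create(props)) {
            // List all topic names and match the pattern on the driver side;
            // this can be refreshed at regular intervals instead of calling poll(0).
            List<String> matched = new ArrayList<>();
            for (String name : admin.listTopics().names().get()) {
                if (pattern.matcher(name).matches()) {
                    matched.add(name);
                }
            }

            // Expand the matched topics into topic-partitions for planning.
            List<TopicPartition> partitions = new ArrayList<>();
            for (TopicDescription desc : admin.describeTopics(matched).all().get().values()) {
                desc.partitions().forEach(p ->
                        partitions.add(new TopicPartition(desc.name(), p.partition())));
            }
            System.out.println("Planned partitions: " + partitions);
        }
    }
}
```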