Hi Jungtaek,

Have you looked into this interface?
https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/MetadataUpdater.java

Right now it's not a public interface, but do the methods available in it
work for your needs? The DefaultMetadataUpdater is responsible for making
the metadata requests to brokers:
https://github.com/apache/kafka/blob/26814e060e98f9674127be13a28ce41a21ca6b3c/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java#L958

If it can be invoked from client methods, does that solve your requirements?
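For illustration only, a rough sketch of how such a hook might surface in
client code if it were exposed. Nothing below exists in Kafka today; all
names are hypothetical:

    // Hypothetical sketch only: what a public metadata-refresh hook could
    // look like if something MetadataUpdater-shaped were surfaced on the
    // client. None of these names exist in Kafka; all are illustrative.
    public interface ClientMetadataRefresher {
        // Request a cluster-metadata refresh and block until the update
        // completes or the timeout expires; returns false on timeout.
        boolean refreshMetadata(java.time.Duration timeout);
    }

The driver side would then make a plain blocking call before reading
assignment(), with no poll() and no records involved.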
Thanks,
Harsha

On Mon, Aug 12, 2019 at 2:53 PM, Jungtaek Lim <kabh...@gmail.com> wrote:

> Thanks for the feedback, Colin and Matthias.
>
> I agree with you regarding getting topics and partitions via AdminClient;
> I'm just curious how much the overhead would be. Would it be lighter or
> heavier? We may not want to list topics at regular intervals - in the
> plan phase we want up-to-date information so that the calculation on the
> Spark side makes sense.
>
> On the other hand, I don't see any information regarding offsets in the
> current AdminClient, which is also one of the reasons we leverage a
> consumer and call poll(0). Colin, as you mentioned there are KIPs
> addressing this - could you point to those KIPs so that we can see
> whether they would work for our case? Without that support we cannot
> replace our usage of consumer/poll with AdminClient.
>
> ps. IMHO it would be helpful to have an overloaded `listTopics` which
> accepts a regex, matching what consumer pattern subscription does. We
> would like to provide the same behavior that Kafka itself provides, as a
> source.
>
> On Tue, Aug 13, 2019 at 1:03 AM Matthias J. Sax <matth...@confluent.io> wrote:
>
> Thanks for the details, Jungtaek!
>
> I tend to agree with Colin that using the AdminClient seems to be the
> better choice.
>
> You can get all topics via `listTopics()` (and you can refresh this
> information at regular intervals) and match any pattern against the list
> of available topics in the driver.
>
> As you use `assignment()` and store offsets in the Spark checkpoint, it
> seems that using consumer group management is not a good fit for the use
> case.
>
> Thoughts?
>
> -Matthias
>
> On 8/12/19 8:22 AM, Colin McCabe wrote:
>
> Hi,
>
> If there's no need to consume records in the Spark driver, then the
> Consumer is probably the wrong thing to use. Instead, Spark should use
> AdminClient to find out what partitions exist and where, manage their
> offsets, and so on. There are some KIPs under discussion now that would
> add the necessary APIs for managing offsets.
>
> Best,
> Colin
>
> On Mon, Aug 12, 2019, at 07:39, Jungtaek Lim wrote:
>
> My feeling is that I didn't explain the use case for Spark properly and
> hence failed to explain the needs. Sorry about this.
>
> Spark leverages a single instance of KafkaConsumer in the driver, which
> is the only consumer registered in the consumer group. It is used in the
> plan phase of each micro-batch to calculate the overall topic-partitions
> with their offset ranges for that batch, and to split and assign
> (topicpartition, fromOffset, untilOffset) to each input partition. After
> the planning is done and tasks are distributed to executors, a consumer
> per input partition is initialized on some executor (assigned to that
> single topic-partition) and pulls the actual records. (Consumer pooling
> is applied, of course.) As the plan phase only determines the overall
> topic-partitions and offset ranges to process, Spark is never interested
> in pulling records on the driver side.
>
> Spark mainly leverages poll(0) to get the latest assigned partitions and
> adopt the changes or validate the expectation. That's not the only use
> case for poll(0). Spark also seeks the offset per topic-partition to the
> earliest, the latest, or a specific one (either provided by the end user
> or the last committed offset) so that Spark can obtain the actual offset
> or validate the provided one. According to the javadoc (if I understand
> correctly), to get the offset immediately it seems necessary to call
> `poll` or `position`.
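>
> For reference, a stripped-down sketch of the driver-side pattern
> described above (not the actual Spark code, just its shape; assumes the
> usual imports from org.apache.kafka.clients.consumer,
> org.apache.kafka.common.TopicPartition, and java.util):
>
>     // Resolve the current assignment and the latest offsets without
>     // (intentionally) consuming records, for micro-batch planning.
>     static Map<TopicPartition, Long> fetchLatestOffsets(
>             KafkaConsumer<byte[], byte[]> consumer) {
>         // The hack: forces an assignment-metadata update, but may still
>         // fetch (and here silently drop) records - hence this KIP.
>         consumer.poll(0);
>         Set<TopicPartition> partitions = consumer.assignment();
>         consumer.seekToEnd(partitions);
>         Map<TopicPartition, Long> offsets = new HashMap<>();
>         for (TopicPartition tp : partitions) {
>             // position() resolves the actual end offset after seekToEnd().
>             offsets.put(tp, consumer.position(tp));
>         }
>         return offsets;
>     }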
> The way Spark interacts with Kafka in this plan phase in the driver is
> synchronous, as the phase should finish ASAP so the next phase can run.
> Registering a ConsumerRebalanceListener and tracking the changes would
> require some asynchronous handling, which sounds like unnecessary
> complexity. Spark may be OK dealing with synchronous calls with a
> timeout (that's what the methods in KafkaConsumer have been providing -
> they're not asynchronous, at least for callers), but dealing with
> asynchrony is another level of concern. I can see the benefit where a
> thread runs continuously and the consumer is busy with something all the
> time, relying on the listener to hear the news about reassignment.
> Unfortunately, that's not our case.
>
> Unit tests in Spark have similar needs: Kafka's own test code also
> leverages `updateAssignmentMetadataIfNeeded` and `poll(0)` in many
> places where a blocking (+timeout) call is preferred - so I can see
> similar needs there as well.
>
> On Mon, Aug 12, 2019 at 4:46 PM Gabor Somogyi <gabor.g.somo...@gmail.com> wrote:
>
> Hi Guys,
>
> Please see the actual implementation; pretty sure it explains the
> situation well:
> https://github.com/apache/spark/blob/master/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala
>
> To answer one question/assumption which popped up from all of you: Spark
> not only uses KafkaConsumer#subscribe but also pattern subscribe +
> KafkaConsumer#assign. Please see here:
> https://github.com/apache/spark/blob/master/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/ConsumerStrategy.scala
>
> BR,
> G
>
> On Mon, Aug 12, 2019 at 6:38 AM Satish Duggana <satish.dugg...@gmail.com> wrote:
>
> Hi Jungtaek,
> Thanks for the KIP. I have a couple of questions here. Is Spark not
> using Kafka's consumer group management across multiple consumers?
>
> Is Spark using KafkaConsumer#subscribe(Pattern pattern,
> ConsumerRebalanceListener listener) only to get all the topics for a
> pattern-based subscription, with Spark manually assigning those
> topic-partitions across consumers on workers?
>
> Thanks,
> Satish.
>
> On Mon, Aug 12, 2019 at 4:17 AM Matthias J. Sax <matth...@confluent.io> wrote:
>
> I am not sure if I fully understand yet.
>
> The fact that Spark does not store offsets in Kafka but as part of its
> own checkpoint mechanism seems to be orthogonal. Maybe I am missing
> something here.
>
> As you are using subscribe(), you use the Kafka consumer group
> mechanism, which takes care of the assignment of partitions to clients
> within the group. Therefore, I am not sure what you mean by:
>
> > which Spark needs to know to coordinate multiple consumers to pull
> > correctly.
>
> Multiple thoughts that may help:
>
> - if Spark needs more control over the partition assignment, you can
> provide a custom `ConsumerPartitionAssignor` (via the consumer
> configuration)
>
> - you may also want to register a `ConsumerRebalanceListener` via
> `subscribe()` to get informed when the group rebalances
>
> As you pointed out, with pattern subscription the metadata can change if
> topics are added/deleted. However, each metadata change will trigger a
> rebalance, and thus you would get corresponding calls to your rebalance
> listener to learn about it and react accordingly.
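>
> For example (a minimal sketch, assuming an already-configured consumer;
> note that the callbacks only fire from within a poll() call):
>
>     // Pattern subscription with a rebalance listener: the client is
>     // informed of assignment changes instead of probing via poll(0).
>     consumer.subscribe(Pattern.compile("topic-.*"),
>             new ConsumerRebalanceListener() {
>         @Override
>         public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
>             // Invoked after a rebalance completes; the new assignment
>             // is visible here (and via consumer.assignment()).
>         }
>
>         @Override
>         public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
>             // Invoked before partitions are reassigned away.
>         }
>     });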
> Maybe you can explain why neither of the two approaches works and what
> gap the new API would close?
>
> -Matthias
>
> On 8/11/19 5:11 AM, Jungtaek Lim wrote:
>
> Let me elaborate my explanation a bit more. Here we talk about Apache
> Spark, but this applies to everything that wants to control the offsets
> of Kafka consumers.
>
> Spark manages the committed offsets and the offsets which should be
> polled next - topics and partitions as well. This is required as Spark
> has its own general checkpoint mechanism, and Kafka is just one of its
> sources/sinks (though it's considered a very important one).
>
> To pull records from Kafka, Spark tells Kafka which topics and
> partitions it wants to subscribe to (and then seeks and polls). But
> Spark can also provide "patterns" of topics, and the subscription can
> change on the Kafka side (topics added/dropped, partitions added), which
> Spark needs to know about to coordinate multiple consumers to pull
> correctly.
>
> It looks like assignment() doesn't update the assignment information in
> the consumer; it just returns the known one. There's only one known
> approach to updating it: calling `poll`. But Spark is not interested in
> the returned records, hence the need for the `poll(0)` hack - and Kafka
> has deprecated that API. This KIP proposes to support this as an
> official approach.
>
> On Sun, Aug 11, 2019 at 8:18 PM Jungtaek Lim <kabh...@gmail.com> wrote:
>
> Sorry, I didn't notice you were also asking it here as well. I'm in
> favor of describing it in this discussion thread so the discussion
> itself can move forward, so I'm copying my answer here:
>
> We have a use case where we don't just rely on everything the Kafka
> consumer provides. We want to know the current assignment on this
> consumer, and to get the latest assignment we called the `poll(0)` hack.
>
> That said, we don't want to pull any records here, and if I'm not
> missing something, there's no way to accomplish this. Please guide me if
> I'm mistaken.
>
> Thanks,
> Jungtaek Lim (HeartSaVioR)
>
> On Sat, Aug 10, 2019 at 2:11 AM Matthias J. Sax <matth...@confluent.io> wrote:
>
> Thanks for the KIP.
>
> Can you elaborate a little bit more on the use case for this feature?
> Why would a consumer need to update its metadata explicitly?
>
> -Matthias
>
> On 8/8/19 8:46 PM, Jungtaek Lim wrote:
>
> Hi devs,
>
> I'd like to initiate discussion around KIP-505, exposing a new public
> method to only update assignment metadata in the consumer.
>
> `poll(0)` has been misused, as according to the Kafka docs it doesn't
> guarantee that no records are pulled, and the new `poll(Duration)`
> doesn't have the same semantics, so I would like to propose a new public
> API which does only the desired behavior.
>
> KIP page: https://cwiki.apache.org/confluence/x/z5NiBw
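>
> Roughly, the idea is something like the following (a hypothetical
> sketch; see the KIP page above for the actual proposal - the name and
> return type here are illustrative only):
>
>     // A public variant of the consumer's internal
>     // updateAssignmentMetadataIfNeeded: update subscription/assignment
>     // metadata, guaranteed to fetch no records; false on timeout.
>     boolean updateAssignmentMetadata(Duration timeout);
>
> That would replace the poll(0) idiom: call it, then read assignment()
> and seek/position as needed.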
> Please feel free to suggest any improvements to the proposal, as I'm new
> to the Kafka community and may not know the project's preferences (like
> TimeoutException vs boolean, etc.).
>
> Thanks in advance!
> Jungtaek Lim (HeartSaVioR)
>
> --
> Name : Jungtaek Lim
> Blog : http://medium.com/@heartsavior
> Twitter : http://twitter.com/heartsavior
> LinkedIn : http://www.linkedin.com/in/heartsavior