Hi Guys,

Please see the actual implementation, pretty sure it explains the situation well:
https://github.com/apache/spark/blob/master/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala
To answer one question/assumption which popped up from all of you: Spark not only uses KafkaConsumer#subscribe, but pattern subscribe + KafkaConsumer#assign as well. Please see here:
https://github.com/apache/spark/blob/master/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/ConsumerStrategy.scala

BR,
G

On Mon, Aug 12, 2019 at 6:38 AM Satish Duggana <satish.dugg...@gmail.com> wrote:

> Hi Jungtaek,
> Thanks for the KIP. I have a couple of questions here.
> Is Spark not using Kafka's consumer group management across multiple
> consumers?
>
> Is Spark using KafkaConsumer#subscribe(Pattern pattern,
> ConsumerRebalanceListener listener) only to get all the topics for a
> pattern-based subscription, with Spark manually assigning those
> topic-partitions across consumers on workers?
>
> Thanks,
> Satish.
>
> On Mon, Aug 12, 2019 at 4:17 AM Matthias J. Sax <matth...@confluent.io> wrote:
>
> > I am not sure if I fully understand yet.
> >
> > The fact that Spark does not store offsets in Kafka but as part of its
> > own checkpoint mechanism seems to be orthogonal. Maybe I am missing
> > something here.
> >
> > As you are using subscribe(), you use the Kafka consumer group mechanism,
> > which takes care of the assignment of partitions to clients within the
> > group. Therefore, I am not sure what you mean by:
> >
> > > which Spark needs to
> > >> know to coordinate multiple consumers to pull correctly.
> >
> > Multiple thoughts that may help:
> >
> > - if Spark needs more control over the partition assignment, you can
> > provide a custom `ConsumerPartitionAssignor` (via the consumer
> > configuration)
> >
> > - you may also want to register a `ConsumerRebalanceListener` via
> > `subscribe()` to get informed when the group rebalances
> >
> > As you pointed out, with pattern subscription the metadata can change if
> > topics are added/deleted. However, each metadata change will trigger a
> > rebalance, and thus you would get corresponding calls to your rebalance
> > listener to learn about it and react accordingly.
> >
> > Maybe you can explain why neither of these approaches works and what gap
> > the new API would close?
> >
> >
> > -Matthias
> >
> > On 8/11/19 5:11 AM, Jungtaek Lim wrote:
> > > Let me elaborate on my explanation a bit more. Here we talk about Apache
> > > Spark, but this applies to anything which wants to control the offsets
> > > of Kafka consumers.
> > >
> > > Spark manages the committed offsets and the offsets which should be
> > > polled next, and the topics and partitions as well. This is required as
> > > Spark itself has its own general checkpoint mechanism and Kafka is just
> > > one of its sources/sinks (though it's considered a very important one).
> > >
> > > To pull records from Kafka, Spark tells Kafka which topics and
> > > partitions it wants to subscribe to (and then seeks and polls), but as
> > > Spark can also provide "patterns" of topics, the subscription can change
> > > on the Kafka side (topics added/dropped, partitions added), which Spark
> > > needs to know about to coordinate multiple consumers to pull correctly.
> > >
> > > It looks like assignment() doesn't update the assignment information in
> > > the consumer; it just returns the known one. There's only one known
> > > approach to doing this, calling `poll`, but Spark is not interested in
> > > the returned records, so there's a need for the hack `poll(0)`, and
> > > Kafka deprecated that API. This KIP proposes to support this as an
> > > official approach.
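For readers following the thread, here is a minimal, self-contained sketch of the `poll(0)` hack described above, using only a plain KafkaConsumer. The class name, broker address, group id, and topic pattern are placeholders for illustration, not Spark's actual code:

import java.util.Properties;
import java.util.Set;
import java.util.regex.Pattern;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class AssignmentMetadataHack {
    public static void main(String[] args) {
        // Placeholder configuration, not Spark's actual settings.
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "offset-reader");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            // Pattern subscription: the matching topic-partitions can change
            // over time as topics are created or deleted.
            consumer.subscribe(Pattern.compile("topic-.*"));

            // The hack: call the (now deprecated) poll(long) with timeout 0,
            // purely for its side effect of joining the group and refreshing
            // the assignment metadata, not for the records. Per the javadoc
            // it may still return records, which the caller has to discard.
            consumer.poll(0);

            // assignment() only reflects the result of the last poll; it does
            // not itself trigger a metadata update.
            Set<TopicPartition> assignment = consumer.assignment();
            System.out.println("Current assignment: " + assignment);
        }
    }
}

The key point is that assignment() alone never refreshes anything; only a poll() does, and the deprecated zero-timeout overload was the only way to trigger that refresh without intending to consume.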
> > >
> > > On Sun, Aug 11, 2019 at 8:18 PM Jungtaek Lim <kabh...@gmail.com> wrote:
> > >
> > >> Sorry, I didn't recognize you're also asking it here as well. I'm in favor
> > >> of describing it in this discussion thread so the discussion itself can go
> > >> forward. So copying my answer here:
> > >>
> > >> We have some use cases in which we don't just rely on everything the Kafka
> > >> consumer provides. We want to know the current assignment on the consumer,
> > >> and to get the latest assignment, we called the hack `poll(0)`.
> > >>
> > >> That said, we don't want to pull any records here, and if I'm not missing
> > >> something, there's no way to accomplish this. Please guide me if I'm
> > >> missing something.
> > >>
> > >> Thanks,
> > >> Jungtaek Lim (HeartSaVioR)
> > >>
> > >>
> > >> On Sat, Aug 10, 2019 at 2:11 AM Matthias J. Sax <matth...@confluent.io>
> > >> wrote:
> > >>
> > >>> Thanks for the KIP.
> > >>>
> > >>> Can you elaborate a little bit more on the use case for this feature?
> > >>> Why would a consumer need to update its metadata explicitly?
> > >>>
> > >>>
> > >>> -Matthias
> > >>>
> > >>> On 8/8/19 8:46 PM, Jungtaek Lim wrote:
> > >>>> Hi devs,
> > >>>>
> > >>>> I'd like to initiate discussion around KIP-505, exposing a new public
> > >>>> method to only update assignment metadata in the consumer.
> > >>>>
> > >>>> `poll(0)` has been misused: according to the Kafka docs it doesn't
> > >>>> guarantee that it won't pull any records, and the new method
> > >>>> `poll(Duration)` doesn't have the same semantics, so I would like to
> > >>>> propose a new public API which only does the desired behavior.
> > >>>>
> > >>>> KIP page: https://cwiki.apache.org/confluence/x/z5NiBw
> > >>>>
> > >>>> Please feel free to suggest any improvements on the proposal, as I'm new
> > >>>> to the Kafka community and may not catch preferences (like
> > >>>> TimeoutException vs boolean, etc.) on the Kafka project.
> > >>>>
> > >>>> Thanks in advance!
> > >>>> Jungtaek Lim (HeartSaVioR)
> > >>>
> > >>
> > >> --
> > >> Name : Jungtaek Lim
> > >> Blog : http://medium.com/@heartsavior
> > >> Twitter : http://twitter.com/heartsavior
> > >> LinkedIn : http://www.linkedin.com/in/heartsavior
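For reference, the alternative Matthias suggested earlier in the thread (registering a `ConsumerRebalanceListener` via `subscribe()`) uses only existing public API and would look roughly like the sketch below; the configuration values and topic pattern are again placeholders:

import java.time.Duration;
import java.util.Collection;
import java.util.Properties;
import java.util.regex.Pattern;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class RebalanceListenerSketch {
    public static void main(String[] args) {
        // Placeholder configuration, same caveats as the earlier sketch.
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "offset-reader");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            // Metadata changes that affect the pattern (topics added/deleted)
            // trigger a rebalance; the listener is called with the revoked and
            // the newly assigned partitions.
            consumer.subscribe(Pattern.compile("topic-.*"), new ConsumerRebalanceListener() {
                @Override
                public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                    System.out.println("Revoked: " + partitions);
                }

                @Override
                public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                    System.out.println("Assigned: " + partitions);
                }
            });

            // Caveat: the callbacks fire only from within poll(), so a poll
            // loop is still required to observe rebalances.
            while (true) {
                consumer.poll(Duration.ofSeconds(1));
            }
        }
    }
}

As the thread notes, the listener only fires from within poll(), so this still does not let a caller refresh assignment metadata without entering a poll loop and potentially receiving records, which is the gap KIP-505 aims to close.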