Hi Jungtaek,

Thanks for the KIP. I have a couple of questions here.

Isn't Spark using Kafka's consumer group management across multiple consumers?
Is Spark using KafkaConsumer#subscribe(Pattern pattern,
ConsumerRebalanceListener listener) only to get all the topics for a
pattern-based subscription, while Spark manually assigns those
topic-partitions across consumers on workers?

Thanks,
Satish.

On Mon, Aug 12, 2019 at 4:17 AM Matthias J. Sax <matth...@confluent.io> wrote:
> I am not sure I fully understand yet.
>
> The fact that Spark does not store offsets in Kafka but as part of its
> own checkpoint mechanism seems to be orthogonal. Maybe I am missing
> something here.
>
> As you are using subscribe(), you use the Kafka consumer group
> mechanism, which takes care of the assignment of partitions to clients
> within the group. Therefore, I am not sure what you mean by:
>
> > which Spark needs to
> > know to coordinate multiple consumers to pull correctly.
>
> Multiple thoughts that may help:
>
> - If Spark needs more control over the partition assignment, you can
>   provide a custom `ConsumerPartitionAssignor` (via the consumer
>   configuration).
>
> - You may also want to register a `ConsumerRebalanceListener` via
>   `subscribe()` to get informed when the group rebalances.
>
> As you pointed out, with pattern subscription the metadata can change
> if topics are added/deleted. However, each metadata change will trigger
> a rebalance, and thus you would get corresponding calls to your
> rebalance listener to learn about it and react accordingly.
>
> Maybe you can explain why neither of these approaches works and what
> gap the new API would close?
>
>
> -Matthias
>
> On 8/11/19 5:11 AM, Jungtaek Lim wrote:
> > Let me elaborate my explanation a bit more. Here we talk about Apache
> > Spark, but this applies to anything that wants to control the offsets
> > of Kafka consumers.
> >
> > Spark manages the committed offsets and the offsets which should be
> > polled next, as well as the topics and partitions.
> > This is required as Spark itself has its own general checkpoint
> > mechanism, and Kafka is just one of its sources/sinks (though it's
> > considered a very important one).
> >
> > To pull records from Kafka, Spark tells Kafka which topics and
> > partitions it wants to subscribe to (and then does seek and poll).
> > But Spark can also provide "patterns" of topics, and the subscription
> > can change on the Kafka side (topics added/dropped, partitions added),
> > which Spark needs to know about to coordinate multiple consumers to
> > pull correctly.
> >
> > It looks like assignment() doesn't update the assignment information
> > in the consumer; it just returns the known one. There's only one known
> > approach to doing this, calling `poll`, but Spark is not interested in
> > the returned records, so there's a need for the hack `poll(0)`, and
> > Kafka deprecated that API. This KIP proposes to support this as an
> > official approach.
> >
> >
> > On Sun, Aug 11, 2019 at 8:18 PM Jungtaek Lim <kabh...@gmail.com> wrote:
> >
> >> Sorry, I didn't recognize you were also asking it here as well. I'm
> >> in favor of describing it in this discussion thread so the discussion
> >> itself can go forward. So, copying my answer here:
> >>
> >> We have some use cases where we don't just rely on everything the
> >> Kafka consumer provides. We want to know the current assignment on
> >> this consumer, and to get the latest assignment, we called the hack
> >> `poll(0)`.
> >>
> >> That said, we don't want to pull any records here, and if I'm not
> >> missing something, there's no way to accomplish this. Please guide me
> >> if I'm missing something.
> >>
> >> Thanks,
> >> Jungtaek Lim (HeartSaVioR)
> >>
> >>
> >>
> >> On Sat, Aug 10, 2019 at 2:11 AM Matthias J. Sax <matth...@confluent.io>
> >> wrote:
> >>
> >>> Thanks for the KIP.
> >>>
> >>> Can you elaborate a little bit more on the use case for this feature?
> >>> Why would a consumer need to update its metadata explicitly?
> >>>
> >>>
> >>> -Matthias
> >>>
> >>> On 8/8/19 8:46 PM, Jungtaek Lim wrote:
> >>>> Hi devs,
> >>>>
> >>>> I'd like to initiate a discussion around KIP-505, exposing a new
> >>>> public method to only update assignment metadata in the consumer.
> >>>>
> >>>> `poll(0)` has been misused, as according to the Kafka docs it
> >>>> doesn't guarantee that it won't pull any records, and the new method
> >>>> `poll(Duration)` doesn't have the same semantics, so I would like to
> >>>> propose a new public API which only does the desired behavior.
> >>>>
> >>>> KIP page: https://cwiki.apache.org/confluence/x/z5NiBw
> >>>>
> >>>> Please feel free to suggest any improvements to the proposal, as I'm
> >>>> new to the Kafka community and may not catch preferences (like
> >>>> TimeoutException vs boolean, etc.) on the Kafka project.
> >>>>
> >>>> Thanks in advance!
> >>>> Jungtaek Lim (HeartSaVioR)
> >>>>
> >>>
> >>>
> >>
> >> --
> >> Name : Jungtaek Lim
> >> Blog : http://medium.com/@heartsavior
> >> Twitter : http://twitter.com/heartsavior
> >> LinkedIn : http://www.linkedin.com/in/heartsavior
> >>
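[Editor's note] For readers following the thread: the approach Matthias suggests — registering a ConsumerRebalanceListener via subscribe() so an external offset manager (like Spark's checkpointing) can track the current assignment — can be sketched as below. This is a hedged, self-contained illustration, not Spark's actual implementation: `TrackingListener` and `RebalanceSketch` are hypothetical names, and minimal stand-ins for `TopicPartition` and `ConsumerRebalanceListener` are inlined so the snippet compiles without the kafka-clients dependency or a broker. Against a real cluster you would use the classes from `org.apache.kafka.clients.consumer` and pass the listener to `KafkaConsumer#subscribe(Pattern, ConsumerRebalanceListener)`.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

// Stand-in for org.apache.kafka.common.TopicPartition, inlined so the
// sketch is self-contained.
record TopicPartition(String topic, int partition) {}

// Stand-in for org.apache.kafka.clients.consumer.ConsumerRebalanceListener:
// the consumer invokes these callbacks during a rebalance.
interface ConsumerRebalanceListener {
    void onPartitionsRevoked(Collection<TopicPartition> partitions);
    void onPartitionsAssigned(Collection<TopicPartition> partitions);
}

// A listener that maintains its own view of the current assignment, which
// is the piece of information an external offset manager needs.
class TrackingListener implements ConsumerRebalanceListener {
    private final List<TopicPartition> current = new ArrayList<>();

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        current.removeAll(partitions); // e.g. checkpoint offsets here
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        current.addAll(partitions);    // e.g. seek to checkpointed offsets here
    }

    public List<TopicPartition> currentAssignment() {
        return List.copyOf(current);
    }
}

public class RebalanceSketch {
    public static void main(String[] args) {
        TrackingListener listener = new TrackingListener();
        // With the real client this would be:
        //   consumer.subscribe(Pattern.compile("events-.*"), listener);
        // Here one rebalance cycle is simulated by hand:
        listener.onPartitionsAssigned(List.of(
                new TopicPartition("events-a", 0),
                new TopicPartition("events-a", 1)));
        listener.onPartitionsRevoked(List.of(new TopicPartition("events-a", 1)));
        System.out.println(listener.currentAssignment());
    }
}
```

The caveat Jungtaek raises still applies: the callbacks only fire once the consumer participates in a rebalance, which is driven by polling — which is exactly why the hack `poll(0)` was needed and why the KIP proposes a dedicated metadata-update method.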