Hi Jungtaek,
                   Have you looked into this interface
https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/MetadataUpdater.java
.
Right now its not a public interface but does the methods available in this
interface work for your needs? . The DefaultMeatadataUpdater responsible
for making the metadata requests to brokers
https://github.com/apache/kafka/blob/26814e060e98f9674127be13a28ce41a21ca6b3c/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java#L958
and
if it can be invoked from client methods does that solve your requirements?

Thanks,
Harsha


On Mon, Aug 12, 2019 at 2:53 PM, Jungtaek Lim <kabh...@gmail.com> wrote:

> Thanks for the feedbacks Colin and Matthias.
>
> I agree with you regarding getting topics and partitions via AdminClient,
> just curious how much the overhead would be. Would it be lighter, or
> heavier? We may not want to list topics in regular intervals - in plan
> phase we want to know up-to-date information so that the calculation from
> Spark itself makes sense.
>
> On the other hands I'm not seeing any information regarding offset in
> current AdminClient, which is also one of reason we leverage consumer and
> call poll(0). Colin, as you mentioned there're KIPs addressing this, could
> you refer KIPs so that we can see whether it would work for our case?
> Without support of this we cannot replace our usage of consumer/poll with
> AdminClient.
>
> ps. IMHO it seems to be helpful if there's overloaded `listTopics` which
> receives regex same as consumer subscription via pattern. We would like to
> provide same behavior what Kafka is basically providing as a source.
>
> On Tue, Aug 13, 2019 at 1:03 AM Matthias J. Sax <matth...@confluent.io>
> wrote:
>
> Thanks for the details Jungtaek!
>
> I tend to agree with Colin, that using the AdminClient seems to be the
> better choice.
>
> You can get all topics via `listTopics()` (and you can refresh this
> information on regular intervals) and match any pattern against the list of
> available topics in the driver.
>
> As you use `assignment()` and store offsets in the Spark checkpoint, it
> seems that using consumer group management is not a good fit for the use
> case.
>
> Thoughts?
>
> -Matthias
>
> On 8/12/19 8:22 AM, Colin McCabe wrote:
>
> Hi,
>
> If there’s no need to consume records in the Spark driver, then the
>
> Consumer is probably the wrong thing to use. Instead, Spark should use
> AdminClient to find out what partitions exist and where, manage their
> offsets, and so on. There are some KIPs under discussion now that would add
> the necessary APIs for managing offsets.
>
> Best,
> Colin
>
> On Mon, Aug 12, 2019, at 07:39, Jungtaek Lim wrote:
>
> My feeling is that I didn't explain the use case for Spark properly and
> hence fail to explain the needs. Sorry about this.
>
> Spark leverages the single instance of KafkaConsumer in the driver
>
> which is
>
> registered solely on the consumer group. This is used in the plan phase
>
> for
>
> each micro-batch to calculate the overall topicpartitions with its
>
> offset
>
> ranges for this batch, and split and assign (topicpartition, fromOffset,
> untilOffset) to each input partition. After the planning is done and
>
> tasks
>
> are being distributed to executors, consumer per each input partition
>
> will
>
> be initialized from some executor (being assigned to the single
> topicpartition), and pull the actual records. (Pooling consumers is
>
> applied
>
> for sure.) As plan phase is to determine the overall topicpartitions and
> offset ranges to process, Spark is never interested on pulling the
>
> records
>
> in driver side.
>
> Spark mainly leverages poll(0) to get the latest assigned partitions and
> adopt the changes or validate the expectation. That's not only use case
>
> for
>
> poll(0). Spark is also seeking the offset per topicpartition to the
> earliest or the latest, or specific one (either provided by end user or
>
> the
>
> last committed offset) so that Spark can have actual offset or validate
>
> the
>
> provided offset. According to the javadoc (if I understand correctly),
>
> to
>
> get the offset immediately it seems to be required to call `poll` or
> `position`.
>
> The way Spark interacts with Kafka in this plan phase in driver is
> synchronous, as the phase should finish ASAP to run the next phase.
> Registering ConsumerRebalanceListener and tracking the change will
>
> require
>
> some asynchronous handling which sounds to add unnecessary complexity.
> Spark may be OK with deal with synchronous with timeout (that's what
> methods in KafkaConsumer have been providing - they're not
>
> asynchronous, at
>
> least for callers) but dealing with asynchronous is another level of
> interest. I can see the benefit where continuous thread runs and the
> consumer is busy with something continuously, relying on listener to
>
> hear
>
> the news on reassignment. Unfortunately that's not the case.
>
> Unit tests in Spark have similar needs: looks like Kafka test code also
> leverages `updateAssignmentMetadataIfNeeded` and `poll(0)` in many
>
> places
>
> as it's appropriate to the place which blocking (+timeout) call is
> preferred - so I can see the similar needs from here as well.
>
> On Mon, Aug 12, 2019 at 4:46 PM Gabor Somogyi <
>
> gabor.g.somo...@gmail.com>
>
> wrote:
>
> Hi Guys,
>
> Please see the actual implementation, pretty sure it explains the
>
> situation
>
> well:
>
> https://github.com/apache/spark/blob/master/external/kafka-0-10-sql/src/
> main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala
>
> To answer one question/assumption which popped up from all of you
>
> Spark not
>
> only uses KafkaConsumer#subscribe but pattern subscribe +
> KafkaConsumer#assign as well.
> Please see here:
>
> https://github.com/apache/spark/blob/master/external/kafka-0-10-sql/src/
> main/scala/org/apache/spark/sql/kafka010/ConsumerStrategy.scala
>
> BR,
> G
>
> On Mon, Aug 12, 2019 at 6:38 AM Satish Duggana <
>
> satish.dugg...@gmail.com>
>
> wrote:
>
> Hi Jungtaek,
> Thanks for the KIP. I have a couple of questions here. Is not Spark using
> Kafka's consumer group management across multiple consumers?
>
> Is Spark using KafkaConsumer#subscribe(Pattern pattern,
> ConsumerRebalanceListener listener) only to get all the topics for a
> pattern based subscription and Spark manually assigns those
> topic-partitions across consumers on workers?
>
> Thanks,
> Satish.
>
> On Mon, Aug 12, 2019 at 4:17 AM Matthias J. Sax <
>
> matth...@confluent.io>
>
> wrote:
>
> If am not sure if I fully understand yet.
>
> The fact, that Spark does not stores offsets in Kafka but as part of
>
> its
>
> own checkpoint mechanism seems to be orthogonal. Maybe I am missing
> something here.
>
> As you are using subscribe(), you use Kafka consumer group mechanism, that
> takes care of the assignment of partitions to clients within the group.
> Therefore, I am not sure what you mean by:
>
> which Spark needs to
>
> know to coordinate multiple consumers to pull correctly.
>
> Multiple thoughts that may help:
>
> - if Spark needs more control about the partition assignment, you can
> provide a custom `ConsumerPartitionAssignor` (via the consumer
> configuration)
>
> - you may also want to register `ConsumerRebalanceListener` via
> `subscribe()` to get informed when the group rebalances
>
> As you pointed out, using pattern subscription metadata can change if
> topic are added/deleted. However, each metadata change will
>
> triggering
>
> a
>
> rebalance and thus you would get corresponding calls to you rebalance
> listener to learn about it and react accordingly.
>
> Maybe you can explain why neither of both approaches works and what
>
> gap
>
> the new API would close?
>
> -Matthias
>
> On 8/11/19 5:11 AM, Jungtaek Lim wrote:
>
> Let me elaborate my explanation a bit more. Here we say about Apache
>
> Spark,
>
> but this will apply for everything which want to control offset of
>
> Kafka
>
> consumers.
>
> Spark is managing the committed offsets and the offsets which should
>
> be
>
> polled now. Topics and partitions as well. This is required as Spark
>
> itself
>
> has its own general checkpoint mechanism and Kafka is just a one of
> source/sink (though it's considered as very important).
>
> To pull records from Kafka, Spark provides to Kafka which topics and
> partitions it wants to subscribe(, and do seek and poll), but as
>
> Spark
>
> can
>
> also provide "patterns" of topics, as well as subscription can be
>
> changed
>
> in Kafka side (topic added/dropped, partitions added) which Spark
>
> needs
>
> to
>
> know to coordinate multiple consumers to pull correctly.
>
> Looks like assignment() doesn't update the assignment information in
> consumer. It just returns known one. There's only one known approach
>
> doing
>
> this, calling `poll`, but Spark is not interested on returned
>
> records,
>
> so
>
> there's a need for a hack `poll(0)`, and Kafka deprecated the API.
>
> This
>
> KIP
>
> proposes to support this as official approach.
>
> On Sun, Aug 11, 2019 at 8:18 PM Jungtaek Lim <kabh...@gmail.com>
>
> wrote:
>
> Sorry I didn't recognize you're also asking it here as well. I'm in
>
> favor
>
> of describing it in this discussion thread so the discussion itself
>
> can
>
> go
>
> forward. So copying my answer here:
>
> We have some use case which we don't just rely on everything what
>
> Kafka
>
> consumer provides. We want to know current assignment on this
>
> consumer,
>
> and
>
> to get the latest assignment, we called the hack `poll(0)`.
>
> That said, we don't want to pull any records here, and if I'm not
>
> missing
>
> here, there's no way to accomplish this. Please guide me if I'm
>
> missing
>
> something.
>
> Thanks,
> Jungtaek Lim (HeartSaVioR)
>
> On Sat, Aug 10, 2019 at 2:11 AM Matthias J. Sax <
>
> matth...@confluent.io>
>
> wrote:
>
> Thanks for the KIP.
>
> Can you elaborate a little bit more on the use case for this
>
> feature?
>
> Why would a consumer need to update it's metadata explicitly?
>
> -Matthias
>
> On 8/8/19 8:46 PM, Jungtaek Lim wrote:
>
> Hi devs,
>
> I'd like to initiate discussion around KIP-505, exposing new
>
> public
>
> method
>
> to only update assignment metadata in consumer.
>
> `poll(0)` has been misused as according to Kafka doc it doesn't
>
> guarantee
>
> that it doesn't pull any records, and new method `poll(Duration)`
>
> doesn't
>
> have same semantic, so would like to propose new public API which
>
> only
>
> does
>
> the desired behavior.
>
> KIP page: https://cwiki.apache.org/confluence/x/z5NiBw
>
> Please feel free to suggest any improvements on proposal, as I'm
>
> new
>
> to
>
> Kafka community and may not catch preferences (like
>
> TimeoutException
>
> vs
>
> boolean, etc.) on Kafka project.
>
> Thanks in advance!
> Jungtaek Lim (HeartSaVioR)
>
> --
> Name : Jungtaek Lim
> Blog : http://medium.com/@heartsavior
> Twitter : http://twitter.com/heartsavior
> LinkedIn : http://www.linkedin.com/in/heartsavior
>
> --
> Name : Jungtaek Lim
> Blog : http://medium.com/@heartsavior
> Twitter : http://twitter.com/heartsavior
> LinkedIn : http://www.linkedin.com/in/heartsavior
>
> --
> Name : Jungtaek Lim
> Blog : http://medium.com/@heartsavior
> Twitter : http://twitter.com/heartsavior
> LinkedIn : http://www.linkedin.com/in/heartsavior
>

Reply via email to