Inspired by the `TestUtils` that Luke referred to, I wrote a similar utility function and put together a minimal example [1] that IMHO shows that the `poll` method behaves rather nondeterministically, or at least I cannot explain the observed behavior.
In the example, I always wait a total of 200ms between two subsequent polls (e.g., 150ms during the poll and 50ms in the loop). I get the correct result when I use a poll timeout of at least 100ms. However, when I poll with `Duration.ZERO` (and therefore sleep in the loop for the full 200ms), I get zero results. Shouldn't it be completely irrelevant where I wait?

I also cannot get the solution suggested by Paul to work: requesting `endOffsets` requires a `Collection<TopicPartition>`, which I try to acquire through the assignment of my consumer. Unfortunately, `con.assignment()` is not available right away; the assignment is only triggered by the first poll...

The whole interaction with the consumer would get so much easier if the API gave us either

a) a blocking "wait for subscription/assignment/rebalancing/<whatever else is to be done>" method, or
b) not just a listener for when these tasks are completed, but also a notification when they are started or need to be performed, so we can do the blocking ourselves until we get the completion signal.

best
Sebastian

[1] https://pastebin.com/kmE0FJMy

On Tue, Jan 25, 2022 at 8:27 AM Luke Chen <show...@gmail.com> wrote:
>
> Yes, there is no such "synchronous subscribe" method in Kafka.
> Consumer group will start to work after `poll` is called.
>
> Inside Kafka, there are util methods like `awaitAssignment` or
> `awaitRebalance` to wait for the initialization completed for tests, which
> is checking with `poll`
> You can refer to this test to learn how Kafka write tests internally:
> https://github.com/apache/kafka/blob/trunk/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
>
> For the feature request, I think it will be a big change if we want to
> start the rebalance and other metadata update during `subscribe`.
> So, it might need a more solid motivation for this change.
> Welcome to propose this feature with KIP process:
> https://cwiki.apache.org/confluence/display/kafka/kafka+improvement+proposals
>
> Thank you.
> Luke
>
> On Tue, Jan 25, 2022 at 12:27 PM Paul Whalen <pgwha...@gmail.com> wrote:
>
> > Faced with the same problem, we've done the following:
> > - Find and save the latest offset(s) using consumer.endOffsets()
> > - consumer.poll() and process records in a while loop until you've
> > processed up through the saved offsets
> >
> > Notice that it doesn't matter how long the poll duration is: you've set
> > your watermark and will read until you get there, no matter how long it
> > takes. And you know you're going to get there eventually (assuming the
> > cluster is healthy), because you know the offsets exist.
> >
> > Hope that helps!
> >
> > Paul
> >
> > On Mon, Jan 24, 2022 at 10:12 PM Sebastian Proksch <sebast...@proks.ch>
> > wrote:
> >
> > > Hey all! I am actually not sure whether this is a bug report or a
> > > feature request, maybe you can help me understand whether I am missing
> > > the obvious.
> > >
> > > I am using Apache Kafka in one of my Java projects (using
> > > apache-kafka-clients-3.1.0). In an integration test, I would like to
> > > check that (after running some producers) a bunch of topics have the
> > > expected contents. As such, the whole pipeline is filled and "standing
> > > still" and I am connecting a single new consumer that alone forms a
> > > new consumer group. I would like to iterate over all topics
> > > (`KafkaConsumer.listTopics()`) and one by one consume all messages of
> > > every topic.
> > >
> > > As far as I understand, a subscription on a new topic will initialize
> > > a new consumer group and it takes a bit of time to join the group and
> > > to rebalance the partitions. However, I would expect that this logic
> > > is blocking and I find the following behavior unexpected...
> > >
> > > var con = ... // get consumer instance
> > > con.subscribe(Set.of(SOME_TOPIC));
> > > var records = con.poll(Duration.ZERO)
> > >
> > > The poll will return an empty records array, even though there *IS*
> > > data in said topic.
> > > Even when I register a `ConsumerRebalanceListener`
> > > in the subscribe call, I won't ever see any assignment of
> > > `TopicPartition` to the consumer, not even a delayed one.
> > >
> > > On the other hand, when I change the code to
> > >
> > > var con = ... // get consumer instance
> > > con.subscribe(Set.of(SOME_TOPIC));
> > > var records = con.poll(Duration.ofMillis(100));
> > >
> > > I now get actual records. Also, when I register a
> > > `ConsumerRebalanceListener`, I receive an `onPartitionsAssigned`
> > > notification.
> > >
> > > I really dislike the idea of introducing magic numbers like the 100ms
> > > to my tests... if the number is too small and the assignment takes
> > > longer for larger topics, my tests will break. Too large numbers will
> > > slow the tests down unnecessarily. The functionality that I am
> > > actually looking for is a synchronous version of the `subscribe`
> > > method or some other way to block execution until my client has
> > > finished joining the group and the partitions are rebalanced. It feels
> > > like this should be default behavior.
> > >
> > > Am I completely off with my expectations for the behavior of the
> > > `subscribe` method or am I missing something? Is there a way to
> > > achieve said behavior with the current clients? Maybe my code just
> > > lacks the right config parameter...
> > >
> > > Thanks for your help, any pointer is appreciated!
> > >
> > > best
> > > Sebastian
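
For reference, here is a minimal sketch of the "keep polling until a condition holds" idea discussed in this thread. It is neither the code from [1] nor Kafka's internal `awaitAssignment`; the names `PollUntil` and `pollUntil` are hypothetical, and the consumer is replaced by a simple counter so that the snippet runs without a broker or the Kafka client on the classpath.

```java
import java.time.Duration;
import java.util.function.BooleanSupplier;

public class PollUntil {

    /**
     * Runs pollOnce repeatedly until done reports true or the timeout elapses.
     * Returns true if the condition was met, false if the helper gave up.
     */
    public static boolean pollUntil(Runnable pollOnce, BooleanSupplier done, Duration timeout) {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (!done.getAsBoolean()) {
            if (System.nanoTime() - deadline >= 0) {
                return false; // condition not met within the timeout
            }
            pollOnce.run();
        }
        return true;
    }

    public static void main(String[] args) {
        // Stand-in for a consumer whose assignment only shows up on the third poll.
        int[] polls = {0};
        boolean assigned = pollUntil(
                () -> polls[0]++,        // would be a consumer poll in a real test
                () -> polls[0] >= 3,     // would be "assignment is non-empty"
                Duration.ofSeconds(5));
        System.out.println("assigned=" + assigned + " after " + polls[0] + " polls");
    }
}
```

With a real consumer, the call would presumably look like `pollUntil(() -> con.poll(Duration.ofMillis(100)), () -> !con.assignment().isEmpty(), Duration.ofSeconds(10))`. Note that any records returned by `poll` while waiting are discarded in this form, so a test would either need to collect them or seek back to the beginning afterwards. The timeout is only an upper bound for the failure case, so it does not slow down the test when the assignment arrives quickly.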