Hi David,

Thanks for the summary. It seems to me that the Flow-like option is best
because it can cleanly handle cancellation and exceptions, return the topic
and partition information progressively, and signal when the last of the
results has been returned. I think it's equally applicable to any of the
other KafkaAdminClient methods which could benefit from returning results
progressively from the broker, such as describing all of the consumer groups
on a massive cluster. That would of course be another KIP :)
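To make that concrete, here's a minimal sketch of what consuming progressive results through the Flow API could feel like: a subscriber that receives topic descriptions one at a time and cancels once it has seen enough. Everything in it is illustrative only — `FlowSketch`, `TopicSubscriber`, the `String` stand-in for a real `TopicDescription`, and the `SubmissionPublisher` driving it are assumptions for the sketch, not proposed KafkaAdminClient API.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

public class FlowSketch {

    /** Collects up to {@code limit} items, then cancels the subscription. */
    static final class TopicSubscriber implements Flow.Subscriber<String> {
        private final int limit;
        private Flow.Subscription subscription;
        final List<String> received = new CopyOnWriteArrayList<>();
        final CountDownLatch done = new CountDownLatch(1);

        TopicSubscriber(int limit) { this.limit = limit; }

        @Override public void onSubscribe(Flow.Subscription s) {
            subscription = s;
            s.request(1);                 // back-pressure: pull one result at a time
        }
        @Override public void onNext(String topic) {
            received.add(topic);
            if (received.size() >= limit) {
                subscription.cancel();    // cancellation comes with the interface
                done.countDown();
            } else {
                subscription.request(1);
            }
        }
        @Override public void onError(Throwable t) { done.countDown(); }
        @Override public void onComplete()         { done.countDown(); } // end of results
    }

    /** Stand-in for the client: streams the given "descriptions" to a subscriber. */
    static List<String> describe(List<String> topics, int limit) throws InterruptedException {
        TopicSubscriber sub = new TopicSubscriber(limit);
        try (SubmissionPublisher<String> pub = new SubmissionPublisher<>()) {
            pub.subscribe(sub);
            topics.forEach(pub::submit);
        }                                 // close() delivers onComplete
        sub.done.await();
        return sub.received;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(describe(List.of("t1", "t2", "t3"), 2));
    }
}
```

Note how cancellation (from inside the callback or from a cached `Subscription`), error signalling, and end-of-stream all fall out of the one interface, which is the property I like about this option.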
Thanks,
Andrew

> On 28 Feb 2024, at 22:30, David Arthur <david.art...@confluent.io.INVALID> wrote:
>
> Andrew/Jose, I like the suggested Flow API. It's also similar to the stream
> observers in gRPC. I'm not sure we should expose something as complex as
> the Flow API directly in KafkaAdminClient, but certainly we can provide a
> similar interface.
>
> ---
> Cancellations:
>
> Another thing not yet discussed is how to cancel in-flight requests. For
> other calls in KafkaAdminClient, we use KafkaFuture, which has a "cancel"
> method. With the callback approach, we need to be able to cancel the
> request from within the callback as well as externally. Looking to the Flow
> API again for inspiration, we could have the admin client pass an object to
> the callback which can be used for cancellation. In the simple case, users
> can ignore this object. In the advanced case, they can create a concrete
> class for the callback and cache the cancellation object so it can be
> accessed externally. This would be similar to the Subscription in the Flow
> API.
>
> ---
> Topics / Partitions:
>
> For the case of topic descriptions, we actually have two data types
> interleaved in one stream (topics and partitions). This means if we go with
> TopicDescription in the "onNext" method, we will have a partial set of
> topics in some cases. Also, we will end up calling "onNext" more than once
> per RPC in the case that a single RPC response spans multiple topics.
>
> One alternative to a single "onNext" would be an interface more tailored to
> the RPC, like:
>
> interface DescribeTopicsStreamObserver {
>     // Called for each topic in the result stream.
>     void onTopic(TopicInfo topic);
>
>     // Called for each partition of the topic last handled by onTopic.
>     void onPartition(TopicPartitionInfo partition);
>
>     // Called once the broker has finished streaming results to the admin
>     // client. This marks the end of the stream.
>     void onComplete();
>
>     // Called if an error occurs on the underlying stream. This marks the
>     // end of the stream.
>     void onError(Throwable t);
> }
>
> ---
> Consumer API:
>
> Offline, there was some discussion about using a simple SAM consumer-like
> interface:
>
> interface AdminResultsConsumer<T> {
>     void onNext(T next, Throwable t);
> }
>
> This has the benefit of being quite simple and letting callers supply a
> lambda instead of a full anonymous class definition. This would use
> nullable arguments like CompletableFuture#whenComplete. We could also use
> an Optional pattern here instead of nullables.
>
> ---
> Summary:
>
> So far, it seems like we are looking at these different options. The main
> difference in terms of API design is whether the user will need to implement
> more than one method, or whether a lambda can suffice.
>
> 1. Generic, Flow-like interface: AdminResultsSubscriber
> 2. DescribeTopicsStreamObserver (in this message above)
> 3. AdminResultsConsumer
> 4. AdminResultsConsumer with an Optional-like type instead of nullable
>    arguments
>
> -David
>
> On Fri, Feb 23, 2024 at 4:00 PM José Armando García Sancio
> <jsan...@confluent.io.invalid> wrote:
>
>> Hi Calvin
>>
>> On Fri, Feb 23, 2024 at 9:23 AM Calvin Liu <ca...@confluent.io.invalid>
>> wrote:
>>> As we agreed to implement the pagination for the new API
>>> DescribeTopicPartitions, the client side must also add a proper interface
>>> to handle the pagination.
>>> The current KafkaAdminClient.describeTopics returns
>>> the DescribeTopicsResult which is the future for querying all the topics.
>>> It is awkward to fit the pagination into it because
>>
>> I suggest taking a look at Java's Flow API:
>>
>> https://docs.oracle.com/en/java/javase/21/docs/api/java.base/java/util/concurrent/Flow.html
>>
>> It was designed for this specific use case and many libraries integrate
>> with it.
>> If the Kafka client cannot be upgraded to Java 9, which
>> introduced that API, you can copy the same interfaces and semantics.
>> This would allow users to easily integrate with reactive libraries,
>> since they all integrate with Java Flow.
>>
>> Thanks,
>> --
>> -José
>
> --
> -David
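For reference, the fallback José mentions — copying the Flow interfaces when Java 9 can't be assumed — could look something like the sketch below. The interface names and method signatures mirror `java.util.concurrent.Flow` so that an adapter to the real API stays trivial; `MiniFlow`, `fromList`, and `sumDemo` are illustrative names for this sketch, not existing Kafka code.

```java
import java.util.List;

/**
 * Minimal copy of the java.util.concurrent.Flow interfaces, for clients
 * that cannot assume Java 9+. Same method names and semantics, so an
 * adapter to the real Flow API stays trivial.
 */
public final class MiniFlow {
    private MiniFlow() {}

    public interface Publisher<T> { void subscribe(Subscriber<? super T> subscriber); }

    public interface Subscriber<T> {
        void onSubscribe(Subscription subscription);
        void onNext(T item);
        void onError(Throwable throwable);
        void onComplete();
    }

    public interface Subscription {
        void request(long n);   // signal demand for n more items
        void cancel();          // stop receiving items
    }

    // Toy publisher that pushes a fixed list and completes; it ignores
    // demand for brevity, which a real implementation must not do.
    public static Publisher<Integer> fromList(List<Integer> items) {
        return subscriber -> {
            subscriber.onSubscribe(new Subscription() {
                public void request(long n) {}
                public void cancel() {}
            });
            items.forEach(subscriber::onNext);
            subscriber.onComplete();
        };
    }

    // Small consumer of the toy publisher, used to exercise the interfaces.
    public static int sumDemo(List<Integer> items) {
        int[] total = {0};
        fromList(items).subscribe(new Subscriber<Integer>() {
            public void onSubscribe(Subscription s) { s.request(Long.MAX_VALUE); }
            public void onNext(Integer item) { total[0] += item; }
            public void onError(Throwable t) {}
            public void onComplete() {}
        });
        return total[0];
    }

    public static void main(String[] args) {
        System.out.println(sumDemo(List.of(1, 2, 3)));
    }
}
```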