Hey Eric,

Yeah, this is more similar to what we currently have, but with a richer API than a simple Iterator.
I think the question is how the poll() on the various streams translates into the ultimate poll that we need to do against the individual socket connections. Some of the things that make the current API not ideal are the following:

1. Currently we have fetcher threads that do network I/O and then blocking queues that feed iterators (the equivalent of the Stream in your proposal). However, it is a bit unfortunate to have the client dictate a particular threading model. One thing we liked about the current proposal was that it is all one thread: it only does I/O when you call poll(). If the user then wants to feed these into queues that go to thread pools, that is fine but not required. In a sense the client is a lower-level building block--I think there are actually a couple of higher-level APIs that could be built on top of this, too (such as the Java 8 Stream stuff, or something that manages a thread pool of processors for you). We thought about some of these higher-level APIs, and though they are definitely more convenient for certain uses, they are not as general (see the rough sketch below).

2. There are also a ton of gotchas in terms of cleanly reassigning partitions. You need some "safe point" at which the user isn't processing any more. In the current API, when the user calls poll() the meaning of that call is that all previous messages have been processed. Hence you can commit offsets (if using autocommit) or reassign partitions transparently. This gets a bit more complex if there are many poll()s on different time windows (though it is perhaps still possible).

3. We would have to figure out how unioning two streams would work. You need to have a way of polling over all the streams for things that consume multiple inputs.

Not sure how much of that makes sense...
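To make (1) and the safe-point idea in (2) concrete, here is a rough sketch of the kind of loop I mean. Consumer, Record, and process() are minimal stand-ins so it compiles, not the actual proposed API:

    // Sketch only: Consumer and Record are hypothetical stand-ins,
    // not the real client.
    import java.util.concurrent.{Callable, Executors}
    import scala.jdk.CollectionConverters._

    case class Record(topic: String, value: String)

    class Consumer {
      def subscribe(topic: String): Unit = ()
      // In a real client, all network I/O would happen here, on the
      // calling thread -- no background fetcher threads.
      def poll(timeoutMs: Long): Seq[Record] = Seq.empty
    }

    object SingleThreadedLoop {
      def process(r: Record): Unit = println(r.value)

      def main(args: Array[String]): Unit = {
        val consumer = new Consumer
        consumer.subscribe("events")

        // Optional: a pool is the user's choice; the client never
        // requires one.
        val pool = Executors.newFixedThreadPool(4)

        while (true) {
          val records = consumer.poll(100L)

          // Returning to poll() is the "safe point": it signals that
          // the previous batch is fully processed, so offsets can be
          // committed (under autocommit) or partitions reassigned
          // transparently. Hence we wait for the whole batch before
          // polling again.
          val tasks = records.map { r =>
            new Callable[Unit] { def call(): Unit = process(r) }
          }
          pool.invokeAll(tasks.asJava) // blocks until the batch is done
        }
      }
    }

The pool is entirely the user's affair; dropping it and calling process(r) inline works just as well, which is what makes the single-threaded client a general building block.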
-Jay

On Fri, May 9, 2014 at 1:19 PM, Eric Sammer <esam...@scalingdata.com> wrote:
> All:
>
> I've been going over the new consumer APIs and it seems like we're
> squishing a lot of different concerns together into a single class. The
> scope of the new Consumer is kind of all over the place. Managing the
> lifecycle - and especially the thread safety - seems challenging.
> Specifically, Consumer seems to serve the following purposes:
>
> * Acts as a holder of subscription info (e.g. subscribe()).
> * Acts as a stream (e.g. poll(), seek()).
>
> I definitely think we want these to be separate. It's pretty common to
> have a consumer process that connects to the broker, creates N consumer
> threads, each of which works on a single stream (which could be composed
> of some number of partitions). In this scenario, you *really* want to
> explicitly control durability (e.g. commit()s) on a per-stream basis. You
> also have different lifecycle semantics and thread safety concerns at the
> stream level versus the global level. Is there a reason the API doesn't
> look more like:
>
> // Thread safe, owns the multiplexed connection
> Consumer:
>   def subscribe(topic: String, streams: Int): Set[Stream]
>   def close() // Release everything
>
> // Not at all thread safe, no synchronization.
> Stream:
>   def commit() // Really important this be here and not on Consumer.
>   def seek(...)
>   def poll(duration: Long, unit: TimeUnit): List[MessageThingie]
>   def close() // Release these partitions
>   ...
>
> I think this also significantly reduces the complexity of the Consumer
> API and lets each thread in a consumer process handle stream lifecycle
> appropriately. Since the connection is multiplexed and things could get
> rebalanced, just toss an exception if the streams become invalid, forcing
> a resubscribe. That way we don't have crazy state logic.
>
> I'm sure I'm missing something, but I wanted to toss this out there for
> folks to poke at.
>
> (p.s. I *really* want per-stream commit baked into the API.)
>
> --
> E. Sammer
> CTO - ScalingData
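For reference, the invalidate-and-resubscribe flow Eric describes might look roughly like this from a single consuming thread's perspective. The types echo his sketch above; StreamInvalidException and all the stub bodies are placeholders, not a real API:

    // Placeholder types echoing the Consumer/Stream sketch in the
    // quoted mail; stubbed so this compiles.
    import java.util.concurrent.TimeUnit

    case class MessageThingie(value: String)
    class StreamInvalidException extends RuntimeException

    class Stream {
      def commit(): Unit = ()                  // per-stream durability
      // Would throw StreamInvalidException if a rebalance made this
      // stream stale.
      def poll(duration: Long, unit: TimeUnit): List[MessageThingie] = Nil
      def close(): Unit = ()
    }

    class Consumer {
      def subscribe(topic: String, streams: Int): Set[Stream] =
        Set(new Stream)
      def close(): Unit = ()
    }

    object StreamWorker {
      // One non-thread-safe Stream per consuming thread. If a rebalance
      // invalidates the stream, poll() throws and we simply resubscribe
      // rather than trying to repair state in place.
      def run(consumer: Consumer, topic: String): Unit = {
        var stream = consumer.subscribe(topic, 1).head
        while (true) {
          try {
            val messages = stream.poll(100, TimeUnit.MILLISECONDS)
            messages.foreach(m => println(m.value))
            stream.commit() // commit on this stream only
          } catch {
            case _: StreamInvalidException =>
              stream.close()
              stream = consumer.subscribe(topic, 1).head
          }
        }
      }
    }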