All:

I've been going over the new consumer APIs and it seems like we're squishing a lot of different concerns together into a single class. The scope of the new Consumer is kind of all over the place. Managing the lifecycle - and especially the thread safety - seems challenging. Specifically, Consumer seems to serve the following purposes:

* Acts as a holder of subscription info (e.g. subscribe()).
* Acts as a stream (e.g. poll(), seek()).

I definitely think we want these to be separate. It's pretty common to have a consumer process that connects to the broker and creates N consumer threads, each of which works on a single stream (which could be composed of some number of partitions). In this scenario, you *really* want to explicitly control durability (e.g. commit()s) on a per-stream basis. You also have different lifecycle semantics and thread safety concerns at the stream level versus the global level. Is there a reason the API doesn't look more like:

    // Thread safe, owns the multiplexed connection
    Consumer:
      def subscribe(topic: String, streams: Int): Set[Stream]
      def close() // Release everything

    // Not at all thread safe, no synchronization.
    Stream:
      def commit() // Really important this be here and not on Consumer.
      def seek(...)
      def poll(duration: Long, unit: TimeUnit): List[MessageThingie]
      def close() // Release these partitions
      ...

I think this also significantly reduces the complexity of the Consumer API and lets each thread in a consumer process handle stream lifecycle appropriately. Since the connection is multiplexed and things could get rebalanced, just toss an exception if a stream becomes invalid, forcing a resubscribe. That way we don't have crazy state logic.

I'm sure I'm missing something, but I wanted to toss this out there for folks to poke at.

(p.s. I *really* want per-stream commit baked into the API.)

--
E. Sammer
CTO - ScalingData
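
To make the proposed split easier to poke at, here is a minimal in-memory sketch of it. Everything in it is an assumption for illustration, not an existing Kafka API: the class is called MessageStream (to avoid clashing with Scala's own Stream), topics are plain in-memory vectors, "partitions" are dealt out round-robin, and rebalance() and committedOffset are hypothetical helpers added so that invalidation and per-stream commit can be observed.

```scala
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicBoolean

// All names below are hypothetical; this is a toy model, not a Kafka API.
final class StreamInvalidatedException(msg: String) extends RuntimeException(msg)

final case class Message(offset: Long, payload: String)

// Not thread safe: each stream is meant to be owned by exactly one thread.
// Only the validity flag is atomic, because the Consumer flips it from outside.
final class MessageStream(val id: Int, log: Vector[String]) {
  private val valid = new AtomicBoolean(true)
  private var position = 0L  // next offset this stream will read
  private var committed = 0L // durability point for this stream only

  private def ensureValid(): Unit =
    if (!valid.get) throw new StreamInvalidatedException(s"stream $id is invalid; resubscribe")

  def poll(duration: Long, unit: TimeUnit): List[Message] = {
    ensureValid()
    // The toy log is already in memory, so the timeout is ignored here.
    val start = position
    val batch = log.drop(start.toInt).zipWithIndex.map { case (p, i) => Message(start + i, p) }.toList
    position = start + batch.size
    batch
  }

  def seek(offset: Long): Unit = {
    ensureValid()
    require(offset >= 0 && offset <= log.size, s"offset $offset out of range")
    position = offset
  }

  // Per-stream commit: records this stream's position and touches no other stream.
  def commit(): Unit = { ensureValid(); committed = position }

  def committedOffset: Long = committed

  def close(): Unit = valid.set(false) // release this stream's partitions

  // Called by the owning Consumer on rebalance or close.
  def invalidate(): Unit = valid.set(false)
}

// Thread safe: stands in for the owner of the multiplexed connection.
final class Consumer(topics: Map[String, Vector[String]]) {
  private var live = Set.empty[MessageStream]

  def subscribe(topic: String, streams: Int): Set[MessageStream] = synchronized {
    require(streams > 0, "need at least one stream")
    val log = topics.getOrElse(topic, Vector.empty)
    // Deal messages out round-robin as a stand-in for partition assignment.
    val created = (0 until streams).map { i =>
      new MessageStream(i, log.zipWithIndex.collect { case (m, idx) if idx % streams == i => m })
    }.toSet
    live ++= created
    created
  }

  // Simulates a rebalance: every outstanding stream becomes invalid.
  def rebalance(): Unit = synchronized {
    live.foreach(_.invalidate())
    live = Set.empty
  }

  def close(): Unit = rebalance() // release everything
}
```

In this sketch each worker thread would take one MessageStream from subscribe(), loop on poll() and commit(), and treat StreamInvalidatedException as the signal to go back to the Consumer and resubscribe. The only state shared across threads is the validity flag, which is what keeps the stream itself free of synchronization.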