All:

I've been going over the new consumer APIs and it seems like we're
squishing a lot of different concerns together into a single class. The
scope of the new Consumer is kind of all over the place. Managing the
lifecycle - and especially the thread safety - seems challenging.
Specifically, Consumer seems to serve the following purposes:
* Acts as a holder of subscription info (e.g. subscribe()).
* Acts as a stream (e.g. poll(), seek()).

I definitely think we want these to be separate. It's pretty common to have
a consumer process that connects to the broker and creates N consumer
threads, each of which works on a single stream (which could be composed of some
number of partitions). In this scenario, you *really* want to explicitly
control durability (e.g. commit()s) on a per-stream basis. You also have
different lifecycle semantics and thread safety concerns at the stream
level versus the global level. Is there a reason the API doesn't look more
like:

// Thread safe, owns the multiplexed connection
Consumer:
  def subscribe(topic: String, streams: Int): Set[Stream]
  def close() // Release everything

// Not at all thread safe, no synchronization.
Stream:
  def commit() // Really important this be here and not on Consumer.
  def seek(...)
  def poll(duration: Long, unit: TimeUnit): List[MessageThingie]
  def close() // Release these partitions
  ...

I think this also significantly reduces the complexity of the Consumer API
and lets each thread in a consumer process handle stream lifecycle
appropriately. Since the connection is multiplexed and things could get
rebalanced, just toss an exception if the streams become invalid, forcing a
resubscribe. That way we don't have crazy state logic.
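
To make the per-thread lifecycle concrete, here is a rough sketch of what one
worker's loop might look like under the proposed split. Everything in it is
hypothetical: Message stands in for MessageThingie,
StreamInvalidatedException is a name I made up for "the streams became
invalid", and Outcome, drain and maxPolls exist only for illustration. None of
this is an existing Kafka API.

```scala
import java.util.concurrent.TimeUnit

object StreamSketch {
  // Hypothetical stand-in for MessageThingie.
  final case class Message(partition: Int, offset: Long, payload: String)

  // Assumed name: thrown by a Stream once a rebalance has invalidated it.
  final class StreamInvalidatedException(msg: String) extends RuntimeException(msg)

  // Not thread safe: exactly one owner thread per stream.
  trait Stream {
    def poll(duration: Long, unit: TimeUnit): List[Message]
    def commit(): Unit
    def close(): Unit
  }

  // What the worker reports back to whoever owns the Consumer.
  sealed trait Outcome
  case object Finished extends Outcome
  case object NeedsResubscribe extends Outcome

  // Polls up to maxPolls times, commits on this stream only after a
  // non-empty batch has been handled, and always releases the stream.
  def drain(stream: Stream, handle: Message => Unit, maxPolls: Int): Outcome =
    try {
      var polls = 0
      while (polls < maxPolls) {
        val batch = stream.poll(100L, TimeUnit.MILLISECONDS)
        batch.foreach(handle)
        if (batch.nonEmpty) stream.commit() // per-stream durability
        polls += 1
      }
      Finished
    } catch {
      // No recovery logic here: just signal that a resubscribe is needed.
      case _: StreamInvalidatedException => NeedsResubscribe
    } finally stream.close()
}
```

Each thread would call drain on its own Stream, and only the code that owns
the (thread safe) Consumer would react to NeedsResubscribe by calling
subscribe() again, so no synchronization is needed at the stream level.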

I'm sure I'm missing something, but I wanted to toss this out there for
folks to poke at.
(p.s. I *really* want per-stream commit baked into the API.)
-- 
E. Sammer
CTO - ScalingData
