This looks great. What is the time frame for this effort?

Jason
On Wed, Oct 16, 2013 at 2:19 PM, Joel Koshy <jjkosh...@gmail.com> wrote:

> Btw, after we complete KAFKA-1000 (offset management in Kafka) it
> should be reasonable to commit offsets on every message as long as the
> optional metadata portion of the offset commit request is small/empty.
>
> Thanks,
>
> Joel
>
> On Wed, Oct 16, 2013 at 10:35 AM, Jason Rosenberg <j...@squareup.com> wrote:
> > That would be great. Additionally, in the new API, it would be awesome
> > to augment the default auto-commit functionality to allow client code
> > to mark a message for commit only after processing a message
> > successfully!
> >
> > On Wed, Oct 16, 2013 at 7:52 AM, Jun Rao <jun...@gmail.com> wrote:
> >> For manual offset commits, it will be useful to have some kind of API
> >> that informs the client when a rebalance is going to happen. We can
> >> think about this when we do the client rewrite.
> >>
> >> Thanks,
> >>
> >> Jun
> >>
> >> On Tue, Oct 15, 2013 at 9:21 PM, Jason Rosenberg <j...@squareup.com> wrote:
> >> > Jun,
> >> >
> >> > Yes, sorry, I think that was the basis for my question. When auto
> >> > commit is enabled, special care is taken to make sure things are
> >> > auto-committed during a rebalance. This is needed because when a
> >> > topic moves off of a consumer thread (since it is being rebalanced
> >> > to another one), it's as if that topic is being shut down on that
> >> > connector, and any not-yet-committed messages need to be committed
> >> > before letting go of the topic.
> >> >
> >> > So, my question is around trying to understand if there's a way I
> >> > can reproduce similar functionality using my own sync auto commit
> >> > implementation (and I'm not sure there is). It seems that when
> >> > there's a rebalance, all processed but not-yet-committed offsets
> >> > will not be committed, and thus there will be no way to prevent
> >> > pretty massive duplicate consumption on a rebalance. Is that about
> >> > right? Or is there some way around this that I'm not seeing?
> >> >
> >> > The built-in auto-commit functionality is so close to being all
> >> > that anyone would need, except it has a glaring weakness, in that
> >> > it will cause messages to be lost from time to time, and so I don't
> >> > know that it will meet the needs of trying to have reliable
> >> > delivery (with duplicates ok).
> >> >
> >> > Jason
> >> >
> >> > On Tue, Oct 15, 2013 at 9:00 PM, Jun Rao <jun...@gmail.com> wrote:
> >> > > If auto commit is disabled, the consumer connector won't call
> >> > > commitOffsets during rebalancing.
> >> > >
> >> > > Thanks,
> >> > >
> >> > > Jun
> >> > >
> >> > > On Tue, Oct 15, 2013 at 4:16 PM, Jason Rosenberg <j...@squareup.com> wrote:
> >> > > > I'm looking at implementing a synchronous auto offset commit
> >> > > > solution. People have discussed the need for this in previous
> >> > > > threads. Basically, in my consumer loop, I want to make sure a
> >> > > > message has actually been processed before allowing its offset
> >> > > > to be committed. But I don't want to commit on every message,
> >> > > > since that would be too expensive. So, I want to use
> >> > > > 'auto.commit.interval.ms' to periodically call commitOffsets,
> >> > > > but only after a message is processed, and not after the next
> >> > > > message has been issued via a call to 'next()' on the
> >> > > > ConsumerIterator.
> >> > > >
> >> > > > The built-in 'auto.commit.enable' feature unfortunately allows
> >> > > > commits to happen on any message that has been returned via
> >> > > > ConsumerIterator.next(). But if the consumer goes down before
> >> > > > actually processing the message, or if it hangs indefinitely
> >> > > > for some reason, then this message will get committed before it
> >> > > > has actually been consumed successfully.
> >> > > >
> >> > > > I think there are issues with trying to implement this on top
> >> > > > of the high-level consumer API. First, I need to worry about
> >> > > > multiple threads consuming in the same connector (so for now
> >> > > > I'm limiting this to support only 1 thread).
> >> > > >
> >> > > > Also, when shutting down the connector, I need to make sure any
> >> > > > pending messages are committed before allowing the connector to
> >> > > > shut down. So, that seems easy enough to handle.
> >> > > >
> >> > > > One thing I'm more concerned with is what happens when there's
> >> > > > a consumer rebalance. Looking at the ZookeeperConsumerConnector
> >> > > > code, it seems there are explicit calls to commitOffsets during
> >> > > > the rebalance. I'm not sure how to handle that from the
> >> > > > high-level API (and do I need to worry about that?).
> >> > > >
> >> > > > Thanks for any insight.
> >> > > >
> >> > > > Jason
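
The commit-after-processing loop described in the original message can be sketched roughly as follows. This is a minimal, language-neutral illustration in Python, not Kafka client code: `ProcessedOffsetCommitter`, `consume`, `commit_fn`, and `interval_s` are all hypothetical names, with `commit_fn` standing in for whatever commit call the client exposes and `interval_s` playing the role of 'auto.commit.interval.ms'. It assumes a single consuming thread, as in the thread, and does nothing about rebalances.

```python
import time


class ProcessedOffsetCommitter:
    """Commits only offsets that were fully processed, at most once per interval."""

    def __init__(self, commit_fn, interval_s, clock=time.monotonic):
        self._commit_fn = commit_fn    # hypothetical stand-in for the client's commit call
        self._interval_s = interval_s  # plays the role of auto.commit.interval.ms
        self._clock = clock            # injectable so the timing can be tested
        self._last_commit = clock()
        self._processed = None         # latest offset whose processing finished
        self._committed = None         # latest offset already committed

    def mark_processed(self, offset):
        """Record a finished message; commit if the interval has elapsed."""
        self._processed = offset
        if self._clock() - self._last_commit >= self._interval_s:
            self.flush()

    def flush(self):
        """Commit the latest processed offset, if it is not committed yet."""
        if self._processed is not None and self._processed != self._committed:
            self._commit_fn(self._processed)
            self._committed = self._processed
        self._last_commit = self._clock()


def consume(messages, process, committer):
    """Single-threaded loop: process, then mark, then fetch the next message."""
    try:
        for offset, payload in messages:
            process(payload)                  # may raise or hang
            committer.mark_processed(offset)  # runs before the next message is fetched
    finally:
        committer.flush()                     # shutdown path: commit what was processed
```

Because `mark_processed` runs after `process` returns and before the loop asks for the next message, a commit can never cover a message that was handed out but not yet processed. The `finally` block corresponds to the shutdown case mentioned above; a rebalance would still need a hook from the client, which this sketch does not model.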