The log end offset (of a partition) changes when messages are appended
to the partition; committing does not move it. (It is not correlated with
the consumer's offset.)
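
To make that concrete, here is a toy sketch in Python. It is not the Kafka
client API: PartitionLog, fetch and snapshot are made-up names for
illustration, and a plain list stands in for one partition. It shows that
appends move the log end offset while reads do not, and it also sketches the
"repeat the check until you are truly done" loop discussed in the quoted
messages below.

```python
class PartitionLog:
    """Toy in-memory stand-in for a single partition (hypothetical, not Kafka)."""

    def __init__(self):
        self.records = []  # list of (key, value); the list index is the offset

    def append(self, key, value):
        self.records.append((key, value))
        return len(self.records) - 1  # offset assigned to the new record

    @property
    def log_end_offset(self):
        # Offset that the next appended record will receive.
        return len(self.records)

    def fetch(self, offset, max_records=2):
        # Reading never changes the log end offset.
        return self.records[offset:offset + max_records]


def snapshot(log, on_batch=None):
    """Read from offset 0 until the reader position equals the log end offset,
    re-checking the end offset after every batch."""
    state = {}
    position = 0
    while position < log.log_end_offset:  # end offset is re-read on each pass
        batch = log.fetch(position)
        for key, value in batch:
            state[key] = value  # keep the latest value per key
        position += len(batch)
        if on_batch is not None:
            on_batch(position)  # hook used here to simulate a concurrent writer
    return state, position


log = PartitionLog()
log.append("a", 1)
log.append("b", 1)
end_before_read = log.log_end_offset
log.fetch(0)                      # consuming does not move the end offset
end_after_read = log.log_end_offset
log.append("a", 2)                # appending does
end_after_append = log.log_end_offset
```

With a steady stream of writes this loop only terminates once the reader
outpaces the writers, which is the concern raised in the thread about
100-150 writes per second.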


On Thu, Feb 19, 2015 at 08:58:10PM +0000, Will Funnell wrote:
> So at what point does the log end offset change? When you commit?
> 
> On 19 February 2015 at 18:47, Joel Koshy <jjkosh...@gmail.com> wrote:
> 
> > > If I consumed up to the log end offset and log compaction happens in
> > > between, I would have missed some messages.
> >
> > Compaction actually only runs on the rolled over segments (not the
> > active - i.e., latest segment). The log-end-offset will be in the
> > latest segment which does not participate in compaction.
> >
> > > > The log end offset is just the end of the committed messages in the log
> > > > (the last thing the consumer has access to). It isn't the same as the
> > > > cleaner point but is always later than it so it would work just as well.
> > >
> > > Isn't this just roughly the same value as using c.getOffsetsBefore() with a
> > > partitionRequestTime of -1?
> > >
> > >
> > > Although it's always later than the cleaner point, surely log compaction is
> > > still an issue here.
> > >
> > > If I consumed up to the log end offset and log compaction happens in
> > > between, I would have missed some messages.
> > >
> > >
> > > My thinking was that if you knew the log cleaner point, you could:
> > >
> > > 1. Make a note of the starting offset
> > > 2. Consume till end of log
> > > 3. Check my starting point is ahead of current cleaner point, otherwise loop.
> > >
> > >
> > > I appreciate there is a chance I misunderstood your point.
> > >
> > > On 19 February 2015 at 18:02, Jay Kreps <jay.kr...@gmail.com> wrote:
> > >
> > > > The log end offset is just the end of the committed messages in the log
> > > > (the last thing the consumer has access to). It isn't the same as the
> > > > cleaner point but is always later than it so it would work just as well.
> > > >
> > > > -Jay
> > > >
> > > > On Thu, Feb 19, 2015 at 8:54 AM, Will Funnell <w.f.funn...@gmail.com>
> > > > wrote:
> > > >
> > > > > > I'm not sure if I misunderstood Jay's suggestion, but I think it is
> > > > > > along the lines of: we expose the log-end-offset (actually the high
> > > > > > watermark) of the partition in the fetch response. However, this is
> > > > > > not exposed to the consumer (either in the new ConsumerRecord class
> > > > > > or the existing MessageAndMetadata class). If we did, then if you
> > > > > > were to consume a record you can check that it has offsets up to the
> > > > > > log-end offset. If it does then you would know for sure that you have
> > > > > > consumed everything for that partition
> > > > >
> > > > > To confirm then, the log-end-offset is the same as the cleaner point?
> > > > >
> > > > >
> > > > >
> > > > > On 19 February 2015 at 03:10, Jay Kreps <jay.kr...@gmail.com> wrote:
> > > > >
> > > > > > Yeah I was thinking either along the lines Joel was suggesting or else
> > > > > > adding a logEndOffset(TopicPartition) method or something like that. As
> > > > > > Joel says, the consumer actually has this information internally (we
> > > > > > return it with the fetch response) but doesn't expose it.
> > > > > >
> > > > > > -Jay
> > > > > >
> > > > > > On Wed, Feb 18, 2015 at 4:51 PM, Joel Koshy <jjkosh...@gmail.com> wrote:
> > > > > >
> > > > > > > > > 2. Make the log end offset available more easily in the consumer.
> > > > > > > >
> > > > > > > > Was thinking something would need to be added in LogCleanerManager, in
> > > > > > > > the updateCheckpoints function. Where would be best to publish the
> > > > > > > > information to make it more easily available, or would you just expose
> > > > > > > > the cleaner-offset-checkpoint file as it is?
> > > > > > > > Is it right you would also need to know which cleaner-offset-checkpoint
> > > > > > > > entry related to each active partition?
> > > > > > >
> > > > > > > I'm not sure if I misunderstood Jay's suggestion, but I think it is
> > > > > > > along the lines of: we expose the log-end-offset (actually the high
> > > > > > > watermark) of the partition in the fetch response. However, this is
> > > > > > > not exposed to the consumer (either in the new ConsumerRecord class
> > > > > > > or the existing MessageAndMetadata class). If we did, then if you
> > > > > > > were to consume a record you can check that it has offsets up to the
> > > > > > > log-end offset. If it does then you would know for sure that you have
> > > > > > > consumed everything for that partition.
> > > > > > >
> > > > > > > > Yes, was looking at this initially, but as we have 100-150 writes per
> > > > > > > > second, it could be a while before there is a pause long enough to
> > > > > > > > check it has caught up. Even with the consumer timeout set to -1, it
> > > > > > > > takes some time to query the max offset values, which is still long
> > > > > > > > enough for more messages to arrive.
> > > > > > >
> > > > > > > Got it - thanks for clarifying.
> > > > > > >
> > > > > > > >
> > > > > > > >
> > > > > > > >
> > > > > > > > On 18 February 2015 at 23:16, Joel Koshy <jjkosh...@gmail.com> wrote:
> > > > > > > >
> > > > > > > > > > You are also correct and perceptive to notice that if you check the
> > > > > > > > > > end of the log then begin consuming and read up to that point
> > > > > > > > > > compaction may have already kicked in (if the reading takes a while)
> > > > > > > > > > and hence you might have an incomplete snapshot.
> > > > > > > > >
> > > > > > > > > Isn't it sufficient to just repeat the check at the end after reading
> > > > > > > > > the log and repeat until you are truly done? At least for the purposes
> > > > > > > > > of a snapshot?
> > > > > > > > >
> > > > > > > > > On Wed, Feb 18, 2015 at 02:21:49PM -0800, Jay Kreps wrote:
> > > > > > > > > > If you catch up off a compacted topic and keep consuming then you will
> > > > > > > > > > become consistent with the log.
> > > > > > > > > >
> > > > > > > > > > I think what you are saying is that you want to create a snapshot from
> > > > > > > > > > the Kafka topic but NOT do continual reads after that point. For
> > > > > > > > > > example you might be creating a backup of the data to a file.
> > > > > > > > > >
> > > > > > > > > > I agree that this isn't as easy as it could be. As you say the only
> > > > > > > > > > solution we have is that timeout, which doesn't differentiate between a
> > > > > > > > > > GC stall in your process and no more messages left, so you would need
> > > > > > > > > > to tune the timeout. This is admittedly kind of a hack.
> > > > > > > > > >
> > > > > > > > > > You are also correct and perceptive to notice that if you check the
> > > > > > > > > > end of the log then begin consuming and read up to that point
> > > > > > > > > > compaction may have already kicked in (if the reading takes a while)
> > > > > > > > > > and hence you might have an incomplete snapshot.
> > > > > > > > > >
> > > > > > > > > > I think there are two features we could add that would make this
> > > > > > > > > > easier:
> > > > > > > > > > 1. Make the cleaner point configurable on a per-topic basis. This
> > > > > > > > > > feature would allow you to control how long the full log is retained
> > > > > > > > > > and when compaction can kick in. This would give a configurable SLA
> > > > > > > > > > for the reader process to catch up.
> > > > > > > > > > 2. Make the log end offset available more easily in the consumer.
> > > > > > > > > >
> > > > > > > > > > -Jay
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > On Wed, Feb 18, 2015 at 10:18 AM, Will Funnell <w.f.funn...@gmail.com>
> > > > > > > > > > wrote:
> > > > > > > > > >
> > > > > > > > > > > We are currently using Kafka 0.8.1.1 with log compaction in order to
> > > > > > > > > > > provide streams of messages to our clients.
> > > > > > > > > > >
> > > > > > > > > > > As well as constantly consuming the stream, one of our use cases is to
> > > > > > > > > > > provide a snapshot, meaning the user will receive a copy of every
> > > > > > > > > > > message at least once.
> > > > > > > > > > >
> > > > > > > > > > > Each one of these messages represents an item of content in our
> > > > > > > > > > > system.
> > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > > The problem comes when determining if the client has actually reached
> > > > > > > > > > > the end of the topic.
> > > > > > > > > > >
> > > > > > > > > > > The standard Kafka way of dealing with this seems to be by using a
> > > > > > > > > > > ConsumerTimeoutException, but we are frequently getting this error
> > > > > > > > > > > when the end of the topic has not been reached, or it may even take a
> > > > > > > > > > > long time before a timeout naturally occurs.
> > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > > On first glance it would seem possible to do a lookup for the max
> > > > > > > > > > > offset for each partition when you begin consuming, stopping when this
> > > > > > > > > > > position is reached.
> > > > > > > > > > >
> > > > > > > > > > > But log compaction means that if an update to a piece of content
> > > > > > > > > > > arrives with the same message key, then this will be written to the
> > > > > > > > > > > end so the snapshot will be incomplete.
> > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > > Another thought is to make use of the cleaner point. Currently Kafka
> > > > > > > > > > > writes out to a "cleaner-offset-checkpoint" file in each data
> > > > > > > > > > > directory which is written to after log compaction completes.
> > > > > > > > > > >
> > > > > > > > > > > If the consumer was able to access the cleaner-offset-checkpoint you
> > > > > > > > > > > would be able to consume up to this point, check the point was still
> > > > > > > > > > > the same, and compaction had not yet occurred, and therefore determine
> > > > > > > > > > > you had received everything at least once. (Assuming there was no race
> > > > > > > > > > > condition between compaction and writing to the file)
> > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > > Has anybody got any thoughts?
> > > > > > > > > > >
> > > > > > > > > > > Will
> > > > > > > > > > >
> > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > > >
> > > > > > > > --
> > > > > > > > Will Funnell
> > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > > >
> > > > >
> > > > > --
> > > > > Will Funnell
> > > > >
> > > >
> > >
> > >
> > >
> > > --
> > > Will Funnell
> >
> >
> 
> 
> -- 
> Will Funnell
