Yeah I was thinking either along the lines Joel was suggesting or else
adding a logEndOffset(TopicPartition) method or something like that. As
Joel says the consumer actually has this information internally (we return
it with the fetch request) but doesn't expose it.
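
A minimal sketch of how such an accessor could be used for a one-off
snapshot, combined with the repeat-the-check idea from further down the
thread. Everything here is hypothetical: PartitionReader, logEndOffset()
and read() are illustrative names, not existing Kafka APIs, and the sketch
handles a single partition instead of taking a TopicPartition argument.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch only: none of these types are real Kafka classes. */
final class SnapshotSketch {

    /** Assumed minimal view of one partition, as seen by a consumer. */
    interface PartitionReader {
        /** Offset the next appended message would receive (assumed accessor). */
        long logEndOffset();

        /** Messages with offsets in [from, to); a compacted log may skip some. */
        List<String> read(long from, long to);
    }

    /** Reads until the position matches the log end offset at check time. */
    static List<String> snapshot(PartitionReader reader) {
        List<String> out = new ArrayList<>();
        long position = 0L;
        long end = reader.logEndOffset();
        while (position < end) {
            out.addAll(reader.read(position, end));
            position = end;
            // Re-check: more messages may have arrived while we were reading.
            end = reader.logEndOffset();
        }
        return out;
    }
}
```

The loop stops only when a check finds nothing new, so under a constant
write rate it relies on the check being cheap, which is the point of
exposing the value that already comes back with the fetch.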

-Jay

On Wed, Feb 18, 2015 at 4:51 PM, Joel Koshy <jjkosh...@gmail.com> wrote:

> > > 2. Make the log end offset available more easily in the consumer.
> >
> > Was thinking something would need to be added in LogCleanerManager, in
> > the updateCheckpoints function. Where would be best to publish the
> > information to make it more easily available, or would you just expose
> > the cleaner-offset-checkpoint file as it is?
> > Is it right that you would also need to know which
> > cleaner-offset-checkpoint entry relates to each active partition?
>
> I'm not sure if I misunderstood Jay's suggestion, but I think it is
> along the lines of: we expose the log-end-offset (actually the high
> watermark) of the partition in the fetch response. However, this is
> not exposed to the consumer (either in the new ConsumerRecord class
> or the existing MessageAndMetadata class). If we did, then when you
> consume a record you could check whether you have read all offsets up
> to the log-end offset. If you have, then you would know for sure that
> you have consumed everything for that partition.
>
> > Yes, was looking at this initially, but as we have 100-150 writes per
> > second, it could be a while before there is a pause long enough to
> > check it has caught up. Even with the consumer timeout set to -1, it
> > takes some time to query the max offset values, which is still long
> > enough for more messages to arrive.
>
> Got it - thanks for clarifying.
>
> >
> >
> >
> > On 18 February 2015 at 23:16, Joel Koshy <jjkosh...@gmail.com> wrote:
> >
> > > > You are also correct and perceptive to notice that if you check the
> > > > end of the log then begin consuming and read up to that point
> > > > compaction may have already kicked in (if the reading takes a while)
> > > > and hence you might have an incomplete snapshot.
> > >
> > > Isn't it sufficient to just repeat the check at the end after reading
> > > the log and repeat until you are truly done? At least for the purposes
> > > of a snapshot?
> > >
> > > On Wed, Feb 18, 2015 at 02:21:49PM -0800, Jay Kreps wrote:
> > > > If you catch up off a compacted topic and keep consuming then you
> > > > will become consistent with the log.
> > > >
> > > > I think what you are saying is that you want to create a snapshot
> > > > from the Kafka topic but NOT do continual reads after that point. For
> > > > example you might be creating a backup of the data to a file.
> > > >
> > > > I agree that this isn't as easy as it could be. As you say the only
> > > > solution we have is that timeout, which doesn't differentiate between
> > > > a GC stall in your process and no more messages left, so you would
> > > > need to tune the timeout. This is admittedly kind of a hack.
> > > >
> > > > You are also correct and perceptive to notice that if you check the
> > > > end of the log then begin consuming and read up to that point
> > > > compaction may have already kicked in (if the reading takes a while)
> > > > and hence you might have an incomplete snapshot.
> > > >
> > > > I think there are two features we could add that would make this
> > > > easier:
> > > > 1. Make the cleaner point configurable on a per-topic basis. This
> > > > feature would allow you to control how long the full log is retained
> > > > and when compaction can kick in. This would give a configurable SLA
> > > > for the reader process to catch up.
> > > > 2. Make the log end offset available more easily in the consumer.
> > > >
> > > > -Jay
> > > >
> > > >
> > > >
> > > > On Wed, Feb 18, 2015 at 10:18 AM, Will Funnell <w.f.funn...@gmail.com>
> > > > wrote:
> > > >
> > > > > We are currently using Kafka 0.8.1.1 with log compaction in order
> > > > > to provide streams of messages to our clients.
> > > > >
> > > > > As well as constantly consuming the stream, one of our use cases
> > > > > is to provide a snapshot, meaning the user will receive a copy of
> > > > > every message at least once.
> > > > >
> > > > > Each one of these messages represents an item of content in our
> > > > > system.
> > > > >
> > > > >
> > > > > The problem comes when determining if the client has actually
> > > > > reached the end of the topic.
> > > > >
> > > > > The standard Kafka way of dealing with this seems to be by using a
> > > > > ConsumerTimeoutException, but we are frequently getting this error
> > > > > when the end of the topic has not been reached, or it may even take
> > > > > a long time before a timeout naturally occurs.
> > > > >
> > > > >
> > > > > At first glance it would seem possible to do a lookup for the max
> > > > > offset for each partition when you begin consuming, stopping when
> > > > > this position is reached.
> > > > >
> > > > > But log compaction means that if an update to a piece of content
> > > > > arrives with the same message key, then this will be written to the
> > > > > end so the snapshot will be incomplete.
> > > > >
> > > > >
> > > > > Another thought is to make use of the cleaner point. Currently
> > > > > Kafka writes out to a "cleaner-offset-checkpoint" file in each data
> > > > > directory which is written to after log compaction completes.
> > > > >
> > > > > If the consumer were able to access the cleaner-offset-checkpoint
> > > > > you would be able to consume up to this point, check that the point
> > > > > was still the same and that compaction had not yet occurred, and
> > > > > therefore determine that you had received everything at least once.
> > > > > (Assuming there was no race condition between compaction and
> > > > > writing to the file.)
> > > > >
> > > > >
> > > > > Has anybody got any thoughts?
> > > > >
> > > > > Will
> > > > >
> > >
> > >
> >
> >
> > --
> > Will Funnell
>
>
