So at what point does the log end offset change? When you commit?

On 19 February 2015 at 18:47, Joel Koshy <jjkosh...@gmail.com> wrote:
> > If I consumed up to the log end offset and log compaction happens in between, I would have missed some messages.
>
> Compaction actually only runs on the rolled over segments (not the active - i.e., latest segment). The log-end-offset will be in the latest segment which does not participate in compaction.
>
> > > The log end offset is just the end of the committed messages in the log (the last thing the consumer has access to). It isn't the same as the cleaner point but is always later than it so it would work just as well.
> >
> > Isn't this just roughly the same value as using c.getOffsetsBefore() with a partitionRequestTime of -1?
> >
> > Although it's always later than the cleaner point, surely log compaction is still an issue here.
> >
> > If I consumed up to the log end offset and log compaction happens in between, I would have missed some messages.
> >
> > My thinking was that if you knew the log cleaner point, you could:
> >
> > Make a note of the starting offset
> > Consume till end of log
> > Check my starting point is ahead of current cleaner point, otherwise loop.
> >
> > I appreciate there is a chance I misunderstood your point.
> >
> > On 19 February 2015 at 18:02, Jay Kreps <jay.kr...@gmail.com> wrote:
> >
> > > The log end offset is just the end of the committed messages in the log (the last thing the consumer has access to). It isn't the same as the cleaner point but is always later than it so it would work just as well.
> > >
> > > -Jay
> > >
> > > On Thu, Feb 19, 2015 at 8:54 AM, Will Funnell <w.f.funn...@gmail.com> wrote:
> > >
> > > > > I'm not sure if I misunderstood Jay's suggestion, but I think it is along the lines of: we expose the log-end-offset (actually the high watermark) of the partition in the fetch response. However, this is not exposed to the consumer (either in the new ConsumerRecord class or the existing MessageAndMetadata class). If we did, then if you were to consume a record you can check that it has offsets up to the log-end offset. If it does then you would know for sure that you have consumed everything for that partition
> > > >
> > > > To confirm then, the log-end-offset is the same as the cleaner point?
> > > >
> > > > On 19 February 2015 at 03:10, Jay Kreps <jay.kr...@gmail.com> wrote:
> > > >
> > > > > Yeah I was thinking either along the lines Joel was suggesting or else adding a logEndOffset(TopicPartition) method or something like that. As Joel says the consumer actually has this information internally (we return it with the fetch response) but doesn't expose it.
> > > > >
> > > > > -Jay
> > > > >
> > > > > On Wed, Feb 18, 2015 at 4:51 PM, Joel Koshy <jjkosh...@gmail.com> wrote:
> > > > >
> > > > > > > 2. Make the log end offset available more easily in the consumer.
> > > > > > >
> > > > > > > Was thinking something would need to be added in LogCleanerManager, in the updateCheckpoints function. Where would be best to publish the information to make it more easily available, or would you just expose the cleaner-offset-checkpoint file as it is?
> > > > > > > Is it right you would also need to know which cleaner-offset-checkpoint entry related to each active partition?
> > > > > >
> > > > > > I'm not sure if I misunderstood Jay's suggestion, but I think it is along the lines of: we expose the log-end-offset (actually the high watermark) of the partition in the fetch response. However, this is not exposed to the consumer (either in the new ConsumerRecord class or the existing MessageAndMetadata class). If we did, then if you were to consume a record you can check that it has offsets up to the log-end offset. If it does then you would know for sure that you have consumed everything for that partition.
> > > > > >
> > > > > > > Yes, was looking at this initially, but as we have 100-150 writes per second, it could be a while before there is a pause long enough to check it has caught up. Even with the consumer timeout set to -1, it takes some time to query the max offset values, which is still long enough for more messages to arrive.
> > > > > >
> > > > > > Got it - thanks for clarifying.
> > > > > >
> > > > > > > On 18 February 2015 at 23:16, Joel Koshy <jjkosh...@gmail.com> wrote:
> > > > > > >
> > > > > > > > > You are also correct and perceptive to notice that if you check the end of the log then begin consuming and read up to that point compaction may have already kicked in (if the reading takes a while) and hence you might have an incomplete snapshot.
> > > > > > > >
> > > > > > > > Isn't it sufficient to just repeat the check at the end after reading the log and repeat until you are truly done? At least for the purposes of a snapshot?
> > > > > > > >
> > > > > > > > On Wed, Feb 18, 2015 at 02:21:49PM -0800, Jay Kreps wrote:
> > > > > > > > > If you catch up off a compacted topic and keep consuming then you will become consistent with the log.
> > > > > > > > >
> > > > > > > > > I think what you are saying is that you want to create a snapshot from the Kafka topic but NOT do continual reads after that point. For example you might be creating a backup of the data to a file.
> > > > > > > > >
> > > > > > > > > I agree that this isn't as easy as it could be. As you say the only solution we have is that timeout which doesn't differentiate between GC stall in your process and no more messages left so you would need to tune the timeout. This is admittedly kind of a hack.
> > > > > > > > >
> > > > > > > > > You are also correct and perceptive to notice that if you check the end of the log then begin consuming and read up to that point compaction may have already kicked in (if the reading takes a while) and hence you might have an incomplete snapshot.
> > > > > > > > >
> > > > > > > > > I think there are two features we could add that would make this easier:
> > > > > > > > > 1. Make the cleaner point configurable on a per-topic basis. This feature would allow you to control how long the full log is retained and when compaction can kick in. This would give a configurable SLA for the reader process to catch up.
> > > > > > > > > 2. Make the log end offset available more easily in the consumer.
> > > > > > > > >
> > > > > > > > > -Jay
> > > > > > > > >
> > > > > > > > > On Wed, Feb 18, 2015 at 10:18 AM, Will Funnell <w.f.funn...@gmail.com> wrote:
> > > > > > > > >
> > > > > > > > > > We are currently using Kafka 0.8.1.1 with log compaction in order to provide streams of messages to our clients.
> > > > > > > > > >
> > > > > > > > > > As well as constantly consuming the stream, one of our use cases is to provide a snapshot, meaning the user will receive a copy of every message at least once.
> > > > > > > > > >
> > > > > > > > > > Each one of these messages represents an item of content in our system.
> > > > > > > > > >
> > > > > > > > > > The problem comes when determining if the client has actually reached the end of the topic.
> > > > > > > > > >
> > > > > > > > > > The standard Kafka way of dealing with this seems to be by using a ConsumerTimeoutException, but we are frequently getting this error when the end of the topic has not been reached, or it may even take a long time before a timeout naturally occurs.
> > > > > > > > > >
> > > > > > > > > > On first glance it would seem possible to do a lookup for the max offset for each partition when you begin consuming, stopping when this position is reached.
> > > > > > > > > >
> > > > > > > > > > But log compaction means that if an update to a piece of content arrives with the same message key, then this will be written to the end so the snapshot will be incomplete.
> > > > > > > > > >
> > > > > > > > > > Another thought is to make use of the cleaner point. Currently Kafka writes out to a "cleaner-offset-checkpoint" file in each data directory which is written to after log compaction completes.
> > > > > > > > > >
> > > > > > > > > > If the consumer was able to access the cleaner-offset-checkpoint you would be able to consume up to this point, check the point was still the same, and compaction had not yet occurred, and therefore determine you had received everything at least once. (Assuming there was no race condition between compaction and writing to the file)
> > > > > > > > > >
> > > > > > > > > > Has anybody got any thoughts?
> > > > > > > > > >
> > > > > > > > > > Will
> > > > > > >
> > > > > > > --
> > > > > > > Will Funnell
> > > >
> > > > --
> > > > Will Funnell
> >
> > --
> > Will Funnell

--
Will Funnell
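
For reference, here is a rough sketch of the "repeat the check at the end after reading the log, until you are truly done" idea from the thread. It is only a simulation under stated assumptions: `FakePartition`, `log_end_offset`, `fetch`, `compact` and `snapshot` are hypothetical names invented for illustration and are not Kafka APIs. The fake cleaner only touches offsets below a given point, mirroring the statement above that the active segment does not participate in compaction.

```python
from dataclasses import dataclass, field


@dataclass
class FakePartition:
    """Hypothetical in-memory stand-in for one partition of a compacted topic."""
    records: list = field(default_factory=list)  # (offset, key, value) in offset order
    next_offset: int = 0

    def append(self, key, value):
        self.records.append((self.next_offset, key, value))
        self.next_offset += 1

    def log_end_offset(self):
        # Offset that the next appended record will receive.
        return self.next_offset

    def fetch(self, start, max_records=2):
        # Return up to max_records surviving records with offset >= start.
        return [r for r in self.records if r[0] >= start][:max_records]

    def compact(self, up_to):
        # Below up_to, keep only the newest record per key; offsets >= up_to
        # (the "active segment" in this toy model) are left untouched.
        latest = {key: off for off, key, _ in self.records if off < up_to}
        self.records = [r for r in self.records
                        if r[0] >= up_to or latest[r[1]] == r[0]]


def snapshot(partition, on_batch=None):
    """Read until the position reaches a freshly re-checked log end offset."""
    state = {}
    position = 0
    target = partition.log_end_offset()
    while position < target:
        batch = partition.fetch(position)
        if not batch:
            break  # nothing readable; a real consumer would poll again instead
        for offset, key, value in batch:
            state[key] = value       # later records overwrite earlier ones
            position = offset + 1
        if on_batch is not None:
            on_batch()               # hook used to simulate concurrent writes
        if position >= target:
            # Caught up with the old end: repeat the check against the new end.
            target = partition.log_end_offset()
    return state, position
```

The loop only stops once it has read up to an end offset that was looked up after the last read, so updates that arrive (and compaction that runs) while the snapshot is in progress are still picked up. With a steady 100-150 writes per second the loop may go round several times before it catches up.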