Hi,

Any update on the above patch? Hoping you might be able to review it
soon. Thanks.

On 23 February 2015 at 21:21, Will Funnell <w.f.funn...@gmail.com> wrote:
> Hey guys,
>
> I created a patch based on your feedback.
>
> Let me know what you think.
>
> https://issues.apache.org/jira/browse/KAFKA-1977
>
> On 20 February 2015 at 01:43, Joel Koshy <jjkosh...@gmail.com> wrote:
>
>> The log end offset (of a partition) changes when messages are appended
>> to the partition. (It is not correlated with the consumer's offset.)
>>
>> On Thu, Feb 19, 2015 at 08:58:10PM +0000, Will Funnell wrote:
>> > So at what point does the log end offset change? When you commit?
>> >
>> > On 19 February 2015 at 18:47, Joel Koshy <jjkosh...@gmail.com> wrote:
>> >
>> > > > If I consumed up to the log end offset and log compaction
>> > > > happens in between, I would have missed some messages.
>> > >
>> > > Compaction actually only runs on the rolled-over segments (not the
>> > > active, i.e. latest, segment). The log end offset will be in the
>> > > latest segment, which does not participate in compaction.
>> > >
>> > > > > The log end offset is just the end of the committed messages
>> > > > > in the log (the last thing the consumer has access to). It
>> > > > > isn't the same as the cleaner point but is always later than
>> > > > > it, so it would work just as well.
>> > > >
>> > > > Isn't this just roughly the same value as using
>> > > > c.getOffsetsBefore() with a partitionRequestTime of -1?
>> > > >
>> > > > Although it's always later than the cleaner point, surely log
>> > > > compaction is still an issue here.
>> > > >
>> > > > If I consumed up to the log end offset and log compaction
>> > > > happens in between, I would have missed some messages.
>> > > >
>> > > > My thinking was that if you knew the log cleaner point, you could:
>> > > >
>> > > > Make a note of the starting offset.
>> > > > Consume till the end of the log.
>> > > > Check my starting point is ahead of the current cleaner point,
>> > > > otherwise loop.
>> > > >
>> > > > I appreciate there is a chance I misunderstood your point.
>> > > >
>> > > > On 19 February 2015 at 18:02, Jay Kreps <jay.kr...@gmail.com> wrote:
>> > > >
>> > > > > The log end offset is just the end of the committed messages
>> > > > > in the log (the last thing the consumer has access to). It
>> > > > > isn't the same as the cleaner point but is always later than
>> > > > > it, so it would work just as well.
>> > > > >
>> > > > > -Jay
>> > > > >
>> > > > > On Thu, Feb 19, 2015 at 8:54 AM, Will Funnell
>> > > > > <w.f.funn...@gmail.com> wrote:
>> > > > >
>> > > > > > > I'm not sure if I misunderstood Jay's suggestion, but I
>> > > > > > > think it is along the lines of: we expose the
>> > > > > > > log-end-offset (actually the high watermark) of the
>> > > > > > > partition in the fetch response. However, this is not
>> > > > > > > exposed to the consumer (either in the new ConsumerRecord
>> > > > > > > class or the existing MessageAndMetadata class). If we
>> > > > > > > did, then when you consume a record you could check
>> > > > > > > whether it has offsets up to the log-end offset. If it
>> > > > > > > does, then you would know for sure that you have consumed
>> > > > > > > everything for that partition.
>> > > > > >
>> > > > > > To confirm then, the log-end-offset is the same as the
>> > > > > > cleaner point?
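A rough sketch of the loop described a few messages up. Every name in
it is hypothetical: the 0.8.x consumer API exposes neither the log end
offset nor the cleaner point, which is exactly the gap this thread is
about.

    import java.util.HashMap;
    import java.util.Map;

    // Sketch only: all of these methods are hypothetical.
    interface PartitionReader {
        long position();       // offset of the next record to be read
        long endOffset();      // current log end offset of the partition
        long cleanerPoint();   // offset the log cleaner has compacted up to
        KeyedMessage next();
        void seek(long offset);
    }

    interface KeyedMessage {
        String key();
        byte[] value();
    }

    class SnapshotLoop {
        Map<String, byte[]> snapshot(PartitionReader reader) {
            while (true) {
                long start = reader.position();        // 1. note the starting offset
                long end = reader.endOffset();
                Map<String, byte[]> byKey = new HashMap<String, byte[]>();
                while (reader.position() < end) {      // 2. consume till end of log
                    KeyedMessage m = reader.next();
                    byKey.put(m.key(), m.value());     // newest value per key wins
                }
                if (start >= reader.cleanerPoint()) {  // 3. start ahead of cleaner point?
                    return byKey;                      // compaction never entered the range we read
                }
                reader.seek(start);                    // otherwise loop
            }
        }
    }

Step 3 is the safety check: if the cleaner point never caught up to
where the read started, compaction cannot have removed anything inside
the range that was consumed.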
>> > > > > >
>> > > > > > On 19 February 2015 at 03:10, Jay Kreps <jay.kr...@gmail.com> wrote:
>> > > > > >
>> > > > > > > Yeah, I was thinking either along the lines Joel was
>> > > > > > > suggesting or else adding a logEndOffset(TopicPartition)
>> > > > > > > method or something like that. As Joel says, the consumer
>> > > > > > > actually has this information internally (we return it
>> > > > > > > with the fetch request) but doesn't expose it.
>> > > > > > >
>> > > > > > > -Jay
>> > > > > > >
>> > > > > > > On Wed, Feb 18, 2015 at 4:51 PM, Joel Koshy
>> > > > > > > <jjkosh...@gmail.com> wrote:
>> > > > > > >
>> > > > > > > > > > 2. Make the log end offset available more easily in
>> > > > > > > > > > the consumer.
>> > > > > > > > >
>> > > > > > > > > Was thinking something would need to be added in
>> > > > > > > > > LogCleanerManager, in the updateCheckpoints function.
>> > > > > > > > > Where would be best to publish the information to
>> > > > > > > > > make it more easily available, or would you just
>> > > > > > > > > expose the offset-cleaner-checkpoint file as it is?
>> > > > > > > > > Is it right that you would also need to know which
>> > > > > > > > > offset-cleaner-checkpoint entry relates to each
>> > > > > > > > > active partition?
>> > > > > > > >
>> > > > > > > > I'm not sure if I misunderstood Jay's suggestion, but I
>> > > > > > > > think it is along the lines of: we expose the
>> > > > > > > > log-end-offset (actually the high watermark) of the
>> > > > > > > > partition in the fetch response. However, this is not
>> > > > > > > > exposed to the consumer (either in the new
>> > > > > > > > ConsumerRecord class or the existing MessageAndMetadata
>> > > > > > > > class). If we did, then when you consume a record you
>> > > > > > > > could check whether it has offsets up to the log-end
>> > > > > > > > offset. If it does, then you would know for sure that
>> > > > > > > > you have consumed everything for that partition.
>> > > > > > > >
>> > > > > > > > > Yes, was looking at this initially, but as we have
>> > > > > > > > > 100-150 writes per second, it could be a while before
>> > > > > > > > > there is a pause long enough to check it has caught
>> > > > > > > > > up. Even with the consumer timeout set to -1, it
>> > > > > > > > > takes some time to query the max offset values, which
>> > > > > > > > > is still long enough for more messages to arrive.
>> > > > > > > >
>> > > > > > > > Got it - thanks for clarifying.
>> > > > > > > >
>> > > > > > > > > On 18 February 2015 at 23:16, Joel Koshy
>> > > > > > > > > <jjkosh...@gmail.com> wrote:
>> > > > > > > > >
>> > > > > > > > > > > You are also correct and perceptive to notice
>> > > > > > > > > > > that if you check the end of the log then begin
>> > > > > > > > > > > consuming and read up to that point, compaction
>> > > > > > > > > > > may have already kicked in (if the reading takes
>> > > > > > > > > > > a while) and hence you might have an incomplete
>> > > > > > > > > > > snapshot.
>> > > > > > > > > >
>> > > > > > > > > > Isn't it sufficient to just repeat the check at the
>> > > > > > > > > > end after reading the log, and repeat until you are
>> > > > > > > > > > truly done? At least for the purposes of a snapshot?
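Joel's "repeat the check" idea, sketched against the same hypothetical
PartitionReader interface as in the sketch further up the thread:

    // Sketch only: endOffset() and position() are hypothetical helpers.
    class RepeatedEndCheck {
        long readUntilStableEnd(PartitionReader reader) {
            long end = reader.endOffset();        // check the end of the log...
            while (true) {
                while (reader.position() < end) {
                    reader.next();                // fold each record into the snapshot here
                }
                long again = reader.endOffset();  // ...then repeat the check
                if (again == end) {
                    return end;                   // truly done: no new writes arrived
                }
                end = again;                      // writes landed meanwhile; read the delta
            }
        }
    }

Note that with 100-150 writes per second the end offset rarely stands
still, so this loop may not terminate quickly, which is exactly the
objection raised above.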
>> > > > > > > > > >
>> > > > > > > > > > On Wed, Feb 18, 2015 at 02:21:49PM -0800, Jay Kreps wrote:
>> > > > > > > > > >
>> > > > > > > > > > > If you catch up off a compacted topic and keep
>> > > > > > > > > > > consuming, then you will become consistent with
>> > > > > > > > > > > the log.
>> > > > > > > > > > >
>> > > > > > > > > > > I think what you are saying is that you want to
>> > > > > > > > > > > create a snapshot from the Kafka topic but NOT do
>> > > > > > > > > > > continual reads after that point. For example you
>> > > > > > > > > > > might be creating a backup of the data to a file.
>> > > > > > > > > > >
>> > > > > > > > > > > I agree that this isn't as easy as it could be.
>> > > > > > > > > > > As you say, the only solution we have is that
>> > > > > > > > > > > timeout, which doesn't differentiate between a GC
>> > > > > > > > > > > stall in your process and no more messages being
>> > > > > > > > > > > left, so you would need to tune the timeout. This
>> > > > > > > > > > > is admittedly kind of a hack.
>> > > > > > > > > > >
>> > > > > > > > > > > You are also correct and perceptive to notice
>> > > > > > > > > > > that if you check the end of the log then begin
>> > > > > > > > > > > consuming and read up to that point, compaction
>> > > > > > > > > > > may have already kicked in (if the reading takes
>> > > > > > > > > > > a while) and hence you might have an incomplete
>> > > > > > > > > > > snapshot.
>> > > > > > > > > > >
>> > > > > > > > > > > I think there are two features we could add that
>> > > > > > > > > > > would make this easier:
>> > > > > > > > > > >
>> > > > > > > > > > > 1. Make the cleaner point configurable on a
>> > > > > > > > > > > per-topic basis. This feature would allow you to
>> > > > > > > > > > > control how long the full log is retained and
>> > > > > > > > > > > when compaction can kick in. This would give a
>> > > > > > > > > > > configurable SLA for the reader process to catch up.
>> > > > > > > > > > > 2. Make the log end offset available more easily
>> > > > > > > > > > > in the consumer.
>> > > > > > > > > > >
>> > > > > > > > > > > -Jay
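For reference, the timeout hack Jay mentions looks roughly like this
with the 0.8 high-level consumer. The API calls and config keys are
the real ones; the topic name, ZooKeeper connect string and the 5s
timeout are made-up values:

    import java.util.HashMap;
    import java.util.Map;
    import java.util.Properties;

    import kafka.consumer.Consumer;
    import kafka.consumer.ConsumerConfig;
    import kafka.consumer.ConsumerIterator;
    import kafka.consumer.ConsumerTimeoutException;
    import kafka.consumer.KafkaStream;
    import kafka.javaapi.consumer.ConsumerConnector;
    import kafka.message.MessageAndMetadata;

    public class TimeoutSnapshot {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("zookeeper.connect", "localhost:2181"); // adjust to your cluster
            props.put("group.id", "snapshot-example");
            props.put("auto.offset.reset", "smallest");       // read from the start of the log
            props.put("consumer.timeout.ms", "5000");         // the tunable in question

            ConsumerConnector connector =
                Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
            Map<String, Integer> topicCount = new HashMap<String, Integer>();
            topicCount.put("my-compacted-topic", 1);
            KafkaStream<byte[], byte[]> stream =
                connector.createMessageStreams(topicCount).get("my-compacted-topic").get(0);

            ConsumerIterator<byte[], byte[]> it = stream.iterator();
            try {
                while (it.hasNext()) {                        // blocks up to consumer.timeout.ms
                    MessageAndMetadata<byte[], byte[]> m = it.next();
                    // fold m.key() / m.message() into the snapshot here
                }
            } catch (ConsumerTimeoutException e) {
                // No message for 5s: probably the end of the log, but a GC
                // pause or a slow broker looks exactly the same. This is
                // the hack Jay describes above.
            }
            connector.shutdown();
        }
    }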
>> > > > > > > > > > >
>> > > > > > > > > > > On Wed, Feb 18, 2015 at 10:18 AM, Will Funnell
>> > > > > > > > > > > <w.f.funn...@gmail.com> wrote:
>> > > > > > > > > > >
>> > > > > > > > > > > > We are currently using Kafka 0.8.1.1 with log
>> > > > > > > > > > > > compaction in order to provide streams of
>> > > > > > > > > > > > messages to our clients.
>> > > > > > > > > > > >
>> > > > > > > > > > > > As well as constantly consuming the stream, one
>> > > > > > > > > > > > of our use cases is to provide a snapshot,
>> > > > > > > > > > > > meaning the user will receive a copy of every
>> > > > > > > > > > > > message at least once.
>> > > > > > > > > > > >
>> > > > > > > > > > > > Each one of these messages represents an item
>> > > > > > > > > > > > of content in our system.
>> > > > > > > > > > > >
>> > > > > > > > > > > > The problem comes when determining whether the
>> > > > > > > > > > > > client has actually reached the end of the topic.
>> > > > > > > > > > > >
>> > > > > > > > > > > > The standard Kafka way of dealing with this
>> > > > > > > > > > > > seems to be by using a ConsumerTimeoutException,
>> > > > > > > > > > > > but we frequently get this exception before the
>> > > > > > > > > > > > end of the topic has been reached, or it may
>> > > > > > > > > > > > take a long time before a timeout naturally
>> > > > > > > > > > > > occurs.
>> > > > > > > > > > > >
>> > > > > > > > > > > > At first glance it would seem possible to do a
>> > > > > > > > > > > > lookup for the max offset for each partition
>> > > > > > > > > > > > when you begin consuming, stopping when this
>> > > > > > > > > > > > position is reached.
>> > > > > > > > > > > >
>> > > > > > > > > > > > But log compaction means that if an update to a
>> > > > > > > > > > > > piece of content arrives with the same message
>> > > > > > > > > > > > key, then this will be written to the end, so
>> > > > > > > > > > > > the snapshot will be incomplete.
>> > > > > > > > > > > >
>> > > > > > > > > > > > Another thought is to make use of the cleaner
>> > > > > > > > > > > > point. Kafka currently writes a
>> > > > > > > > > > > > "cleaner-offset-checkpoint" file in each data
>> > > > > > > > > > > > directory after log compaction completes.
>> > > > > > > > > > > >
>> > > > > > > > > > > > If the consumer was able to access the
>> > > > > > > > > > > > cleaner-offset-checkpoint, you would be able to
>> > > > > > > > > > > > consume up to this point, check the point was
>> > > > > > > > > > > > still the same (and compaction had not yet
>> > > > > > > > > > > > occurred), and therefore determine you had
>> > > > > > > > > > > > received everything at least once. (Assuming
>> > > > > > > > > > > > there was no race condition between compaction
>> > > > > > > > > > > > and writing to the file.)
>> > > > > > > > > > > >
>> > > > > > > > > > > > Has anybody got any thoughts?
>> > > > > > > > > > > >
>> > > > > > > > > > > > Will
>> > > > > > > > >
>> > > > > > > > > --
>> > > > > > > > > Will Funnell
>> > > > > >
>> > > > > > --
>> > > > > > Will Funnell
>> > > >
>> > > > --
>> > > > Will Funnell
>> >
>> > --
>> > Will Funnell

> --
> Will Funnell

--
Will Funnell
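For completeness, the "lookup for the max offset for each partition"
mentioned in the original mail can be done with the 0.8 SimpleConsumer's
getOffsetsBefore(), asking for time -1 (latest). The calls are the real
API; the host, port and topic below are placeholders, and in practice
you would query the leader of each partition:

    import java.util.Collections;

    import kafka.api.PartitionOffsetRequestInfo;
    import kafka.common.TopicAndPartition;
    import kafka.javaapi.OffsetResponse;
    import kafka.javaapi.consumer.SimpleConsumer;

    public class MaxOffsetLookup {
        public static void main(String[] args) {
            SimpleConsumer consumer =
                new SimpleConsumer("localhost", 9092, 100000, 64 * 1024, "max-offset-lookup");
            TopicAndPartition tp = new TopicAndPartition("my-compacted-topic", 0);

            // LatestTime() is -1: ask for the log end offset of the partition.
            kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(
                Collections.singletonMap(tp,
                    new PartitionOffsetRequestInfo(kafka.api.OffsetRequest.LatestTime(), 1)),
                kafka.api.OffsetRequest.CurrentVersion(),
                "max-offset-lookup");

            OffsetResponse response = consumer.getOffsetsBefore(request);
            long[] offsets = response.offsets("my-compacted-topic", 0);
            System.out.println("log end offset: " + offsets[0]);
            consumer.close();
        }
    }

As discussed upthread, this returns the log end offset, which is always
later than the cleaner point, so on its own it does not protect the
snapshot against compaction kicking in mid-read.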