On Thu, Sep 3, 2015 at 11:45 PM, Jiangjie Qin <j...@linkedin.com.invalid> wrote:
> Hey Ewen,
>
> Thanks for the comments; they are really good questions. Please see inline replies.
>
> On Thu, Sep 3, 2015 at 9:35 PM, Ewen Cheslack-Postava <e...@confluent.io> wrote:
>
> > A few questions:
> >
> > 1. If we update the producers to only support V1, doesn't that mean people get stuck on the current version of the producer until they can be sure all their consumers have been upgraded? Is that going to be a problem for anyone, and does it potentially hold back important fixes/enhancements (e.g. the upcoming client-side network request timeouts) because they have to wait for all the consumers to upgrade first?
>
> This is a good point. I thought about this before, and my initial thinking is that we might need to add a config on the producer to specify which version to use when producing. But this seems to be a pretty ad-hoc approach and I don't really like it. We are working on a general protocol version control mechanism proposal and will have a separate KIP for that.

The configs are annoying, but they also give us a way to guarantee a smooth upgrade for users that use the defaults and upgrade their entire infrastructure one version at a time:

v0.8.3: no support for v1
v0.9.0: broker: accept.format=v0 (v1 is rejected); producer: produce.format=v0; consumer: supports v0,v1
v0.9.1: broker: accept.format=v1; producer: produce.format=v1; consumer: supports v0,v1

> > 2. Why minute granularity specifically? That seems fairly specific to LI's example workload (which, by the way, is a very useful reference point to include in the KIP, thanks for that). If someone has the memory and can support it, why not let them go all the way to per-record timestamps (by making the interval configurable so they can set it really small/to 0)? It seems to me that some people might be willing to pay that cost for the application-side simplicity if they want to consume from specific timestamps. With the proposal as is, seeking to a timestamp still requires client-side logic to filter out messages that might have earlier timestamps due to the granularity chosen. The obvious tradeoff here is yet another config option -- I'm loath to add yet more configs, although this is one that we can choose a reasonable default for (like 1 minute) and people can easily change with no impact if they need to adjust it.
>
> The searching granularity will actually be millisecond. The index granularity only determines how close you will land to the actual message with the timestamp you are looking for. For example, if you are looking for a message with timestamp 10:00:15, a minute-granularity index will give you the offset at 10:00:00, and the search then needs to go through the records from 10:00:00 to 10:00:15 to find the message. With second-level granularity, it might only need to go through the messages produced within one second. So a minute-level index will take longer to search, but the precision will be the same as with a second-level index. That said, I am not objecting to adding the granularity configuration, but I am not sure how useful a second-level index would be, because I think typically a consumer will be long-running and only search for a timestamp at startup.
>
> I will update the KIP page to clarify the precision.

Ok, this makes sense.
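To make the coarse-index behavior concrete, here is a minimal sketch of the lookup Becket describes (class and method names are hypothetical, not what the KIP specifies): the index only decides where the scan starts; the scan itself gives millisecond precision.

import java.util.NavigableMap;
import java.util.TreeMap;

public class CoarseTimestampIndex {
    // One entry per minute bucket: bucket start time -> first offset
    // at or after that time.
    private final NavigableMap<Long, Long> index = new TreeMap<>();

    void maybeAddEntry(long timestampMs, long offset) {
        long minuteBucket = timestampMs - (timestampMs % 60_000L);
        index.putIfAbsent(minuteBucket, offset);
    }

    // Returns the offset to start scanning from. The caller then reads
    // records sequentially and stops at the first record whose timestamp
    // is >= targetMs, so the answer is millisecond-precise; a coarser
    // index only means a longer scan (at most one bucket's worth).
    long lookup(long targetMs) {
        java.util.Map.Entry<Long, Long> floor = index.floorEntry(targetMs);
        return floor == null ? 0L : floor.getValue();
    }
}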
> > 3. Why not respect the timestamp passed in as long as it's not some sentinel value that indicates the broker should fill it in? When you do know they will be ordered, as they should be with mirror maker (which is specifically mentioned), this seems really useful. (More about this in the questions below...)
>
> Like what you mentioned in (4), having a log without monotonically increasing timestamps is weird. To me it is even worse than having an empty timestamp field in the inner message that will not be used except for log-compacted topics. I think the only way to solve this issue is to add another CreateTime to the message. So far I am not sure how useful that is, though, because arguably people can always put such a timestamp inside the payload. So I think this timestamp is more for server-side usage than for application/client-side usage.

I think setting it broker-side is fine, I just want to make sure user expectations are clear. The existing search-by-timestamp has obvious limitations. This proposal makes it much more accurate, so it needs to be clear who is responsible for assigning the timestamps and what they mean for an application. Applications that care about when the message was actually created will still need to do some additional work.

> > 4. I think one obvious answer to (3) is that accepting clients' timestamps could break seeking-by-timestamp since they may not be properly ordered. However, I think this can break anyway under normal operations -- any imperfections in clock synchronization could result in older timestamps being applied to new messages on a new leader broker compared to messages stored previously by the old leader. I think it's probably too common to just assume that NTP will take care of it and then run into weird bugs, because NTP isn't always perfect, sometimes does some unexpected things, and, of course, does what you tell it to and so is subject to user error.
>
> This is a good point. Yes, NTP only guarantees limited synchronization precision (several microseconds if you have a low stratum and appropriate PPS). My experience is that it is actually good and stable enough even for some mission-critical systems such as core banking. Maybe this is more of an implementation detail. The simple solution is that when the leader appends messages to the log, it always takes max(lastAppendedTimestamp, currentTimeMillis). Arguably we can play the same trick even if we let the producer fill in the timestamp, but that means the timestamp the producer set may or may not be honored, which is a bit ugly. Also, if some producer produced a super large timestamp by mistake, it would ruin the timestamps of all the subsequent messages.
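The clamping trick is easy to state precisely; a minimal sketch, assuming broker-side assignment (names illustrative):

public class MonotonicLogClock {
    private long lastAppendedTimestamp = Long.MIN_VALUE;

    // The leader never lets the log's timestamp go backwards, so the
    // timestamp index stays monotonically increasing even if the clock
    // steps back (e.g. after a leader change or an NTP adjustment).
    synchronized long nextAppendTimestamp() {
        long now = System.currentTimeMillis();
        lastAppendedTimestamp = Math.max(lastAppendedTimestamp, now);
        return lastAppendedTimestamp;
    }
}

This also shows the failure mode Becket points out for producer-assigned timestamps: once a bogus, far-future value becomes lastAppendedTimestamp, every later message gets clamped to it.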
> > It would be reasonable to argue that the leader-change issue is much less likely to be a problem than respecting timestamps from producers, where, if applications actually filled them in, you'd receive a jumbled mess of timestamps and doing the binary search in the index wouldn't necessarily give you correct results. However, a) we could allow clients to fill that info in but discourage it where it might cause issues (i.e. normal applications), and it seems like a significant win for mirror maker.
> >
> > I actually think not accepting timestamps is probably the better choice, but a) it seems awkward in the protocol spec because we have to include the field in the produce requests since we don't want to have to fully decode them on the broker, and b) losing that info during mirroring seems like it breaks the goal of fixing log retention (at least for mirrors) as well as the goal of improving searching by timestamp (at least for mirrors).
>
> In terms of log retention, people might have different requirements for cross-cluster log retention. It looks simpler to me to simply honor the retention for each cluster instead of for the entire pipeline. Can you provide some use cases where cross-cluster retention is critical?

I think the timestamp search might be more relevant for the mirroring case. It doesn't seem great that you lose the ability to do accurate time-based search just because you mirror the data to another cluster. A hiccup in your mirror maker process would result in timestamps that might not even be close to the original timestamps. But this is also an artifact of mirror maker being a consumer + producer rather than being integrated into the brokers...

> > 5. You never actually specified the granularity (or format, but I assume unix epoch) of the timestamp. Milliseconds? Microseconds? Nanoseconds? This definitely needs to eventually make it into the protocol docs.
>
> It should be milliseconds. I'll add it to the KIP.

Does anyone have a use case for finer precision? Are we just going to assign the same timestamp to all messages in a produce request anyway?

> > 6. Re: the rejected alternative. Are there any other options in changing the config format that might make it a bit lighter weight? For example, do we need a full int64? Could we do something relative instead that wouldn't require as many bytes? Without (5) being specified, it's actually difficult to evaluate some of these options.
>
> Even if we use int32, it will still be a 50% memory footprint increase. I just feel it's not worth spending so much memory on a very infrequent operation, especially when Kafka largely depends on OS caching.

If it's actually infrequent, then the index wouldn't be in cache anyway unless you had the memory to spare. And when you do run a search, you should only bring in the few pages that are hit by the binary search. So wouldn't you basically just be paying for the disk space, but not the memory, under normal conditions?
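For concreteness, the arithmetic behind the 50% figure, assuming the existing 8-byte offset index entry (4-byte relative offset + 4-byte file position): adding an int32 timestamp makes each entry 12 bytes, a 50% increase, while a full int64 makes it 16 bytes, a 100% increase.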
> > 7. Any plan to expose this in the client APIs? This is related to (4). If they are not exposed anywhere in the API as being associated with messages, then we can reasonably treat them as an internal implementation detail and be very clear about what looking up an offset for a timestamp means. If they are exposed, then they're more like message metadata that applications are probably going to want preserved, and thus will want the broker to respect the timestamp. For example, if I saw that consumers could get the timestamp, I'd want to be able to assign it at the producer so that even if I have significant batching I still get an accurate timestamp -- basically I might replace some cases where I use a timestamp embedded in the message with the timestamp provided by Kafka. (That said, we should consider the impact that including it in the protocol can have; non-Java clients are likely to expose it if it is available, whether it's actually a good idea to or not.)
>
> Yes, searching by timestamp will be a client API. Actually, it is already a client API: OffsetRequest will search the offset by timestamp. While the application timestamp is always important, an application-level timestamp can always be put inside the payload. I think the timestamp in this case is pretty much the same as the offset, i.e. assigned by the server but provided to the consumer for reference.

I actually meant on a per-message basis, i.e. in ConsumerRecord. Since you'll be receiving the timestamp on the consumer along with the message, applications might want access to it.

-Ewen

> > -Ewen
> >
> > On Thu, Sep 3, 2015 at 3:47 PM, Jiangjie Qin <j...@linkedin.com.invalid> wrote:
> >
> > > Hi Guozhang,
> > >
> > > I checked the code again. Actually the CRC check probably won't fail. The newly added timestamp field might be treated as the keyLength instead, so we are likely to receive an IllegalArgumentException when trying to read the key. I'll update the KIP.
> > >
> > > Thanks,
> > >
> > > Jiangjie (Becket) Qin
> > >
> > > On Thu, Sep 3, 2015 at 12:48 PM, Jiangjie Qin <j...@linkedin.com> wrote:
> > >
> > > > Hi, Guozhang,
> > > >
> > > > Thanks for reading the KIP. By "old consumer", I meant the ZookeeperConsumerConnector in trunk now, i.e. without this bug fixed. If we fix the ZookeeperConsumerConnector then it will throw an exception complaining about the unsupported version when it sees message format V1. What I was trying to say is that if we have some ZookeeperConsumerConnector running without the fix, the consumer will complain about a CRC mismatch instead of an unsupported version.
> > > >
> > > > Thanks,
> > > >
> > > > Jiangjie (Becket) Qin
> > > >
> > > > On Thu, Sep 3, 2015 at 12:15 PM, Guozhang Wang <wangg...@gmail.com> wrote:
> > > >
> > > > > Thanks for the write-up Jiangjie.
> > > > >
> > > > > One comment about the migration plan: "For old consumers, if they see the new protocol the CRC check will fail"...
> > > > >
> > > > > Do you mean this bug in the old consumer cannot be fixed in a backward-compatible way?
> > > > >
> > > > > Guozhang
> > > > >
> > > > > On Thu, Sep 3, 2015 at 8:35 AM, Jiangjie Qin <j...@linkedin.com.invalid> wrote:
> > > > >
> > > > > > Hi,
> > > > > >
> > > > > > We just created KIP-31 to propose a message format change in Kafka.
> > > > > >
> > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-31+-+Message+format+change+proposal
> > > > > >
> > > > > > As a summary, the motivations are:
> > > > > > 1. Avoid server-side message re-compression
> > > > > > 2. Honor time-based log roll and retention
> > > > > > 3. Enable offset search by timestamp at a finer granularity.
> > > > > >
> > > > > > Feedback and comments are welcome!
> > > > > >
> > > > > > Thanks,
> > > > > >
> > > > > > Jiangjie (Becket) Qin
> > > > >
> > > > > --
> > > > > -- Guozhang
> >
> > --
> > Thanks,
> > Ewen

--
Thanks,
Ewen
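For anyone following the CRC/keyLength exchange above, a minimal sketch of why a V0-era decoder misreads a V1 message: in V0 the bytes after `attributes` are the 4-byte key length, while the proposed V1 inserts an 8-byte timestamp there, so an unfixed decoder reads the timestamp's high 4 bytes as the key length. (The V1 layout here is as proposed in the KIP; buffer handling is simplified.)

import java.nio.ByteBuffer;

public class DecodeExample {
    public static void main(String[] args) {
        long timestamp = 1441324800000L; // some epoch-millis value

        // Fragment of a V1 message body starting at `attributes`.
        ByteBuffer v1 = ByteBuffer.allocate(1 + 8 + 4);
        v1.put((byte) 0);        // attributes
        v1.putLong(timestamp);   // V1-only: 8-byte timestamp
        v1.putInt(-1);           // key length (-1 = null key)
        v1.flip();

        // A V0 decoder skips attributes and immediately reads key length...
        v1.get();                         // attributes
        int bogusKeyLength = v1.getInt(); // actually the timestamp's high 4 bytes
        System.out.println("V0 decoder sees keyLength = " + bogusKeyLength);
        // ...and then tries to slice that many bytes of "key", which fails
        // (e.g. the IllegalArgumentException Becket mentions) or yields garbage.
    }
}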