Hey Ewen,

Thanks for the comments, these are really good questions. Please see my replies inline.

On Thu, Sep 3, 2015 at 9:35 PM, Ewen Cheslack-Postava <e...@confluent.io> wrote:

> A few questions:
>
> 1. If we update the producers to only support V1, doesn't that mean people get stuck on the current version of the producer until they can be sure all their consumers have been upgraded? Is that going to be a problem for anyone, and does it potentially keep important fixes/enhancements (e.g. the upcoming client-side network request timeouts) because they have to wait for all the consumers to upgrade first?
>

This is a good point. I thought about this before, and my initial thinking was that we might need to add a config on the producer to specify which version to use when producing. But that seems like a pretty ad-hoc approach and I don't really like it. We are working on a general protocol version control mechanism and will have a separate KIP for that.

> 2. Why minute granularity specifically? That's seems fairly specific to LI's example workload (which, by the way, is a very useful reference point to include in the KIP, thanks for that). If someone has the memory and can support it, why not let them go all the way to per-record timestamps (by making the interval configurable and they can set it really small/to 0)? It seems to me that some people might be willing to pay that cost for the application-side simplicity if they want to consume from specific timestamps. With the proposal as is, seeking to a timestamp still requires client-side logic to filter out messages that might have earlier timestamps due to the granularity chosen. The obvious tradeoff here is yet another config option -- I'm loath to add yet more configs, although this is one that we can choose a reasonable default for (like 1 minute) and people can easily change with no impact if they need to adjust it.
>

The search granularity will actually be milliseconds. The index granularity only determines how close the index gets you to the actual message with the timestamp you are looking for. For example, if you are looking for a message with timestamp 10:00:15, a minute-granularity index will give you the offset at 10:00:00, and the search then has to go through the records from 10:00:00 to 10:00:15 to find the message. With second-level granularity it might only need to go through the messages produced within one second. So a minute-level index will take longer to search, but its precision is the same as a second-level index. That said, I am not objecting to adding a granularity configuration, but I am not sure how useful a second-level index would be, because a consumer is typically long-running and only searches for a timestamp at startup. I will update the KIP page to clarify the precision.
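To make that concrete, here is a toy sketch (not the proposed broker implementation, and the names are made up purely for illustration) of why a coarse index still returns a millisecond-precise answer: the lookup finds the nearest index entry at or before the target timestamp and then scans forward from there, so a coarser index only means a longer scan.

import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Toy sketch: coarse-grained time index plus a forward scan.
public class TimeIndexSketch {
    // granularity bucket start -> first offset whose timestamp falls in that bucket
    private final TreeMap<Long, Long> bucketToOffset = new TreeMap<>();
    private final long granularityMs;

    public TimeIndexSketch(long granularityMs) {
        this.granularityMs = granularityMs;
    }

    // Called on append; timestamps are assumed to be non-decreasing.
    public void onAppend(long timestampMs, long offset) {
        long bucket = timestampMs - (timestampMs % granularityMs);
        bucketToOffset.putIfAbsent(bucket, offset);
    }

    // Returns the offset of the first record whose timestamp is >= targetMs,
    // or -1 if there is none. 'log' holds (timestampMs, offset) pairs in offset order.
    public long offsetForTimestamp(long targetMs, List<long[]> log) {
        Map.Entry<Long, Long> entry = bucketToOffset.floorEntry(targetMs);
        long startOffset = entry == null ? 0L : entry.getValue();
        // Scan forward from the indexed offset; the granularity only affects
        // how many records this scan touches, not the precision of the result.
        for (long[] record : log) {
            if (record[1] >= startOffset && record[0] >= targetMs) {
                return record[1];
            }
        }
        return -1L;
    }
}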
> 3. Why not respect the timestamp passed in as long as it's not some sentinel value that indicates the broker should fill it in? When you do know they will be ordered, as they should be with mirror maker (which is specifically mentioned), this seems really useful. (More about this in questions below...)
>

As you mentioned in (4), having a log without monotonically increasing timestamps is weird. To me it is even worse than having an empty timestamp field in the inner message that will not be used except for log compacted topics. I think the only way to solve this is to add a separate CreateTime to the message. So far I am not sure how useful that would be, though, because arguably people can always put such a timestamp inside the payload. So I think this timestamp is more for server-side usage than for application / client-side usage.

> 4. I think one obvious answer to (3) is that accepting client's timestamps could break seeking-by-timestamp since they may not be properly ordered. However, I think this can break anyway under normal operations -- any imperfections in clock synchronization could result in older timestamps being applied to new messages on a new leader broker compared messages stored previously by the old leader. I think it's probably too common to just think that NTP will take care of it and then run into weird bugs because NTP isn't always perfect, sometimes does some unexpected things, and, of course, does what you tell it to and so is subject to user error.
>

This is a good point. Yes, NTP only guarantees limited synchronization precision (several microseconds if you have a low stratum and appropriate PPS). My experience is that it is actually good and stable enough even for some mission-critical systems such as core banking. Maybe this is more of an implementation detail: the simple solution is that when the leader appends messages to the log, it always takes max(lastAppendedTimestamp, currentTimeMillis). Arguably we could play the same trick even if we let the producer fill in the timestamp, but then the timestamp set by the producer may or may not be honored, which is a bit ugly. Also, if some producer produced a super large timestamp by mistake, it would ruin the timestamps of all the messages appended after it.
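Just to illustrate the clamping idea (a sketch only, not the actual log append path), the leader-side logic would look roughly like this:

// Rough illustration of the monotonic append-timestamp idea. The leader never
// lets the timestamp it stamps onto a message go backwards, even if
// System.currentTimeMillis() does (e.g. after an NTP step or a leader change
// to a broker with a slower clock).
public class AppendTimestampSketch {
    private long lastAppendedTimestamp = -1L;

    // Returns the timestamp the leader would write into the next message.
    public synchronized long nextAppendTimestamp() {
        long now = System.currentTimeMillis();
        lastAppendedTimestamp = Math.max(lastAppendedTimestamp, now);
        return lastAppendedTimestamp;
    }
}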
> It would be reasonable to argue that the leader change issue is much less likely of an issue than if you respect timestamps from producers, where, if applications actually filled it in, you'd receive a jumbled mess of timestamps and trying to do the binary search in the index wouldn't necessarily give you correct results. However, a) we could allow clients to fill that info in but discourage it where it might cause issues (i.e. normal applications) and it seems like a significant win for mirrormaker. I actually think not accepting timestamps is probably the better choice but a) it seems awkward in the protocol spec because we have to include the field in the produce requests since we don't want to have to fully decode them on the broker and b) losing that info during mirroring seems like it breaks the goal of fixing log retention (at least for mirrors) as well as the goal of improving searching by timestamp (at least for mirrors).
>

In terms of log retention, people might have different requirements for cross-cluster log retention. It looks simpler to me to simply honor the retention setting of each cluster rather than of the entire pipeline. Can you provide some use cases where cross-cluster retention is critical?

> 5. You never actually specified the granularity (or format, but I assume unix epoch) of the timestamp. Milliseconds? Microseconds? Nanoseconds? This definitely needs to eventually make it into the protocol docs.
>

It should be milliseconds. I'll add it to the KIP.

> 6. Re: the rejected alternative. Are there any other options in changing the config format that might make it a bit lighter weight? For example, do we need a full int64? Could we do something relative instead that wouldn't require as many bytes? Without (5) being specified, it's actually difficult to evaluate some of these options.
>

Even if we use an int32, it will still be a 50% memory footprint increase (an offset index entry is currently 8 bytes, a 4-byte relative offset plus a 4-byte position, so even a 4-byte timestamp grows every entry by 50%). I just feel it is not worth spending that much memory on a very infrequent operation, especially since Kafka largely depends on the OS page cache.

> 7. Any plan to expose this in the client APIs? This is related to (4). If they are not exposed anywhere in the API as being associated with messages, then we can reasonably treat them as an internal implementation detail and be very clear about what looking up an offset for a timestamp means. If they are exposed, then they're more like message metadata that applications are probably going to want preserved, and thus will want the broker to respect the timestamp. For example, if I saw that consumers could get the timestamp, I'd want to be able to assign it at the producer so that even if I have significant batching I still get an accurate timestamp -- basically I might replace some cases where I use a timestamp embedded in the message with the timestamp provided by Kafka. (That said, we should consider the impact that including it in the protocol can have; non-Java clients are likely to expose it if it is available, whether it's actually a good idea to or not.)
>

Yes, searching by timestamp will be a client API. Actually, it already is one: OffsetRequest searches for an offset by timestamp. While the application-level timestamp is always important, it can always be put into the payload. I think the timestamp in this case is pretty much the same as the offset, i.e. assigned by the server but provided to the consumer for reference.
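For reference, this is roughly what the existing lookup looks like with the 0.8.x SimpleConsumer javaapi (written from memory, so treat it as a sketch; the broker and topic names are made up):

import java.util.Collections;
import java.util.Map;

import kafka.api.PartitionOffsetRequestInfo;
import kafka.common.TopicAndPartition;
import kafka.javaapi.OffsetRequest;
import kafka.javaapi.OffsetResponse;
import kafka.javaapi.consumer.SimpleConsumer;

public class OffsetForTimeExample {
    public static void main(String[] args) {
        SimpleConsumer consumer =
            new SimpleConsumer("broker1", 9092, 100000, 64 * 1024, "offset-lookup");
        TopicAndPartition tp = new TopicAndPartition("my-topic", 0);

        long targetTimestampMs = 1441300000000L; // timestamp to search for
        Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo =
            Collections.singletonMap(tp, new PartitionOffsetRequestInfo(targetTimestampMs, 1));

        OffsetRequest request = new OffsetRequest(
            requestInfo, kafka.api.OffsetRequest.CurrentVersion(), "offset-lookup");
        OffsetResponse response = consumer.getOffsetsBefore(request);

        // Today the answer is only as precise as the log segment boundaries; with
        // the index proposed in the KIP the same request could be answered at a
        // much finer granularity.
        long[] offsets = response.offsets("my-topic", 0);
        System.out.println("Offset near timestamp: " + (offsets.length > 0 ? offsets[0] : -1));
        consumer.close();
    }
}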
>
> -Ewen
>
> On Thu, Sep 3, 2015 at 3:47 PM, Jiangjie Qin <j...@linkedin.com.invalid> wrote:
>
> > Hi Guozhang,
> >
> > I checked the code again. Actually CRC check probably won't fail. The newly added timestamp field might be treated as keyLength instead, so we are likely to receive an IllegalArgumentException when try to read the key. I'll update the KIP.
> >
> > Thanks,
> >
> > Jiangjie (Becket) Qin
> >
> > On Thu, Sep 3, 2015 at 12:48 PM, Jiangjie Qin <j...@linkedin.com> wrote:
> >
> > > Hi, Guozhang,
> > >
> > > Thanks for reading the KIP. By "old consumer", I meant the ZookeeperConsumerConnector in trunk now, i.e. without this bug fixed. If we fix the ZookeeperConsumerConnector then it will throw exception complaining about the unsupported version when it sees message format V1. What I was trying to say is that if we have some ZookeeperConsumerConnector running without the fix, the consumer will complain about CRC mismatch instead of unsupported version.
> > >
> > > Thanks,
> > >
> > > Jiangjie (Becket) Qin
> > >
> > > On Thu, Sep 3, 2015 at 12:15 PM, Guozhang Wang <wangg...@gmail.com> wrote:
> > >
> > >> Thanks for the write-up Jiangjie.
> > >>
> > >> One comment about migration plan: "For old consumers, if they see the new protocol the CRC check will fail"..
> > >>
> > >> Do you mean this bug in the old consumer cannot be fixed in a backward-compatible way?
> > >>
> > >> Guozhang
> > >>
> > >> On Thu, Sep 3, 2015 at 8:35 AM, Jiangjie Qin <j...@linkedin.com.invalid> wrote:
> > >>
> > >> > Hi,
> > >> >
> > >> > We just created KIP-31 to propose a message format change in Kafka.
> > >> >
> > >> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-31+-+Message+format+change+proposal
> > >> >
> > >> > As a summary, the motivations are:
> > >> > 1. Avoid server side message re-compression
> > >> > 2. Honor time-based log roll and retention
> > >> > 3. Enable offset search by timestamp at a finer granularity.
> > >> >
> > >> > Feedback and comments are welcome!
> > >> >
> > >> > Thanks,
> > >> >
> > >> > Jiangjie (Becket) Qin
> > >> >
> > >>
> > >> --
> > >> -- Guozhang
> > >
>
> --
> Thanks,
> Ewen
>