I just wanted to comment on a few points made earlier in this thread:

Concerns on clock skew: At least for the original proposal's scope (which was more about honoring retention broker-side), this would only be an issue when spanning leader movements, right? i.e., leader migration latency would have to be much less than the clock skew for this to be a real issue, wouldn't it?
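To make that concrete, here is a minimal sketch (hypothetical names, not the actual broker code) of retention driven purely by broker-local arrival time; note that two different brokers' clocks only ever get compared across a leader move:

    // Sketch only -- hypothetical names, not the actual broker code.
    // The segment records the broker-local clock at last append, and
    // retention compares against that same local clock. Skew between
    // brokers only matters if leadership moved after the append.
    class LogSegmentSketch {
        final long lastAppendTimeMs; // System.currentTimeMillis() at append time

        LogSegmentSketch(long lastAppendTimeMs) {
            this.lastAppendTimeMs = lastAppendTimeMs;
        }

        boolean shouldExpire(long retentionMs) {
            return System.currentTimeMillis() - lastAppendTimeMs > retentionMs;
        }
    }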
Client timestamp vs broker timestamp: I'm not sure Kafka (brokers) are the right place to reason about client-side timestamps, precisely due to the nuances that have been discussed at length in this thread. My preference would have been for the timestamp (now called LogAppendTimestamp) to have nothing to do with the applications. Ewen raised a valid concern about leaking such "private/server-side" timestamps into the protocol spec. i.e., it is fine to have the CreateTime, which is expressly client-provided and immutable thereafter, but the LogAppendTime is also going to be part of the protocol, and it would be good to avoid exposing it (to client developers) if possible.

Ok, so here is a slightly different approach that I was just thinking about (and did not think through too far, so it may not work): do not add the LogAppendTime to messages. Instead, build the time-based index on the server side from message arrival time alone. Introduce a new ReplicaFetchRequest/Response pair; ReplicaFetchResponses would also include the slice of the time-based index for the follower broker. This way we can at least keep timestamps aligned across brokers for retention purposes. We do lose the append timestamp for mirroring pipelines (which appears to be the case in KIP-32 as well).

Configurable index granularity: We can do this, but I'm not sure it is very useful, and as Jay noted, a major change from the old proposal linked from the KIP is the sparse time-based index, which we felt was essential to bound memory usage (having a timestamp on each log index entry was probably a big waste, since in the common case several messages span the same timestamp). A sketch of what I mean by a sparse index follows below my sign-off. BTW, another benefit of the second index is that it makes it easier to roll back or throw away if necessary (vs. modifying the existing index format). That obviously does not help with rolling back the timestamp change in the message format, but it is one less thing to worry about.

Versioning: I'm not sure everyone is saying the same thing wrt the scope of this. There is the record format change, but I also think this ties into all of the API versioning that we already have in Kafka. The current API versioning approach works fine for upgrades/downgrades across official Kafka releases, but not so well in between releases. (We almost got bitten by this at LinkedIn with the recent changes to various requests, but we were able to work around them.) We can clarify this in the follow-up KIP.

Thanks,

Joel
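A minimal sketch of the sparse time-based index mentioned above (hypothetical names and granularity, not the actual KIP-33 design): an entry is appended only when the timestamp has advanced by at least the index granularity, which bounds memory, and a lookup is a binary search for the last entry at or before the target time.

    import java.util.ArrayList;
    import java.util.List;

    // Sketch only -- hypothetical, not the KIP-33 implementation.
    class SparseTimeIndex {
        private static final long GRANULARITY_MS = 1000L; // could be made configurable
        private final List<long[]> entries = new ArrayList<>(); // {timestampMs, offset}

        // Called on append. At most one entry per GRANULARITY_MS is indexed,
        // so the many messages sharing (roughly) one timestamp cost one entry.
        void maybeAppend(long timestampMs, long offset) {
            if (entries.isEmpty()
                    || timestampMs >= entries.get(entries.size() - 1)[0] + GRANULARITY_MS)
                entries.add(new long[] {timestampMs, offset});
        }

        // Largest indexed offset whose timestamp is <= targetMs, or -1 if
        // the target predates the first entry.
        long lookup(long targetMs) {
            int lo = 0, hi = entries.size() - 1;
            long result = -1;
            while (lo <= hi) {
                int mid = (lo + hi) >>> 1;
                if (entries.get(mid)[0] <= targetMs) {
                    result = entries.get(mid)[1];
                    lo = mid + 1;
                } else {
                    hi = mid - 1;
                }
            }
            return result;
        }
    }

At one-second granularity, a 7-day log tops out around 605k entries regardless of message rate, which is the memory bound being referred to.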
On Thu, Sep 10, 2015 at 3:00 PM, Jiangjie Qin <j...@linkedin.com.invalid> wrote:

> Hi Jay,
>
> I just changed the KIP title and updated the KIP page.
>
> And yes, we are working on a general version control proposal to make
> protocol migrations like this one smoother. I will also create a KIP
> for that soon.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
> On Thu, Sep 10, 2015 at 2:21 PM, Jay Kreps <j...@confluent.io> wrote:
>
>> Great, can we change the name to something related to the
>> change--"KIP-31: Move to relative offsets in compressed message sets".
>>
>> Also, you had mentioned before that you were going to expand on the
>> mechanics of handling these log format changes, right?
>>
>> -Jay
>>
>> On Thu, Sep 10, 2015 at 12:42 PM, Jiangjie Qin <j...@linkedin.com.invalid> wrote:
>>
>> > Neha and Jay,
>> >
>> > Thanks a lot for the feedback. Good point about splitting the
>> > discussion. I have split the proposal into three KIPs, and it does
>> > make each discussion clearer:
>> > KIP-31 - Message format change (use relative offsets)
>> > KIP-32 - Add CreateTime and LogAppendTime to Kafka messages
>> > KIP-33 - Build a time-based log index
>> >
>> > KIP-33 can be a follow-up KIP to KIP-32, so we can discuss KIP-31
>> > and KIP-32 first for now. I will create a separate discussion thread
>> > for KIP-32 and reply to the concerns you raised regarding the
>> > timestamp.
>> >
>> > So far it looks like there is no objection to KIP-31. Since I
>> > removed a few parts from the previous KIP and left only the relative
>> > offset proposal, it would be great if people could take another look
>> > to see whether there are any concerns.
>> >
>> > Thanks,
>> >
>> > Jiangjie (Becket) Qin
>> >
>> > On Tue, Sep 8, 2015 at 1:28 PM, Neha Narkhede <n...@confluent.io> wrote:
>> >
>> > > Becket,
>> > >
>> > > Nice write-up. Few thoughts -
>> > >
>> > > I'd split up the discussion for simplicity. Note that you can
>> > > always group several of these in one patch to reduce the protocol
>> > > changes people have to deal with. This is just a suggestion, but I
>> > > think the following split might make it easier to tackle the
>> > > changes being proposed -
>> > >
>> > > - Relative offsets
>> > > - Introducing the concept of time
>> > > - Time-based indexing (separate the usage of the timestamp field
>> > >   from how/whether we want to include a timestamp in the message)
>> > >
>> > > I'm a +1 on relative offsets; we should've done it back when we
>> > > introduced it. Other than reducing the CPU overhead, this will
>> > > also reduce the garbage collection overhead on the brokers.
>> > >
>> > > On the timestamp field, I generally agree that we should add a
>> > > timestamp to a Kafka message, but I'm not quite sold on how this
>> > > KIP suggests the timestamp be set. Will avoid repeating the
>> > > downsides of a broker-side timestamp mentioned previously in this
>> > > thread. I think the topic of including a timestamp in a Kafka
>> > > message requires a lot more thought and detail than what's in this
>> > > KIP. I'd suggest we make it a separate KIP that includes a list of
>> > > all the different use cases for the timestamp (beyond log
>> > > retention), including stream processing, and discuss the tradeoffs
>> > > of including client- and broker-side timestamps.
>> > >
>> > > Agree with the benefit of time-based indexing, but haven't had a
>> > > chance to dive into the design details yet.
>> > >
>> > > Thanks,
>> > > Neha
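For context on the KIP-31 piece Neha is +1 on, a minimal sketch of how relative offsets in compressed message sets avoid server-side offset rewriting (hypothetical names; one plausible encoding, not necessarily the exact layout in the KIP): the broker rewrites only the single absolute offset on the wrapper message, leaving the compressed inner messages untouched.

    // Sketch only -- hypothetical encoding. Inner messages carry relative
    // offsets 0..n-1; the wrapper carries the absolute offset of the last
    // inner message, assigned by the broker at append time.
    final class RelativeOffsets {
        static long absoluteOffset(long wrapperOffset, int innerCount, int relativeOffset) {
            long baseOffset = wrapperOffset - (innerCount - 1);
            return baseOffset + relativeOffset;
        }
    }

For example, a wrapper assigned offset 105 holding 6 inner messages yields absolute offsets 100..105 on the consumer side, without the broker having to rewrite per-message offsets inside the set (and hence re-compress it).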
>> > > On Tue, Sep 8, 2015 at 10:57 AM, Jay Kreps <j...@confluent.io> wrote:
>> > >
>> > > > Hey Becket,
>> > > >
>> > > > I was proposing splitting up the KIP just for simplicity of
>> > > > discussion. You can still implement them in one patch. I think
>> > > > otherwise it will be hard to discuss/vote on them, since if you
>> > > > like the offset proposal but not the time proposal, what do you
>> > > > do?
>> > > >
>> > > > Introducing a second notion of time into Kafka is a pretty
>> > > > massive philosophical change, so it kind of warrants its own KIP;
>> > > > it isn't just "change message format".
>> > > >
>> > > > WRT time, I think one thing to clarify in the proposal is how MM
>> > > > will have access to set the timestamp. Presumably this will be a
>> > > > new field in ProducerRecord, right? If so, then any user can set
>> > > > the timestamp, right?
>> > > > I'm not sure you answered the questions around how this will
>> > > > work for MM, since when MM retains timestamps from multiple
>> > > > partitions they will then be out of order and in the past (so
>> > > > the max(lastAppendedTimestamp, currentTimeMillis) override you
>> > > > proposed will not work, right?). If we don't do this, then when
>> > > > you set up mirroring the data will all be new and you have the
>> > > > same retention problem you described. Maybe I missed
>> > > > something...?
>> > > >
>> > > > My main motivation is that given that both Samza and Kafka
>> > > > streams are doing work that implies a mandatory client-defined
>> > > > notion of time, I really think introducing a different mandatory
>> > > > notion of time in Kafka is going to be quite odd. We should think
>> > > > hard about how client-defined time could work. I'm not sure if
>> > > > it can, but I'm also not sure that it can't. Having both will be
>> > > > odd. Did you chat about this with Yi/Kartik on the Samza side?
>> > > >
>> > > > When you are saying it won't work, are you assuming some
>> > > > particular implementation? Maybe that the index is a
>> > > > monotonically increasing set of pointers to the least record
>> > > > with a timestamp larger than the index time? In other words, a
>> > > > search for time X gives the largest offset at which all records
>> > > > are <= X?
>> > > >
>> > > > For retention, I agree with the problem you point out, but I
>> > > > think what you are saying in that case is that you want a size
>> > > > limit too. If you use system time you actually hit the same
>> > > > problem: say you do a full dump of a DB table with a setting of
>> > > > 7 days retention; your retention will actually not get enforced
>> > > > for the first 7 days because the data is "new to Kafka".
>> > > >
>> > > > -Jay
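If the timestamp Jay mentions were exposed as a new optional field on ProducerRecord, MM could carry the source timestamp through unchanged. A sketch of one possible shape (entirely hypothetical; no such field existed in the public API at the time of this thread):

    // Hypothetical ProducerRecord shape with an optional, client-settable
    // timestamp; null means "let the producer/broker assign one".
    class ProducerRecordSketch<K, V> {
        final String topic;
        final Integer partition; // null => use the default partitioner
        final Long timestamp;    // null => assigned downstream
        final K key;
        final V value;

        ProducerRecordSketch(String topic, Integer partition, Long timestamp, K key, V value) {
            this.topic = topic;
            this.partition = partition;
            this.timestamp = timestamp;
            this.key = key;
            this.value = value;
        }
    }

    // A mirroring step would then preserve the origin timestamp, e.g.:
    //   new ProducerRecordSketch<>(topic, null, consumed.timestamp(),
    //                              consumed.key(), consumed.value());

This is also exactly why "any user can set the timestamp": there is no way to expose such a field to MM without exposing it to every producer.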
>> > > > On Mon, Sep 7, 2015 at 10:44 AM, Jiangjie Qin <j...@linkedin.com.invalid> wrote:
>> > > >
>> > > > > Jay,
>> > > > >
>> > > > > Thanks for the comments. Yes, there are actually three
>> > > > > proposals, as you pointed out.
>> > > > >
>> > > > > We will have a separate proposal for (1) - the version control
>> > > > > mechanism. We actually thought about whether we wanted to
>> > > > > separate 2 and 3 internally before creating the KIP. The reason
>> > > > > we put 2 and 3 together is that it saves us another
>> > > > > across-the-board wire protocol change. Like you said, we have
>> > > > > to migrate all the clients in all languages. To some extent,
>> > > > > the effort spent on upgrading the clients can be even bigger
>> > > > > than implementing the new feature itself. So there is some
>> > > > > attraction to doing 2 and 3 together instead of separately.
>> > > > > Maybe after (1) is done it will be easier to do protocol
>> > > > > migrations. But if we are able to come to an agreement on the
>> > > > > timestamp solution, I would prefer to have it together with
>> > > > > relative offsets, in the interest of avoiding another wire
>> > > > > protocol change (the process to migrate to relative offsets is
>> > > > > exactly the same as migrating to messages with timestamps).
>> > > > >
>> > > > > In terms of the timestamp: I completely agree that having a
>> > > > > client timestamp is more useful if we can make sure the
>> > > > > timestamp is good. But in reality that can be a really big
>> > > > > *IF*. I think the problem is exactly as Ewen mentioned: if we
>> > > > > let the client set the timestamp, it would be very hard for
>> > > > > the broker to utilize it. If the broker applies the retention
>> > > > > policy based on the client timestamp, one misbehaving producer
>> > > > > can potentially completely mess up the retention policy on the
>> > > > > broker. And although people don't care about the server-side
>> > > > > timestamp, people do care a lot when timestamps break.
>> > > > > Searching by timestamp is a really important use case, even
>> > > > > though it is not used as often as searching by offset; it has
>> > > > > a significant direct impact on RTO when there is a
>> > > > > cross-cluster failover, as Todd mentioned.
>> > > > >
>> > > > > The trick of using max(lastAppendedTimestamp,
>> > > > > currentTimeMillis) is to guarantee monotonic increase of the
>> > > > > timestamp. Many commercial systems actually do something
>> > > > > similar to solve time skew. About changing the time: I am not
>> > > > > sure people use NTP the way they would a watch, just setting it
>> > > > > forward/backward by an hour or so. The time adjustments I used
>> > > > > to make were typically on the order of a minute per week, so
>> > > > > each second might run a few microseconds slower/faster, but
>> > > > > that does not break the clock outright, so all the time-based
>> > > > > transactions are unaffected. The one-minute change is applied
>> > > > > over a week, not instantly.
>> > > > >
>> > > > > Personally, I think having a client-side timestamp would be
>> > > > > useful if we didn't have to put the broker and data integrity
>> > > > > at risk. If we have to choose one of them and not both, I
>> > > > > would prefer the server-side timestamp, because for the
>> > > > > client-side timestamp there is always a plan B, which is
>> > > > > putting the timestamp into the payload.
>> > > > >
>> > > > > Another reason I am reluctant to use the client-side timestamp
>> > > > > is that it is always dangerous to mix the control plane with
>> > > > > the data plane. IP did this, and it caused so many different
>> > > > > breaches that people are migrating to something like MPLS. An
>> > > > > example in Kafka is that any client can construct a
>> > > > > LeaderAndIsrRequest/UpdateMetadataRequest/ControlledShutdownRequest
>> > > > > (you name it) and send it to the broker to mess up the entire
>> > > > > cluster; also, as we have already noticed, a busy cluster can
>> > > > > respond quite slowly to controller messages. So it would
>> > > > > really be nice if we can avoid giving clients the power to
>> > > > > control log retention.
>> > > > >
>> > > > > Thanks,
>> > > > >
>> > > > > Jiangjie (Becket) Qin
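A minimal sketch of the max(lastAppendedTimestamp, currentTimeMillis) trick Becket describes (hypothetical names): the broker's append timestamp never goes backwards, so small clock regressions such as NTP slew cannot produce an out-of-order time index.

    // Sketch only -- hypothetical names. Produces a monotonically
    // non-decreasing broker-side timestamp even if the wall clock steps back.
    class MonotonicAppendClock {
        private long lastAppendedTimestamp = Long.MIN_VALUE;

        synchronized long nextAppendTimestamp() {
            lastAppendedTimestamp = Math.max(lastAppendedTimestamp, System.currentTimeMillis());
            return lastAppendedTimestamp;
        }
    }

Note this is the piece Jay pushes back on above: it absorbs clock slew, but a mirroring pipeline that legitimately appends old source timestamps would have them all clamped up to the destination broker's clock.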
>> > > > > On Sun, Sep 6, 2015 at 9:54 PM, Todd Palino <tpal...@gmail.com> wrote:
>> > > > >
>> > > > > > So, with regards to why you want to search by timestamp, the
>> > > > > > biggest problem I've seen is with consumers who want to reset
>> > > > > > their offsets to a specific point in time, whether it is to
>> > > > > > replay a certain amount of messages or to rewind to before
>> > > > > > some problem state existed. This happens more often than
>> > > > > > anyone would like.
>> > > > > >
>> > > > > > To handle this now, we need to constantly export the broker's
>> > > > > > offsets for every partition to a time-series database and
>> > > > > > then use external processes to query this. I know we're not
>> > > > > > the only ones doing this. The way the broker handles requests
>> > > > > > for offsets by timestamp is a little obtuse (explain it to
>> > > > > > anyone without intimate knowledge of the internal workings of
>> > > > > > the broker -- every time I do, I see this). In addition, as
>> > > > > > Becket pointed out, it causes problems specifically with
>> > > > > > retention of messages by time when you move partitions
>> > > > > > around.
>> > > > > >
>> > > > > > I'm deliberately avoiding the discussion of which timestamp
>> > > > > > to use. I can see the argument either way, though I tend to
>> > > > > > lean towards the idea that the broker timestamp is the only
>> > > > > > viable source of truth in this situation.
>> > > > > >
>> > > > > > -Todd
>> > > > > >
>> > > > > > On Sun, Sep 6, 2015 at 7:08 PM, Ewen Cheslack-Postava <e...@confluent.io> wrote:
>> > > > > >
>> > > > > > > On Sun, Sep 6, 2015 at 4:57 PM, Jay Kreps <j...@confluent.io> wrote:
>> > > > > > >
>> > > > > > > > 2. Nobody cares what time it is on the server.
>> > > > > > >
>> > > > > > > This is a good way of summarizing the issue I was trying to
>> > > > > > > get at, from an app's perspective. Of the 3 stated goals of
>> > > > > > > the KIP, #2 (log retention) is reasonably handled by a
>> > > > > > > server-side timestamp. I really just care that a message is
>> > > > > > > there long enough that I have a chance to process it. #3
>> > > > > > > (searching by timestamp) only seems useful if we can
>> > > > > > > guarantee the server-side timestamp is close enough to the
>> > > > > > > original client-side timestamp, and any mirror maker step
>> > > > > > > seems to break that (even ignoring any issues with broker
>> > > > > > > availability).
>> > > > > > >
>> > > > > > > I'm also wondering whether optimizing for
>> > > > > > > search-by-timestamp on the broker is really something we
>> > > > > > > want to do, given that messages aren't really guaranteed to
>> > > > > > > be ordered by application-level timestamps on the broker.
>> > > > > > > Is part of the need for this just due to the current
>> > > > > > > consumer APIs being difficult to work with? For example,
>> > > > > > > could you implement this pretty easily client-side, just
>> > > > > > > the way you would broker-side? I'd imagine a couple of
>> > > > > > > random seeks + reads during very rare occasions (i.e. when
>> > > > > > > the app starts up) wouldn't be a problem performance-wise.
>> > > > > > > Or is it also that you need the broker to enforce things
>> > > > > > > like monotonically increasing timestamps, since you can't
>> > > > > > > do the query properly and efficiently without that
>> > > > > > > guarantee, and therefore what applications are actually
>> > > > > > > looking for *is* broker-side timestamps?
>> > > > > > >
>> > > > > > > -Ewen
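A rough sketch of the client-side alternative Ewen floats (hypothetical helper; it assumes the timestamp can be read from each fetched message, e.g. out of the payload, and that timestamps are at least roughly non-decreasing in offset order): binary-search the partition's offset range with one seek + one read per probe.

    import java.util.function.LongUnaryOperator;

    // Sketch only. readTimestampAt stands in for "seek to this offset,
    // fetch one message, extract its timestamp" via whatever consumer API
    // is available. Correctness depends on timestamps being monotonic in
    // offset order -- which, as noted above, the broker does not
    // guarantee for application-level timestamps.
    class ClientSideTimestampSearch {
        static long earliestOffsetAtOrAfter(long startOffset, long endOffset,
                                            long targetTs, LongUnaryOperator readTimestampAt) {
            long lo = startOffset, hi = endOffset, answer = endOffset;
            while (lo <= hi) {
                long mid = lo + (hi - lo) / 2;
                if (readTimestampAt.applyAsLong(mid) >= targetTs) {
                    answer = mid;
                    hi = mid - 1;
                } else {
                    lo = mid + 1;
                }
            }
            return answer;
        }
    }

That is about log2(endOffset - startOffset) probes -- cheap for an occasional app-startup rewind, which is Ewen's point; the open question is what happens when the monotonicity assumption fails.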
>> > > > > > >
>> > > > > > > > Consider cases where data is being copied from a database
>> > > > > > > > or from log files. In steady state the server time is
>> > > > > > > > very close to the client time if their clocks are sync'd
>> > > > > > > > (see 1), but there will be times of large divergence when
>> > > > > > > > the copying process is stopped or falls behind. When this
>> > > > > > > > occurs, it is clear that the time the data arrived on the
>> > > > > > > > server is irrelevant; it is the source timestamp that
>> > > > > > > > matters. This is the problem you are trying to fix by
>> > > > > > > > retaining the MM timestamp, but really the client should
>> > > > > > > > always set the time, with the use of server-side time as
>> > > > > > > > a fallback. It would be worth talking to the Samza folks
>> > > > > > > > and reading through this blog post
>> > > > > > > > (http://radar.oreilly.com/2015/08/the-world-beyond-batch-streaming-101.html)
>> > > > > > > > on this subject, since we went through similar learnings
>> > > > > > > > on the stream processing side.
>> > > > > > > >
>> > > > > > > > I think the implication of these two is that we need a
>> > > > > > > > proposal that handles potentially very out-of-order
>> > > > > > > > timestamps in some kind of sane-ish way (buggy clients
>> > > > > > > > will set something totally wrong as the time).
>> > > > > > > >
>> > > > > > > > -Jay
>> > > > > > > >
>> > > > > > > > On Sun, Sep 6, 2015 at 4:22 PM, Jay Kreps <j...@confluent.io> wrote:
>> > > > > > > >
>> > > > > > > > > The magic byte is used to version the message format, so
>> > > > > > > > > we'll need to make sure that check is in place--I
>> > > > > > > > > actually don't see it in the current consumer code,
>> > > > > > > > > which I think is a bug we should fix for the next
>> > > > > > > > > release (filed KAFKA-2523). The purpose of that field is
>> > > > > > > > > to give a clear check on the format rather than the
>> > > > > > > > > scrambled scenarios Becket describes.
>> > > > > > > > >
>> > > > > > > > > Also, Becket, I don't think just fixing the Java client
>> > > > > > > > > is sufficient, as that would break other clients--i.e.
>> > > > > > > > > if anyone writes v1 messages, even by accident, any
>> > > > > > > > > non-v1-capable consumer will break. I think we probably
>> > > > > > > > > need a way to have the server ensure a particular
>> > > > > > > > > message format, either at read or at write time.
>> > > > > > > > >
>> > > > > > > > > -Jay
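A minimal sketch of the magic-byte check Jay is referring to (field offsets are illustrative, not the authoritative wire layout): fail fast on an unknown format version instead of misparsing whatever bytes follow.

    import java.nio.ByteBuffer;

    // Sketch only -- offsets are illustrative. The point is to reject an
    // unknown magic value before interpreting the rest of the message.
    class MagicByteCheck {
        static final int CRC_LENGTH = 4;
        static final int MAGIC_OFFSET = CRC_LENGTH; // magic byte follows the CRC
        static final byte HIGHEST_SUPPORTED_MAGIC = 0;

        static void ensureSupportedFormat(ByteBuffer message) {
            byte magic = message.get(message.position() + MAGIC_OFFSET);
            if (magic > HIGHEST_SUPPORTED_MAGIC)
                throw new IllegalStateException("Unknown message format version: " + magic);
        }
    }

This is also what would make the server-side enforcement Jay mentions possible: the broker can inspect one byte to decide whether a producer-supplied message, or a message about to be served to an old fetcher, is in an acceptable format.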
>> > > > > > > > > On Thu, Sep 3, 2015 at 3:47 PM, Jiangjie Qin <j...@linkedin.com.invalid> wrote:
>> > > > > > > > >
>> > > > > > > > >> Hi Guozhang,
>> > > > > > > > >>
>> > > > > > > > >> I checked the code again. Actually the CRC check
>> > > > > > > > >> probably won't fail. The newly added timestamp field
>> > > > > > > > >> might be treated as the key length instead, so we are
>> > > > > > > > >> likely to receive an IllegalArgumentException when
>> > > > > > > > >> trying to read the key. I'll update the KIP.
>> > > > > > > > >>
>> > > > > > > > >> Thanks,
>> > > > > > > > >>
>> > > > > > > > >> Jiangjie (Becket) Qin
>> > > > > > > > >>
>> > > > > > > > >> On Thu, Sep 3, 2015 at 12:48 PM, Jiangjie Qin <j...@linkedin.com> wrote:
>> > > > > > > > >>
>> > > > > > > > >> > Hi, Guozhang,
>> > > > > > > > >> >
>> > > > > > > > >> > Thanks for reading the KIP. By "old consumer", I
>> > > > > > > > >> > meant the ZookeeperConsumerConnector in trunk now,
>> > > > > > > > >> > i.e. without this bug fixed. If we fix the
>> > > > > > > > >> > ZookeeperConsumerConnector, then it will throw an
>> > > > > > > > >> > exception complaining about the unsupported version
>> > > > > > > > >> > when it sees message format V1. What I was trying to
>> > > > > > > > >> > say is that if we have some
>> > > > > > > > >> > ZookeeperConsumerConnector running without the fix,
>> > > > > > > > >> > the consumer will complain about a CRC mismatch
>> > > > > > > > >> > instead of an unsupported version.
>> > > > > > > > >> >
>> > > > > > > > >> > Thanks,
>> > > > > > > > >> >
>> > > > > > > > >> > Jiangjie (Becket) Qin
>> > > > > > > > >> >
>> > > > > > > > >> > On Thu, Sep 3, 2015 at 12:15 PM, Guozhang Wang <wangg...@gmail.com> wrote:
>> > > > > > > > >> >
>> > > > > > > > >> >> Thanks for the write-up Jiangjie.
>> > > > > > > > >> >>
>> > > > > > > > >> >> One comment about the migration plan: "For old
>> > > > > > > > >> >> consumers, if they see the new protocol the CRC
>> > > > > > > > >> >> check will fail"...
>> > > > > > > > >> >>
>> > > > > > > > >> >> Do you mean this bug in the old consumer cannot be
>> > > > > > > > >> >> fixed in a backward-compatible way?
>> > > > > > > > >> >>
>> > > > > > > > >> >> Guozhang
>> > > > > > > > >> >>
>> > > > > > > > >> >> On Thu, Sep 3, 2015 at 8:35 AM, Jiangjie Qin <j...@linkedin.com.invalid> wrote:
>> > > > > > > > >> >>
>> > > > > > > > >> >> > Hi,
>> > > > > > > > >> >> >
>> > > > > > > > >> >> > We just created KIP-31 to propose a message
>> > > > > > > > >> >> > format change in Kafka:
>> > > > > > > > >> >> >
>> > > > > > > > >> >> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-31+-+Message+format+change+proposal
>> > > > > > > > >> >> >
>> > > > > > > > >> >> > As a summary, the motivations are:
>> > > > > > > > >> >> > 1. Avoid server-side message re-compression
>> > > > > > > > >> >> > 2. Honor time-based log roll and retention
>> > > > > > > > >> >> > 3. Enable offset search by timestamp at a finer
>> > > > > > > > >> >> >    granularity
>> > > > > > > > >> >> >
>> > > > > > > > >> >> > Feedback and comments are welcome!
>> > > > > > > > >> >> >
>> > > > > > > > >> >> > Thanks,
>> > > > > > > > >> >> >
>> > > > > > > > >> >> > Jiangjie (Becket) Qin
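To illustrate the misparse Becket describes (offsets illustrative, not the authoritative format): a v0-only parser that skips the magic byte reads the first four bytes of a v1 message's 8-byte timestamp where it expects the 4-byte key length, and gets a nonsense length.

    import java.nio.ByteBuffer;

    // Sketch only. In a hypothetical v1 layout, an 8-byte timestamp sits
    // right where v0 keeps the 4-byte key length, so a parser that ignores
    // the magic byte reads the timestamp's high-order bytes as the length.
    class V0ParserSketch {
        static byte[] readKeyAssumingV0(ByteBuffer message) {
            message.getInt(); // crc
            message.get();    // magic -- ignored, which is the bug
            message.get();    // attributes
            int keyLength = message.getInt(); // on v1: half of the timestamp!
            if (keyLength < 0 || keyLength > message.remaining())
                throw new IllegalArgumentException("Invalid key length: " + keyLength);
            byte[] key = new byte[keyLength];
            message.get(key);
            return key;
        }
    }

This matches the corrected migration note: the CRC, computed over the same raw bytes regardless of interpretation, still checks out, but key parsing fails with an IllegalArgumentException.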
>> > > > > > > > >> >>
>> > > > > > > > >> >> --
>> > > > > > > > >> >> -- Guozhang
>> > > > > > >
>> > > > > > > --
>> > > > > > > Thanks,
>> > > > > > > Ewen
>> > >
>> > > --
>> > > Thanks,
>> > > Neha