Hi Joel, Good point about rebuilding the index. I agree that having a per-message LogAppendTime might be necessary. About time adjustment, the solution sounds promising, but it might be better to make it a follow-up to the KIP because it seems like a really rare use case.
I have another thought on how to manage the out-of-order timestamps. Maybe we can do the following: Create a special log-compacted topic __timestamp_index (similar to the offsets topic), where the key would be (TopicPartition, Timestamp_Rounded_To_Minute) and the value is the offset. In memory, we keep a map for each TopicPartition whose value is (timestamp_rounded_to_minute -> smallest_offset_in_the_minute). This way we can search for out-of-order messages and make sure no message is missing. Thoughts? Thanks, Jiangjie (Becket) Qin On Fri, Sep 11, 2015 at 12:46 PM, Joel Koshy <jjkosh...@gmail.com> wrote: > Jay had mentioned the scenario of mirror-maker bootstrap which would > effectively reset the logAppendTimestamps for the bootstrapped data. > If we don't include logAppendTimestamps in each message there is a > similar scenario when rebuilding indexes during recovery. So it seems > it may be worth adding that timestamp to messages. The drawback to > that is exposing a server-side concept in the protocol (although we > already do that with offsets). logAppendTimestamp really should be > decided by the broker so I think the first scenario may have to be > written off as a gotcha, but the second may be worth addressing (by > adding it to the message format). > > The other point that Jay raised which needs to be addressed (since we > require monotonically increasing timestamps in the index) in the > proposal is changing time on the server (I'm a little less concerned > about NTP clock skews than a user explicitly changing the server's > time - i.e., big clock skews). We would at least want to "set back" > all the existing timestamps to guarantee non-decreasing timestamps > with future messages. I'm not sure at this point how best to handle > that, but we could perhaps have an epoch/base-time (or time-correction) > stored in the log directories and base all log index timestamps off > that base-time (or corrected). 
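To make the minute-bucket idea proposed at the top of this message concrete, here is a minimal in-memory sketch. All class and method names here are hypothetical illustrations, not from the KIP or from Kafka itself, and a real version would persist entries to the compacted topic rather than keep them only in memory:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical sketch (not KIP code): per-partition map of
// minute-rounded timestamp -> smallest offset seen in that minute.
public class MinuteBucketIndex {
    private static final long MINUTE_MS = 60_000L;

    // Keyed by a "topic-partition" string to keep the sketch self-contained.
    private final Map<String, NavigableMap<Long, Long>> index = new HashMap<>();

    /** Record a message's (timestamp, offset), keeping the smallest offset per minute. */
    public void record(String topicPartition, long timestampMs, long offset) {
        long minute = (timestampMs / MINUTE_MS) * MINUTE_MS;
        index.computeIfAbsent(topicPartition, tp -> new TreeMap<>())
             .merge(minute, offset, Math::min);
    }

    /**
     * Smallest offset recorded in the bucket covering timestampMs or, failing
     * that, in the nearest later bucket; -1 if none. Because every out-of-order
     * message folds into its minute bucket's smallest offset, consuming from
     * the returned offset cannot skip any message in or after that minute.
     */
    public long earliestOffsetFor(String topicPartition, long timestampMs) {
        NavigableMap<Long, Long> buckets = index.get(topicPartition);
        if (buckets == null) return -1L;
        long minute = (timestampMs / MINUTE_MS) * MINUTE_MS;
        Map.Entry<Long, Long> e = buckets.ceilingEntry(minute);
        return e == null ? -1L : e.getValue();
    }
}
```

Rounding to the minute bounds the map's size while still letting a consumer resume from the smallest offset in a bucket, which is what makes out-of-order timestamps within a minute safe: the lookup may start slightly early, but never past a message.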
So if at any time you determine that > time has changed backwards you can adjust that base-time without > having to fix up all the entries. Without knowing the exact diff > between the previous clock and new clock we cannot adjust the times > exactly, but we can at least ensure increasing timestamps. > > On Fri, Sep 11, 2015 at 10:52 AM, Jiangjie Qin > <j...@linkedin.com.invalid> wrote: > > Ewen and Jay, > > > > The way I see it, the LogAppendTime is another form of "offset". It serves > > the following purposes: > > 1. Locate messages not only by position, but also by time. The difference > > from offset is that a timestamp is not unique across all messages. > > 2. Allow the broker to manage messages based on time, e.g. retention, rolling > > 3. Provide convenience for users to search for messages not only by offset, but > > also by timestamp. > > > > For purpose (2) we don't need a per-message server timestamp. We only need > > a per-log-segment server timestamp and propagate it among brokers. > > > > For (1) and (3), we need a per-message timestamp. Then the question is > > whether we should use CreateTime or LogAppendTime? > > > > I completely agree that an application timestamp is very useful for many > > use cases. But it seems to me that having Kafka understand and > maintain > > application timestamps is a bit over-demanding. So I think there is value > to > > pass on CreateTime for application convenience, but I am not sure it can > > replace LogAppendTime. Managing out-of-order CreateTime is equivalent to > > allowing producers to send their own offsets and asking the broker to manage the > > offsets for them. It is going to be very hard to maintain and could create > > huge performance/functional issues because of complicated logic. > > > > About whether we should expose LogAppendTime to consumers, I agree that the > server > > timestamp is internal to the broker, but isn't offset also an internal > concept? 
> > Arguably it's not provided by the producer, so consumer application logic does > > not have to know about offsets. But users need to know the offset because they need > to > > know "where the message is" in the log. LogAppendTime provides the answer > > to "when was the message appended to the log". So personally I think it > is > > reasonable to expose the LogAppendTime to consumers. > > > > I can see some use cases of exposing the LogAppendTime, to name some: > > 1. Let's say the broker has 7 days of log retention, and some application wants > to > > reprocess the data from the past 3 days. Users can simply provide the timestamp > > and start consuming. > > 2. Users can easily know the lag by time. > > 3. Cross-cluster failover. This is a more complicated use case, with two > > goals: 1) do not lose messages; and 2) do not reconsume tons of messages. > > Only knowing an offset in cluster A won't help with finding the failover point > in > > cluster B, because an offset in one cluster means nothing to another > cluster. > > A timestamp, however, is a good cross-cluster reference in this case. > > > > Thanks, > > > > Jiangjie (Becket) Qin > > > > On Thu, Sep 10, 2015 at 9:28 PM, Ewen Cheslack-Postava < > e...@confluent.io> > > wrote: > > > >> Re: MM preserving timestamps: Yes, this was how I interpreted the point > in > >> the KIP and I only raised the issue because it restricts the usefulness > of > >> timestamps anytime MM is involved. I agree it's not a deal breaker, but > I > >> wanted to understand the exact impact of the change. Some users seem to > want to > >> be able to seek by application-defined timestamps (despite the many > obvious > >> issues involved), and the proposal clearly would not support that unless > >> the timestamps submitted with the produce requests were respected. If we > >> ignore client-submitted timestamps, then we probably want to try to hide > >> the timestamps as much as possible in any public interface (e.g. 
never > >> shows up in any public consumer APIs), but expose it just enough to be > >> useful for operational purposes. > >> > >> Sorry if my devil's advocate position / attempt to map the design space > led > >> to some confusion! > >> > >> -Ewen > >> > >> > >> On Thu, Sep 10, 2015 at 5:48 PM, Jay Kreps <j...@confluent.io> wrote: > >> > >> > Ah, I see, I think I misunderstood about MM, it was called out in the > >> > proposal and I thought you were saying you'd retain the timestamp but > I > >> > think you're calling out that you're not. In that case you do have the > >> > opposite problem, right? When you add mirroring for a topic all that > data > >> > will have a timestamp of now and retention won't be right. Not a > blocker > >> > but a bit of a gotcha. > >> > > >> > -Jay > >> > > >> > > >> > > >> > On Thu, Sep 10, 2015 at 5:40 PM, Joel Koshy <jjkosh...@gmail.com> > wrote: > >> > > >> > > > Don't you see all the same issues you see with client-defined > >> > timestamps > >> > > > if you let mm control the timestamp as you were proposing? That > means > >> > > time > >> > > > >> > > Actually I don't think that was in the proposal (or was it?). i.e., > I > >> > > think it was always supposed to be controlled by the broker (and not > >> > > MM). > >> > > > >> > > > Also, Joel, can you just confirm that you guys have talked through > >> the > >> > > > whole timestamp thing with the Samza folks at LI? The reason I ask > >> > about > >> > > > this is that Samza and Kafka Streams (KIP-28) are both trying to > rely > >> > on > >> > > > >> > > We have not. This is a good point - we will follow up. > >> > > > >> > > > WRT your idea of a FollowerFetchRequest, I had thought of a similar > >> idea > >> > > > where we use the leader's timestamps to approximately set the > >> > follower's > >> > > > timestamps. 
I had thought of just adding a partition metadata > request > >> > > that > >> > > > would subsume the current offset/time lookup and could be used by > the > >> > > > follower to try to approximately keep their timestamps kosher. > It's a > >> > > > little hacky and doesn't help with MM but it is also maybe less > >> > invasive > >> > > so > >> > > > that approach could be viable. > >> > > > >> > > That would also work, but perhaps responding with the actual leader > >> > > offset-timestamp entries (corresponding to the fetched portion) > would > >> > > be exact and it should be small as well. Anyway, the main motivation > >> > > in this was to avoid leaking server-side timestamps to the > >> > > message-format if people think it is worth it so the alternatives > are > >> > > implementation details. My original instinct was that it also > avoids a > >> > > backwards incompatible change (but it does not because we also have > >> > > the relative offset change). > >> > > > >> > > Thanks, > >> > > > >> > > Joel > >> > > > >> > > > > >> > > > > >> > > > > >> > > > On Thu, Sep 10, 2015 at 3:36 PM, Joel Koshy <jjkosh...@gmail.com> > >> > wrote: > >> > > > > >> > > >> I just wanted to comment on a few points made earlier in this > >> thread: > >> > > >> > >> > > >> Concerns on clock skew: at least for the original proposal's > scope > >> > > >> (which was more for honoring retention broker-side) this would > only > >> be > >> > > >> an issue when spanning leader movements right? i.e., leader > >> migration > >> > > >> latency has to be much less than clock skew for this to be a real > >> > > >> issue wouldn’t it? > >> > > >> > >> > > >> Client timestamp vs broker timestamp: I’m not sure Kafka > (brokers) > >> are > >> > > >> the right place to reason about client-side timestamps precisely > due > >> > > >> to the nuances that have been discussed at length in this > thread. 
My > >> > > >> preference would have been for the timestamp (now called > >> > > >> LogAppendTimestamp) to have nothing to do with the applications. Ewen > >> > > >> raised a valid concern about leaking such “private/server-side” > >> > > >> timestamps into the protocol spec. i.e., it is fine to have the > >> > > >> CreateTime which is expressly client-provided and immutable > >> > > >> thereafter, but the LogAppendTime is also going to be part of the > protocol > >> > > >> and it would be good to avoid exposure (to client developers) if > >> > > >> possible. Ok, so here is a slightly different approach that I was > >> just > >> > > >> thinking about (and did not think too far so it may not work): do > >> not > >> > > >> add the LogAppendTime to messages. Instead, build the time-based > >> index > >> > > >> on the server side on message arrival time alone. Introduce a new > >> > > >> ReplicaFetchRequest/Response pair. ReplicaFetchResponses will > also > >> > > >> include the slice of the time-based index for the follower > broker. > >> > > >> This way we can at least keep timestamps aligned across brokers > for > >> > > >> retention purposes. We do lose the append timestamp for mirroring > >> > > >> pipelines (which appears to be the case in KIP-32 as well). > >> > > >> > >> > > >> Configurable index granularity: We can do this but I’m not sure > it > >> is > >> > > >> very useful and as Jay noted, a major change from the old > proposal > >> > > >> linked from the KIP is the sparse time-based index which we felt > was > >> > > >> essential to bound memory usage (and having timestamps on each > log > >> > > >> index entry was probably a big waste since in the common case > >> several > >> > > >> messages span the same timestamp). BTW another benefit of the > second > >> > > >> index is that it makes it easier to roll back or throw away if > >> > > >> necessary (vs. 
modifying the existing index format) - although > that > >> > > >> obviously does not help with rolling back the timestamp change in > >> the > >> > > >> message format, but it is one less thing to worry about. > >> > > >> > >> > > >> Versioning: I’m not sure everyone is saying the same thing wrt > the > >> > > >> scope of this. There is the record format change, but I also > think > >> > > >> this ties into all of the API versioning that we already have in > >> > > >> Kafka. The current API versioning approach works fine for > >> > > >> upgrades/downgrades across official Kafka releases, but not so > well > >> > > >> between releases. (We almost got bitten by this at LinkedIn with > the > >> > > >> recent changes to various requests but were able to work around > >> > > >> these.) We can clarify this in the follow-up KIP. > >> > > >> > >> > > >> Thanks, > >> > > >> > >> > > >> Joel > >> > > >> > >> > > >> > >> > > >> On Thu, Sep 10, 2015 at 3:00 PM, Jiangjie Qin > >> > <j...@linkedin.com.invalid > >> > > > > >> > > >> wrote: > >> > > >> > Hi Jay, > >> > > >> > > >> > > >> > I just changed the KIP title and updated the KIP page. > >> > > >> > > >> > > >> > And yes, we are working on a general version control proposal > to > >> > make > >> > > the > >> > > >> > protocol migration like this more smooth. I will also create a > KIP > >> > for > >> > > >> that > >> > > >> > soon. > >> > > >> > > >> > > >> > Thanks, > >> > > >> > > >> > > >> > Jiangjie (Becket) Qin > >> > > >> > > >> > > >> > > >> > > >> > On Thu, Sep 10, 2015 at 2:21 PM, Jay Kreps <j...@confluent.io> > >> > wrote: > >> > > >> > > >> > > >> >> Great, can we change the name to something related to the > >> > > >> change--"KIP-31: > >> > > >> >> Move to relative offsets in compressed message sets". > >> > > >> >> > >> > > >> >> Also you had mentioned before you were going to expand on the > >> > > mechanics > >> > > >> of > >> > > >> >> handling these log format changes, right? 
> >> > > >> >> -Jay > >> > > >> >> > >> > > >> >> On Thu, Sep 10, 2015 at 12:42 PM, Jiangjie Qin > >> > > >> <j...@linkedin.com.invalid> > >> > > >> >> wrote: > >> > > >> >> > >> > > >> >> > Neha and Jay, > >> > > >> >> > > >> > > >> >> > Thanks a lot for the feedback. Good point about splitting > the > >> > > >> >> discussion. I > >> > > >> >> > have split the proposal into three KIPs and it does make each > >> > > discussion > >> > > >> >> more > >> > > >> >> > clear: > >> > > >> >> > KIP-31 - Message format change (Use relative offset) > >> > > >> >> > KIP-32 - Add CreateTime and LogAppendTime to Kafka message > >> > > >> >> > KIP-33 - Build a time-based log index > >> > > >> >> > > >> > > >> >> > KIP-33 can be a follow-up KIP to KIP-32, so we can discuss > >> > > >> KIP-31 > >> > > >> >> > and KIP-32 first for now. I will create a separate > discussion > >> > > thread > >> > > >> for > >> > > >> >> > KIP-32 and reply to the concerns you raised regarding the > >> timestamp. > >> > > >> >> > > >> > > >> >> > So far it looks like there is no objection to KIP-31. Since I > >> removed > a > >> > > few > >> > > >> >> parts > >> > > >> >> > from the previous KIP and only left the relative offset > proposal, > >> it > >> > > >> would be > >> > > >> >> > great if people could take another look to see if there are any > >> > > concerns. > >> > > >> >> > > >> > > >> >> > Thanks, > >> > > >> >> > > >> > > >> >> > Jiangjie (Becket) Qin > >> > > >> >> > > >> > > >> >> > > >> > > >> >> > On Tue, Sep 8, 2015 at 1:28 PM, Neha Narkhede < >> n...@confluent.io > >> > > > >> > > >> wrote: > >> > > >> >> > > >> > > >> >> > > Becket, > >> > > >> >> > > > >> > > >> >> > > Nice write-up. Few thoughts - > >> > > >> >> > > > >> > > >> >> > > I'd split up the discussion for simplicity. 
Note that you can > > always > >> >> > group > > several of these in one patch to reduce the protocol > changes > > people > >> >> have > >> >> > to > > deal with. This is just a suggestion, but I think the > >> following > > split > >> >> > might > > make it easier to tackle the changes being proposed - > >> > > >> >> > > > >> > > >> >> > > - Relative offsets > >> > > >> >> > > - Introducing the concept of time > >> > > >> >> > > - Time-based indexing (separate the usage of the > timestamp > >> > > field > >> > > >> >> from > >> > > >> >> > > how/whether we want to include a timestamp in the > message) > >> > > >> >> > > > >> > > >> >> > > I'm a +1 on relative offsets; we should've done it back > when > >> we > >> > > >> >> > introduced > >> > > >> >> > > it. Other than reducing the CPU overhead, this will also > >> reduce > >> > > the > >> > > >> >> > garbage > >> > > >> >> > > collection overhead on the brokers. > >> > > >> >> > > > >> > > >> >> > > On the timestamp field, I generally agree that we should > add > >> a > >> > > >> >> timestamp > >> > > >> >> > to > >> > > >> >> > > a Kafka message but I'm not quite sold on how this KIP > >> suggests > >> > > the > >> > > >> >> > > timestamp be set. Will avoid repeating the downsides of a > >> > broker > >> > > >> side > >> > > >> >> > > timestamp mentioned previously in this thread. I think the > >> > topic > >> > > of > >> > > >> >> > > including a timestamp in a Kafka message requires a lot > more > >> > > thought > >> > > >> >> and > >> > > >> >> > > details than what's in this KIP. 
I'd suggest we make it a > >> > > separate > >> > > >> KIP > >> > > >> >> > that > >> > > >> >> > > includes a list of all the different use cases for the > >> > timestamp > >> > > >> >> (beyond > >> > > >> >> > > log retention) including stream processing and discuss > >> > tradeoffs > >> > > of > >> > > >> >> > > including client and broker side timestamps. > >> > > >> >> > > > >> > > >> >> > > Agree with the benefit of time-based indexing, but haven't > >> had > >> > a > >> > > >> chance > >> > > >> >> > to > >> > > >> >> > > dive into the design details yet. > >> > > >> >> > > > >> > > >> >> > > Thanks, > >> > > >> >> > > Neha > >> > > >> >> > > > >> > > >> >> > > On Tue, Sep 8, 2015 at 10:57 AM, Jay Kreps < > j...@confluent.io > >> > > >> > > >> wrote: > >> > > >> >> > > > >> > > >> >> > > > Hey Beckett, > >> > > >> >> > > > > >> > > >> >> > > > I was proposing splitting up the KIP just for > simplicity of > >> > > >> >> discussion. > >> > > >> >> > > You > >> > > >> >> > > > can still implement them in one patch. I think > otherwise it > >> > > will > >> > > >> be > >> > > >> >> > hard > >> > > >> >> > > to > >> > > >> >> > > > discuss/vote on them since if you like the offset > proposal > >> > but > >> > > not > >> > > >> >> the > >> > > >> >> > > time > >> > > >> >> > > > proposal what do you do? > >> > > >> >> > > > > >> > > >> >> > > > Introducing a second notion of time into Kafka is a > pretty > >> > > massive > >> > > >> >> > > > philosophical change so it kind of warrants it's own > KIP I > >> > > think > >> > > >> it > >> > > >> >> > isn't > >> > > >> >> > > > just "Change message format". > >> > > >> >> > > > > >> > > >> >> > > > WRT time I think one thing to clarify in the proposal is > >> how > >> > MM > >> > > >> will > >> > > >> >> > have > >> > > >> >> > > > access to set the timestamp? Presumably this will be a > new > >> > > field > >> > > >> in > >> > > >> >> > > > ProducerRecord, right? 
If so then any user can set the > > timestamp, > >> > right? > > > I'm not sure you answered the questions around how this > >> will > > work > >> for > >> > > MM > > > since when MM retains timestamps from multiple > partitions > >> > they > > >> will > >> > > then > > > be > > > out of order and in the past (so the > >> > max(lastAppendedTimestamp, > >> > > > currentTimeMillis) override you proposed will not work, > >> > > right?). > >> If > >> >> we > > > don't do this then when you set up mirroring the data > will > >> > all > > be > >> new > >> > and > > > you have the same retention problem you described. > Maybe I > >> > missed > >> > > > something...? > > > > > > My main motivation is that given that both Samza and > Kafka > >> > Streams > >> are > > > doing work that implies a mandatory client-defined > notion > >> of > > >> time, I > >> > > really > > > think introducing a different mandatory notion of time > in > >> > Kafka is > >> > going > > to > > > be quite odd. We should think hard about how > client-defined > >> > time > >> >> could > > > work. I'm not sure if it can, but I'm also not sure > that it > >> > can't. > >> > Having > > > both will be odd. Did you chat about this with > Yi/Kartik on > >> the > >> Samza > >> > > side? > > > > > > When you are saying it won't work you are assuming some > >> > particular > >> > > implementation? 
Maybe that the index is a monotonically > >> > > increasing > >> > > >> >> set > >> > > >> >> > of > >> > > >> >> > > > pointers to the least record with a timestamp larger > than > >> the > >> > > >> index > >> > > >> >> > time? > >> > > >> >> > > > In other words a search for time X gives the largest > offset > >> > at > >> > > >> which > >> > > >> >> > all > >> > > >> >> > > > records are <= X? > >> > > >> >> > > > > >> > > >> >> > > > For retention, I agree with the problem you point out, > but > >> I > >> > > think > >> > > >> >> what > >> > > >> >> > > you > >> > > >> >> > > > are saying in that case is that you want a size limit > too. > >> If > >> > > you > >> > > >> use > >> > > >> >> > > > system time you actually hit the same problem: say you > do a > >> > > full > >> > > >> dump > >> > > >> >> > of > >> > > >> >> > > a > >> > > >> >> > > > DB table with a setting of 7 days retention, your > retention > >> > > will > >> > > >> >> > actually > >> > > >> >> > > > not get enforced for the first 7 days because the data > is > >> > "new > >> > > to > >> > > >> >> > Kafka". > >> > > >> >> > > > > >> > > >> >> > > > -Jay > >> > > >> >> > > > > >> > > >> >> > > > > >> > > >> >> > > > On Mon, Sep 7, 2015 at 10:44 AM, Jiangjie Qin > >> > > >> >> > <j...@linkedin.com.invalid > >> > > >> >> > > > > >> > > >> >> > > > wrote: > >> > > >> >> > > > > >> > > >> >> > > > > Jay, > >> > > >> >> > > > > > >> > > >> >> > > > > Thanks for the comments. Yes, there are actually three > >> > > >> proposals as > >> > > >> >> > you > >> > > >> >> > > > > pointed out. > >> > > >> >> > > > > > >> > > >> >> > > > > We will have a separate proposal for (1) - version > >> control > >> > > >> >> mechanism. > >> > > >> >> > > We > >> > > >> >> > > > > actually thought about whether we want to separate 2 > and > >> 3 > >> > > >> >> internally > >> > > >> >> > > > > before creating the KIP. 
The reason we put 2 and 3 together > > is > it > >> > will > > > > save us another cross-board wire protocol change. Like > you > > >> said, we > > have > > > to migrate all the clients in all languages. To some > > extent, the > >> > effort > > > spent > > > > on upgrading the clients can be even bigger than > >> implementing > the > >> > new > > > > feature itself. So it is somewhat attractive to do > 2 > >> and 3 > > > together > > > > instead of separately. Maybe after (1) is done it > will be > >> easier to > >> > do > > > > protocol migration. But if we are able to come to an > >> > agreement > on > >> the > > > > timestamp solution, I would prefer to have it together > >> with > >> relative > > > offset > > > > in the interest of avoiding another wire protocol > change > (the > >> process > > to > > > > migrate to relative offset is exactly the same as > migrating > to > >> messages > > with > > > > timestamps). > > > > > > In terms of the timestamp, I completely agree that having > > a client > >> > timestamp > > is > > > > more useful if we can make sure the timestamp is good. > > But > in > >> reality > > > that > > > > can be a really big *IF*. 
I think the problem is exactly > as > > Ewen > >> > > > mentioned: > >> > > > > if we let the client set the timestamp, it would be > >> very > > hard > >> for > >> > > the > >> > > > > broker to utilize it. If the broker applies retention policy > >> based > > on > >> the > >> > > > client > >> > > > > timestamp, one misbehaving producer can potentially > >> completely > > mess > >> up > >> > > the > >> > > > > retention policy on the broker. Although people don't > >> care > > about > >> > the server- > >> > > > > side timestamp, people do care a lot when timestamps > >> break. > >> Searching > >> > > by > >> > > > > timestamp is a really important use case even though > it > >> is > > not > >> used > >> > as > >> > > > > often as searching by offset. It has significant > direct > >> > impact > >> on > >> RTO > >> > > > when > >> > > > > there is a cross-cluster failover as Todd mentioned. > >> > > > > > > >> > > > > The trick using max(lastAppendedTimestamp, > >> currentTimeMillis) > >> is to > >> > > > > guarantee monotonic increase of the timestamp. Many > >> commercial > >> systems > >> > > > > actually do something similar to this to solve the time > >> skew. > About > >> > > > > changing the time, I am not sure if people use NTP like > >> using a > >> > watch > >> to > >> > > > > just set it forward/backward by an hour or so. The time > >> adjustment > I > >> used > >> > > > > to do is typically to adjust something like a minute / > >> week. 
So > >> for > > each > > > > second, there might be a few microseconds > slower/faster > but > >> should > >> > not > >> > > > > break the clock completely, to make sure all the > >> time-based > >> > transactions > >> > > are > > > > not affected. The one minute change will be done > within a > >> week > but > >> > not > >> > > > > instantly. > > > > > > Personally, I think having a client-side timestamp will > be > >> > useful > if > >> we > >> > > > don't > > > > need to put the broker and data integrity at risk. If > we > >> have to > >> > > choose > > > > one of them but not both, I would prefer the server-side > >> timestamp > >> > > > because > > > > for the client-side timestamp there is always a plan B, > which > is > >> putting > >> > the > > > > timestamp into the payload. > > > > > > Another reason I am reluctant to use the client-side > > timestamp > is > >> > that > >> it > > > > is always dangerous to mix the control plane with the data > > plane. IP > >> did > >> > this > > > > and it has caused so many different breaches that people > are > >> migrating > >> to > > > > something like MPLS. 
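The max(lastAppendedTimestamp, currentTimeMillis) trick Becket describes can be sketched as follows. This is a hypothetical illustration of the idea only, not broker code; the class and method names are invented:

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongSupplier;

// Sketch of a broker-side clock that assigns non-decreasing LogAppendTimes
// even if the wall clock is adjusted backwards. All names are hypothetical.
public class MonotonicLogClock {
    private final LongSupplier wallClock; // e.g. System::currentTimeMillis
    private final AtomicLong lastAppended = new AtomicLong(Long.MIN_VALUE);

    public MonotonicLogClock(LongSupplier wallClock) {
        this.wallClock = wallClock;
    }

    /** LogAppendTime = max(lastAppendedTimestamp, currentTimeMillis). */
    public long nextLogAppendTime() {
        long now = wallClock.getAsLong();
        // Never move backwards, even across a backwards clock adjustment.
        return lastAppended.updateAndGet(last -> Math.max(last, now));
    }
}
```

During a backwards clock adjustment the assigned timestamps simply stay flat at the last value until the wall clock catches up, which preserves the monotonic index at the cost of temporarily inexact times.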
An example in Kafka is that any > >> client > can > >> > > > construct a > > > > > >> > > >> > LeaderAndIsrRequest/UpdateMetadataRequest/ControlledShutdownRequest > >> > > >> >> > > (you > >> > > >> >> > > > name it) and send it to the broker to mess up the > entire > >> > > >> cluster, > >> > > >> >> > also > >> > > >> >> > > as > >> > > >> >> > > > we already noticed a busy cluster can respond quite > slowly > to > >> > > >> >> > controller > >> > > >> >> > > > messages. So it would really be nice if we could avoid > giving > >> > > clients the > >> > > >> >> power > >> > > >> >> > > to > >> > > >> >> > > > control log retention. > >> > > >> >> > > > > > >> > > >> >> > > > Thanks, > >> > > >> >> > > > > > >> > > >> >> > > > Jiangjie (Becket) Qin > >> > > >> >> > > > > > >> > > >> >> > > > > > >> > > >> >> > > > On Sun, Sep 6, 2015 at 9:54 PM, Todd Palino < >> > > tpal...@gmail.com> > >> > > >> >> > wrote: > >> > > >> >> > > > > > >> > > >> >> > > > > So, with regards to why you want to search by > >> timestamp, > >> > > the > >> > > >> >> > biggest > >> > > >> >> > > > > problem I've seen is with consumers who want to > reset > >> their > >> > > >> >> > > offsets > >> > > >> >> > > > > to a > >> > > >> >> > > > > specific point in time, whether it is to replay a certain > >> amount > >> of > >> > > >> >> > messages, > >> > > >> >> > > > or > >> > > >> >> > > > > to > >> > > >> >> > > > > rewind to before some problem state existed. This > >> happens > >> > > more > >> > > >> >> > often > >> > > >> >> > > > than > >> > > >> >> > > > > anyone would like. > >> > > >> >> > > > > > >> > > >> >> > > > > To handle this now we need to constantly export the > >> > > broker's > >> > > >> >> offsets > >> > > >> >> > > for > >> > > >> >> > > > > every partition to a time-series database and then > use > >> > > >> external > >> > > >> >> > > > processes > >> > > >> >> > > > > to query this. 
I know we're not the only ones doing > >> this. > >> > > The > >> > > >> way > >> > > >> >> > the > >> > > >> >> > > > > > broker handles requests for offsets by timestamp is > a > >> > > little > >> > > >> >> obtuse > >> > > >> >> > > > > > (explain it to anyone without intimate knowledge of > the > >> > > >> internal > >> > > >> >> > > > workings > >> > > >> >> > > > > > of the broker - every time I do I see this). In > >> addition, > >> > > as > >> > > >> >> Becket > >> > > >> >> > > > > pointed > >> > > >> >> > > > > > out, it causes problems specifically with retention > of > >> > > >> messages > >> > > >> >> by > >> > > >> >> > > time > >> > > >> >> > > > > > when you move partitions around. > >> > > >> >> > > > > > > >> > > >> >> > > > > > I'm deliberately avoiding the discussion of what > >> > timestamp > >> > > to > >> > > >> >> use. > >> > > >> >> > I > >> > > >> >> > > > can > >> > > >> >> > > > > > see the argument either way, though I tend to lean > >> > towards > >> > > the > >> > > >> >> idea > >> > > >> >> > > > that > >> > > >> >> > > > > > the broker timestamp is the only viable source of > truth > >> > in > >> > > >> this > >> > > >> >> > > > > situation. > >> > > >> >> > > > > > > >> > > >> >> > > > > > -Todd > >> > > >> >> > > > > > > >> > > >> >> > > > > > > >> > > >> >> > > > > > On Sun, Sep 6, 2015 at 7:08 PM, Ewen > Cheslack-Postava < > >> > > >> >> > > > e...@confluent.io > >> > > >> >> > > > > > > >> > > >> >> > > > > > wrote: > >> > > >> >> > > > > > > >> > > >> >> > > > > > > On Sun, Sep 6, 2015 at 4:57 PM, Jay Kreps < > >> > > j...@confluent.io > >> > > >> > > >> > > >> >> > > wrote: > >> > > >> >> > > > > > > > >> > > >> >> > > > > > > > > >> > > >> >> > > > > > > > 2. Nobody cares what time it is on the server. 
> >> > > >> >> > > > > > > >> > > >> >> > > > > > >> > > >> >> > > > > > > This is a good way of summarizing the issue I was > >> > trying > >> > > to > >> get > >> > > >> >> > at, > >> > > >> >> > > > > from > >> > > >> >> > > > > > an > >> > > >> >> > > > > > > app's perspective. Of the 3 stated goals of the > KIP, > >> #2 > >> > > (log > >> > > >> >> > > > retention) > >> > > >> >> > > > > > is > >> > > >> >> > > > > > > reasonably handled by a server-side timestamp. I > >> really > >> > > just > >> > > >> >> care > >> > > >> >> > > > that > >> > > >> >> > > > > a > >> > > >> >> > > > > > > message is there long enough that I have a chance > to > >> > > process > >> > > >> >> it. > >> > > >> >> > #3 > >> > > >> >> > > > > > > (searching by timestamp) only seems useful if we > can > >> > > >> guarantee > >> > > >> >> > the > >> > > >> >> > > > > > > server-side timestamp is close enough to the > original > >> > > >> >> client-side > >> > > >> >> > > > > > > timestamp, and any mirror maker step seems to > break > >> > that > >> > > >> (even > >> > > >> >> > > > ignoring > >> > > >> >> > > > > > any > >> > > >> >> > > > > > > issues with broker availability). > >> > > >> >> > > > > > > > >> > > >> >> > > > > > > I'm also wondering whether optimizing for > >> > > >> search-by-timestamp > >> > > >> >> on > >> > > >> >> > > the > >> > > >> >> > > > > > broker > >> > > >> >> > > > > > > is really something we want to do given that > messages > >> > > aren't > >> > > >> >> > really > >> > > >> >> > > > > > > guaranteed to be ordered by application-level > >> > timestamps > >> > > on > >> > > >> the > >> > > >> >> > > > broker. > >> > > >> >> > > > > > Is > >> > > >> >> > > > > > > part of the need for this just due to the current > >> > > consumer > >> > > >> APIs > >> > > >> >> > > being > >> > > >> >> > > > > > > difficult to work with? 
For example, could you implement this pretty easily client side, just the way you would broker-side? I'd imagine a couple of random seeks + reads during very rare occasions (i.e. when the app starts up) wouldn't be a problem performance-wise. Or is it also that you need the broker to enforce things like monotonically increasing timestamps, since you can't do the query properly and efficiently without that guarantee, and therefore what applications are actually looking for *is* broker-side timestamps?

-Ewen

On Sun, Sep 6, 2015 at 4:57 PM, Jay Kreps <j...@confluent.io> wrote:

Consider cases where data is being copied from a database or from log files. In steady-state the server time is very close to the client time if their clocks are sync'd (see 1), but there will be times of large divergence when the copying process is stopped or falls behind.
When this occurs it is clear that the time the data arrived on the server is irrelevant; it is the source timestamp that matters. This is the problem you are trying to fix by retaining the mirror maker timestamp, but really the client should always set the time, with the use of server-side time as a fallback. It would be worth talking to the Samza folks and reading through this blog post (http://radar.oreilly.com/2015/08/the-world-beyond-batch-streaming-101.html) on this subject, since we went through similar learnings on the stream processing side.

I think the implication of these two is that we need a proposal that handles potentially very out-of-order timestamps in some kind of sanish way (buggy clients will set something totally wrong as the time).
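One way to handle badly out-of-order timestamps in a sane way is the coarse-grained index floated earlier in this thread: per partition, map each timestamp rounded to the minute to the smallest offset seen in that minute. The sketch below is purely illustrative (class and method names are invented, not Kafka APIs); the key property is that a search takes the minimum offset over all buckets at or after the target minute, so a later message with an earlier timestamp can never be skipped.

```java
import java.util.TreeMap;

// Illustrative sketch only -- not Kafka code. Tracks, per partition,
// the smallest offset seen for each minute bucket of message timestamps.
public class MinuteIndex {
    private final TreeMap<Long, Long> minOffsetByMinute = new TreeMap<>();

    public void record(long timestampMs, long offset) {
        // Keep the minimum offset per minute bucket, so out-of-order
        // arrivals within a minute are handled automatically.
        minOffsetByMinute.merge(timestampMs / 60_000L, offset, Math::min);
    }

    // Earliest offset a consumer must start from to see every message with
    // timestamp >= targetMs. Because timestamps may be out of order across
    // buckets too, we take the minimum over *all* buckets at or after the
    // target minute, not just the first one. Returns -1 if none exist.
    public long searchFrom(long targetMs) {
        return minOffsetByMinute.tailMap(targetMs / 60_000L, true).values()
                .stream().mapToLong(Long::longValue).min().orElse(-1L);
    }
}
```

For example, if a buggy producer stamps offset 11 with an earlier time than offset 10, the earlier minute's bucket records offset 11 while the later minute's records offset 10; a search for the earlier minute starts at offset 10 and still reaches the minute's message at offset 11.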
-Jay

On Sun, Sep 6, 2015 at 4:22 PM, Jay Kreps <j...@confluent.io> wrote:

The magic byte is used to version the message format, so we'll need to make sure that check is in place--I actually don't see it in the current consumer code, which I think is a bug we should fix for the next release (filed KAFKA-2523). The purpose of that field is so there is a clear check on the format, rather than the scrambled scenarios Becket describes.

Also, Becket, I don't think just fixing the Java client is sufficient, as that would break other clients--i.e. if anyone writes a v1 message, even by accident, any non-v1-capable consumer will break. I think we probably need a way to have the server ensure a particular message format, either at read or write time.
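To make the version check concrete, here is a minimal editorial sketch (not the actual consumer code; the class and method names are invented). It relies only on the documented layout in which the magic byte follows the 4-byte CRC: a client should reject magic values it does not understand instead of blindly misparsing the rest of the message.

```java
public class MagicCheck {
    static final byte MAGIC_V0 = 0;
    static final byte MAGIC_V1 = 1; // the proposed new format

    // Sketch of the guard: read the magic byte (which follows the 4-byte
    // CRC in the message layout) and refuse formats this client does not
    // support, rather than misinterpreting the bytes that follow.
    public static byte checkMagic(byte[] message, byte maxSupported) {
        byte magic = message[4];
        if (magic < 0 || magic > maxSupported) {
            throw new IllegalStateException("Unsupported magic byte: " + magic);
        }
        return magic;
    }
}
```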
-Jay

On Thu, Sep 3, 2015 at 3:47 PM, Jiangjie Qin <j...@linkedin.com.invalid> wrote:

Hi Guozhang,

I checked the code again. Actually the CRC check probably won't fail. The newly added timestamp field might be treated as keyLength instead, so we are likely to receive an IllegalArgumentException when we try to read the key. I'll update the KIP.

Thanks,

Jiangjie (Becket) Qin

On Thu, Sep 3, 2015 at 12:48 PM, Jiangjie Qin <j...@linkedin.com> wrote:

Hi, Guozhang,

Thanks for reading the KIP. By "old consumer", I meant the ZookeeperConsumerConnector in trunk now, i.e. without this bug fixed.
If we fix the ZookeeperConsumerConnector then it will throw an exception complaining about the unsupported version when it sees message format V1. What I was trying to say is that if we have some ZookeeperConsumerConnector running without the fix, the consumer will complain about a CRC mismatch instead of an unsupported version.

Thanks,

Jiangjie (Becket) Qin

On Thu, Sep 3, 2015 at 12:15 PM, Guozhang Wang <wangg...@gmail.com> wrote:

Thanks for the write-up Jiangjie.

One comment about the migration plan: "For old consumers, if they see the new protocol the CRC check will fail"...

Do you mean this bug in the old consumer cannot be fixed in a backward-compatible way?
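The failure mode Becket describes can be shown with a small byte-layout sketch (an editorial illustration with invented names, not code from the thread). The v0 layout is CRC (4 bytes), magic (1), attributes (1), then the key length; the proposed v1 inserts an 8-byte timestamp before the key length, so a parser that still assumes the v0 layout reads the timestamp's high 32 bits as the key length.

```java
import java.nio.ByteBuffer;

public class MisparseDemo {
    // A v1 message inserts an 8-byte timestamp before the key-length field,
    // so a parser assuming the v0 layout (crc, magic, attributes, keyLength)
    // reads the high four bytes of the timestamp as the key length -- a
    // wrong value that in practice surfaces as an IllegalArgumentException
    // when slicing out the key.
    public static int keyLengthAsSeenByV0Parser(ByteBuffer v1Message) {
        v1Message.position(4 + 1 + 1); // skip crc(4) + magic(1) + attributes(1)
        return v1Message.getInt();     // actually the timestamp's high 32 bits
    }

    public static ByteBuffer sampleV1Message() {
        ByteBuffer msg = ByteBuffer.allocate(32);
        msg.putInt(0)                    // crc (dummy value)
           .put((byte) 1)                // magic = 1 (new format)
           .put((byte) 0)                // attributes
           .putLong(1_441_843_200_000L)  // timestamp in ms
           .putInt(3)                    // the real key length
           .put(new byte[] {'k', 'e', 'y'});
        msg.flip();
        return msg;
    }
}
```

Here the v0-style read returns 335 (the high half of the timestamp) instead of the real key length of 3, so the CRC never gets a chance to flag the problem before key parsing goes wrong.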
Guozhang

On Thu, Sep 3, 2015 at 8:35 AM, Jiangjie Qin <j...@linkedin.com.invalid> wrote:

Hi,

We just created KIP-31 to propose a message format change in Kafka.

https://cwiki.apache.org/confluence/display/KAFKA/KIP-31+-+Message+format+change+proposal

As a summary, the motivations are:
1. Avoid server side message re-compression
2. Honor time-based log roll and retention
3. Enable offset search by timestamp at a finer granularity.

Feedback and comments are welcome!
Thanks,

Jiangjie (Becket) Qin

--
Guozhang

--
Thanks,
Ewen

--
Thanks,
Neha

--
Thanks,
Ewen