Mayuresh, I haven't created a Jira for KIP-33 yet because it is still under discussion. I will remove the Jira link.
Thanks, Jiangjie (Becket) Qin On Mon, Sep 14, 2015 at 8:15 PM, Mayuresh Gharat <gharatmayures...@gmail.com > wrote: > I suppose the jira link is different. It points to this jira : > https://issues.apache.org/jira/browse/KAFKA-1 > > > Thanks, > > Mayuresh > > On Mon, Sep 14, 2015 at 5:13 PM, Jiangjie Qin <j...@linkedin.com.invalid> > wrote: > > > I just updated the KIP-33 to explain the indexing on CreateTime and > > LogAppendTime respectively. I also used some use case to compare the two > > solutions. > > Although this is for KIP-33, but it does give a some insights on whether > it > > makes sense to have a per message LogAppendTime. > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-33+-+Add+a+time+based+log+index > > > > As a short summary of the conclusions we have already reached on > timestamp: > > 1. It is good to add a timestamp to the message. > > 2. LogAppendTime should be used for broker policy enforcement (Log > > retention / rolling) > > 3. It is useful to have a CreateTime in message format, which is > immutable > > after producer sends the message. > > > > There are following questions still in discussion: > > 1. Should we also add LogAppendTime to message format? > > 2. which timestamp should we use to build the index. > > > > Let's talk about question 1 first because question 2 is actually a follow > > up question for question 1. > > Here are what I think: > > 1a. To enforce broker log policy, theoretically we don't need per-message > > LogAppendTime. If we don't include LogAppendTime in message, we still > need > > to implement a separate solution to pass log segment timestamps among > > brokers. That means if we don't include the LogAppendTime in message, > there > > will be further complication in replication. > > 1b. LogAppendTime has some advantage over CreateTime (KIP-33 has detail > > comparison) > > 1c. We have already exposed offset, which is essentially an internal > > concept of message in terms of position. Exposing LogAppendTime means we > > expose another internal concept of message in terms of time. > > > > Considering the above reasons, personally I think it worth adding the > > LogAppendTime to each message. > > > > Any thoughts? > > > > Thanks, > > > > Jiangjie (Becket) Qin > > > > On Mon, Sep 14, 2015 at 11:44 AM, Jiangjie Qin <j...@linkedin.com> > wrote: > > > > > I was trying to send last email before KIP hangout so maybe did not > think > > > it through completely. By the way, the discussion is actually more > > related > > > to KIP-33, i.e. whether we should index on CreateTime or LogAppendTime. > > > (Although it seems all the discussion are still in this mailing > > thread...) > > > This solution in last email is for indexing on CreateTime. It is > > > essentially what Jay suggested except we use a timestamp map instead > of a > > > memory mapped index file. Please ignore the proposal of using a log > > > compacted topic. The solution can be simplified to: > > > > > > Each broker keeps > > > 1. a timestamp index map - Map[TopicPartitionSegment, Map[Timestamp, > > > Offset]]. The timestamp is on minute boundary. > > > 2. A timestamp index file for each segment. > > > When a broker receives a message (both leader or follower), it checks > if > > > the timestamp index map contains the timestamp for current segment. The > > > broker add the offset to the map and append an entry to the timestamp > > index > > > if the timestamp does not exist. i.e. we only use the index file as a > > > persistent copy of the index timestamp map. 
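For concreteness, the write path of that in-memory timestamp index map can be sketched in a few lines. This is only an illustration of the idea described above, not Kafka code; the class and method names are made up, and a real implementation would sit next to the broker's log append path.

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    public class TimestampIndexMap {
        // key identifies a (topic, partition, segment base offset); the inner map goes from a
        // minute boundary to the smallest offset whose message timestamp falls in that minute
        private final Map<String, Map<Long, Long>> index = new ConcurrentHashMap<>();

        // Called on every append (leader or follower). Returns true when this is the first
        // message of that minute in the segment, i.e. when an entry should also be appended
        // to the segment's on-disk time index file.
        public boolean maybeAdd(String segmentKey, long timestampMs, long offset) {
            long minute = timestampMs - (timestampMs % 60_000L);   // minute granularity
            Map<Long, Long> perSegment =
                index.computeIfAbsent(segmentKey, k -> new ConcurrentHashMap<>());
            return perSegment.putIfAbsent(minute, offset) == null;
        }

        // Called when a log segment is deleted; the on-disk time index file is removed separately.
        public void removeSegment(String segmentKey) {
            index.remove(segmentKey);
        }
    }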
> > > > > > When a log segment is deleted, we need to: > > > 1. delete the TopicPartitionKeySegment key in the timestamp index map. > > > 2. delete the timestamp index file > > > > > > This solution assumes we only keep CreateTime in the message. There > are a > > > few trade-offs in this solution: > > > 1. The granularity of search will be per minute. > > > 2. All the timestamp index map has to be in the memory all the time. > > > 3. We need to think about another way to honor log retention time and > > > time-based log rolling. > > > 4. We lose the benefit brought by including LogAppendTime in the > message > > > mentioned earlier. > > > > > > I am not sure whether this solution is necessarily better than indexing > > on > > > LogAppendTime. > > > > > > I will update KIP-33 to explain the solution to index on CreateTime and > > > LogAppendTime respectively and put some more concrete use cases as > well. > > > > > > Thanks, > > > > > > Jiangjie (Becket) Qin > > > > > > > > > On Mon, Sep 14, 2015 at 9:40 AM, Jiangjie Qin <j...@linkedin.com> > wrote: > > > > > >> Hi Joel, > > >> > > >> Good point about rebuilding index. I agree that having a per message > > >> LogAppendTime might be necessary. About time adjustment, the solution > > >> sounds promising, but it might be better to make it as a follow up of > > the > > >> KIP because it seems a really rare use case. > > >> > > >> I have another thought on how to manage the out of order timestamps. > > >> Maybe we can do the following: > > >> Create a special log compacted topic __timestamp_index similar to > topic, > > >> the key would be (TopicPartition, TimeStamp_Rounded_To_Minute), the > > value > > >> is offset. In memory, we keep a map for each TopicPartition, the value > > is > > >> (timestamp_rounded_to_minute -> smallest_offset_in_the_minute). This > > way we > > >> can search out of order message and make sure no message is missing. > > >> > > >> Thoughts? > > >> > > >> Thanks, > > >> > > >> Jiangjie (Becket) Qin > > >> > > >> On Fri, Sep 11, 2015 at 12:46 PM, Joel Koshy <jjkosh...@gmail.com> > > wrote: > > >> > > >>> Jay had mentioned the scenario of mirror-maker bootstrap which would > > >>> effectively reset the logAppendTimestamps for the bootstrapped data. > > >>> If we don't include logAppendTimestamps in each message there is a > > >>> similar scenario when rebuilding indexes during recovery. So it seems > > >>> it may be worth adding that timestamp to messages. The drawback to > > >>> that is exposing a server-side concept in the protocol (although we > > >>> already do that with offsets). logAppendTimestamp really should be > > >>> decided by the broker so I think the first scenario may have to be > > >>> written off as a gotcha, but the second may be worth addressing (by > > >>> adding it to the message format). > > >>> > > >>> The other point that Jay raised which needs to be addressed (since we > > >>> require monotically increasing timestamps in the index) in the > > >>> proposal is changing time on the server (I'm a little less concerned > > >>> about NTP clock skews than a user explicitly changing the server's > > >>> time - i.e., big clock skews). We would at least want to "set back" > > >>> all the existing timestamps to guarantee non-decreasing timestamps > > >>> with future messages. 
I'm not sure at this point how best to handle > > >>> that, but we could perhaps have a epoch/base-time (or > time-correction) > > >>> stored in the log directories and base all log index timestamps off > > >>> that base-time (or corrected). So if at any time you determine that > > >>> time has changed backwards you can adjust that base-time without > > >>> having to fix up all the entries. Without knowing the exact diff > > >>> between the previous clock and new clock we cannot adjust the times > > >>> exactly, but we can at least ensure increasing timestamps. > > >>> > > >>> On Fri, Sep 11, 2015 at 10:52 AM, Jiangjie Qin > > >>> <j...@linkedin.com.invalid> wrote: > > >>> > Ewen and Jay, > > >>> > > > >>> > They way I see the LogAppendTime is another format of "offset". It > > >>> serves > > >>> > the following purpose: > > >>> > 1. Locate messages not only by position, but also by time. The > > >>> difference > > >>> > from offset is timestamp is not unique for all messags. > > >>> > 2. Allow broker to manage messages based on time, e.g. retention, > > >>> rolling > > >>> > 3. Provide convenience for user to search message not only by > offset, > > >>> but > > >>> > also by timestamp. > > >>> > > > >>> > For purpose (2) we don't need per message server timestamp. We only > > >>> need > > >>> > per log segment server timestamp and propagate it among brokers. > > >>> > > > >>> > For (1) and (3), we need per message timestamp. Then the question > is > > >>> > whether we should use CreateTime or LogAppendTime? > > >>> > > > >>> > I completely agree that an application timestamp is very useful for > > >>> many > > >>> > use cases. But it seems to me that having Kafka to understand and > > >>> maintain > > >>> > application timestamp is a bit over demanding. So I think there is > > >>> value to > > >>> > pass on CreateTime for application convenience, but I am not sure > it > > >>> can > > >>> > replace LogAppendTime. Managing out-of-order CreateTime is > equivalent > > >>> to > > >>> > allowing producer to send their own offset and ask broker to manage > > the > > >>> > offset for them, It is going to be very hard to maintain and could > > >>> create > > >>> > huge performance/functional issue because of complicated logic. > > >>> > > > >>> > About whether we should expose LogAppendTime to broker, I agree > that > > >>> server > > >>> > timestamp is internal to broker, but isn't offset also an internal > > >>> concept? > > >>> > Arguably it's not provided by producer so consumer application > logic > > >>> does > > >>> > not have to know offset. But user needs to know offset because they > > >>> need to > > >>> > know "where is the message" in the log. LogAppendTime provides the > > >>> answer > > >>> > of "When was the message appended" to the log. So personally I > think > > >>> it is > > >>> > reasonable to expose the LogAppendTime to consumers. > > >>> > > > >>> > I can see some use cases of exposing the LogAppendTime, to name > some: > > >>> > 1. Let's say broker has 7 days of log retention, some application > > >>> wants to > > >>> > reprocess the data in past 3 days. User can simply provide the > > >>> timestamp > > >>> > and start consume. > > >>> > 2. User can easily know lag by time. > > >>> > 3. Cross cluster fail over. This is a more complicated use case, > > there > > >>> are > > >>> > two goals: 1) Not lose message; and 2) do not reconsume tons of > > >>> messages. 
> > >>> > Only knowing offset of cluster A won't help with finding fail over > > >>> point in > > >>> > cluster B because an offset of a cluster means nothing to another > > >>> cluster. > > >>> > Timestamp however is a good cross cluster reference in this case. > > >>> > > > >>> > Thanks, > > >>> > > > >>> > Jiangjie (Becket) Qin > > >>> > > > >>> > On Thu, Sep 10, 2015 at 9:28 PM, Ewen Cheslack-Postava < > > >>> e...@confluent.io> > > >>> > wrote: > > >>> > > > >>> >> Re: MM preserving timestamps: Yes, this was how I interpreted the > > >>> point in > > >>> >> the KIP and I only raised the issue because it restricts the > > >>> usefulness of > > >>> >> timestamps anytime MM is involved. I agree it's not a deal > breaker, > > >>> but I > > >>> >> wanted to understand exact impact of the change. Some users seem > to > > >>> want to > > >>> >> be able to seek by application-defined timestamps (despite the > many > > >>> obvious > > >>> >> issues involved), and the proposal clearly would not support that > > >>> unless > > >>> >> the timestamps submitted with the produce requests were respected. > > If > > >>> we > > >>> >> ignore client submitted timestamps, then we probably want to try > to > > >>> hide > > >>> >> the timestamps as much as possible in any public interface (e.g. > > never > > >>> >> shows up in any public consumer APIs), but expose it just enough > to > > be > > >>> >> useful for operational purposes. > > >>> >> > > >>> >> Sorry if my devil's advocate position / attempt to map the design > > >>> space led > > >>> >> to some confusion! > > >>> >> > > >>> >> -Ewen > > >>> >> > > >>> >> > > >>> >> On Thu, Sep 10, 2015 at 5:48 PM, Jay Kreps <j...@confluent.io> > > wrote: > > >>> >> > > >>> >> > Ah, I see, I think I misunderstood about MM, it was called out > in > > >>> the > > >>> >> > proposal and I thought you were saying you'd retain the > timestamp > > >>> but I > > >>> >> > think you're calling out that you're not. In that case you do > have > > >>> the > > >>> >> > opposite problem, right? When you add mirroring for a topic all > > >>> that data > > >>> >> > will have a timestamp of now and retention won't be right. Not a > > >>> blocker > > >>> >> > but a bit of a gotcha. > > >>> >> > > > >>> >> > -Jay > > >>> >> > > > >>> >> > > > >>> >> > > > >>> >> > On Thu, Sep 10, 2015 at 5:40 PM, Joel Koshy < > jjkosh...@gmail.com> > > >>> wrote: > > >>> >> > > > >>> >> > > > Don't you see all the same issues you see with > client-defined > > >>> >> > timestamp's > > >>> >> > > > if you let mm control the timestamp as you were proposing? > > That > > >>> means > > >>> >> > > time > > >>> >> > > > > >>> >> > > Actually I don't think that was in the proposal (or was it?). > > >>> i.e., I > > >>> >> > > think it was always supposed to be controlled by the broker > (and > > >>> not > > >>> >> > > MM). > > >>> >> > > > > >>> >> > > > Also, Joel, can you just confirm that you guys have talked > > >>> through > > >>> >> the > > >>> >> > > > whole timestamp thing with the Samza folks at LI? The > reason I > > >>> ask > > >>> >> > about > > >>> >> > > > this is that Samza and Kafka Streams (KIP-28) are both > trying > > >>> to rely > > >>> >> > on > > >>> >> > > > > >>> >> > > We have not. This is a good point - we will follow-up. > > >>> >> > > > > >>> >> > > > WRT your idea of a FollowerFetchRequestI had thought of a > > >>> similar > > >>> >> idea > > >>> >> > > > where we use the leader's timestamps to approximately set > the > > >>> >> > follower's > > >>> >> > > > timestamps. 
I had thought of just adding a partition > metadata > > >>> request > > >>> >> > > that > > >>> >> > > > would subsume the current offset/time lookup and could be > used > > >>> by the > > >>> >> > > > follower to try to approximately keep their timestamps > kosher. > > >>> It's a > > >>> >> > > > little hacky and doesn't help with MM but it is also maybe > > less > > >>> >> > invasive > > >>> >> > > so > > >>> >> > > > that approach could be viable. > > >>> >> > > > > >>> >> > > That would also work, but perhaps responding with the actual > > >>> leader > > >>> >> > > offset-timestamp entries (corresponding to the fetched > portion) > > >>> would > > >>> >> > > be exact and it should be small as well. Anyway, the main > > >>> motivation > > >>> >> > > in this was to avoid leaking server-side timestamps to the > > >>> >> > > message-format if people think it is worth it so the > > alternatives > > >>> are > > >>> >> > > implementation details. My original instinct was that it also > > >>> avoids a > > >>> >> > > backwards incompatible change (but it does not because we also > > >>> have > > >>> >> > > the relative offset change). > > >>> >> > > > > >>> >> > > Thanks, > > >>> >> > > > > >>> >> > > Joel > > >>> >> > > > > >>> >> > > > > > >>> >> > > > > > >>> >> > > > > > >>> >> > > > On Thu, Sep 10, 2015 at 3:36 PM, Joel Koshy < > > >>> jjkosh...@gmail.com> > > >>> >> > wrote: > > >>> >> > > > > > >>> >> > > >> I just wanted to comment on a few points made earlier in > this > > >>> >> thread: > > >>> >> > > >> > > >>> >> > > >> Concerns on clock skew: at least for the original > proposal's > > >>> scope > > >>> >> > > >> (which was more for honoring retention broker-side) this > > would > > >>> only > > >>> >> be > > >>> >> > > >> an issue when spanning leader movements right? i.e., leader > > >>> >> migration > > >>> >> > > >> latency has to be much less than clock skew for this to be > a > > >>> real > > >>> >> > > >> issue wouldn’t it? > > >>> >> > > >> > > >>> >> > > >> Client timestamp vs broker timestamp: I’m not sure Kafka > > >>> (brokers) > > >>> >> are > > >>> >> > > >> the right place to reason about client-side timestamps > > >>> precisely due > > >>> >> > > >> to the nuances that have been discussed at length in this > > >>> thread. My > > >>> >> > > >> preference would have been to the timestamp (now called > > >>> >> > > >> LogAppendTimestamp) have nothing to do with the > applications. > > >>> Ewen > > >>> >> > > >> raised a valid concern about leaking such > > “private/server-side” > > >>> >> > > >> timestamps into the protocol spec. i.e., it is fine to have > > the > > >>> >> > > >> CreateTime which is expressly client-provided and immutable > > >>> >> > > >> thereafter, but the LogAppendTime is also going part of the > > >>> protocol > > >>> >> > > >> and it would be good to avoid exposure (to client > developers) > > >>> if > > >>> >> > > >> possible. Ok, so here is a slightly different approach > that I > > >>> was > > >>> >> just > > >>> >> > > >> thinking about (and did not think too far so it may not > > work): > > >>> do > > >>> >> not > > >>> >> > > >> add the LogAppendTime to messages. Instead, build the > > >>> time-based > > >>> >> index > > >>> >> > > >> on the server side on message arrival time alone. > Introduce a > > >>> new > > >>> >> > > >> ReplicaFetchRequest/Response pair. ReplicaFetchResponses > will > > >>> also > > >>> >> > > >> include the slice of the time-based index for the follower > > >>> broker. 
> > >>> >> > > >> This way we can at least keep timestamps aligned across > > >>> brokers for > > >>> >> > > >> retention purposes. We do lose the append timestamp for > > >>> mirroring > > >>> >> > > >> pipelines (which appears to be the case in KIP-32 as well). > > >>> >> > > >> > > >>> >> > > >> Configurable index granularity: We can do this but I’m not > > >>> sure it > > >>> >> is > > >>> >> > > >> very useful and as Jay noted, a major change from the old > > >>> proposal > > >>> >> > > >> linked from the KIP is the sparse time-based index which we > > >>> felt was > > >>> >> > > >> essential to bound memory usage (and having timestamps on > > each > > >>> log > > >>> >> > > >> index entry was probably a big waste since in the common > case > > >>> >> several > > >>> >> > > >> messages span the same timestamp). BTW another benefit of > the > > >>> second > > >>> >> > > >> index is that it makes it easier to roll-back or throw away > > if > > >>> >> > > >> necessary (vs. modifying the existing index format) - > > although > > >>> that > > >>> >> > > >> obviously does not help with rolling back the timestamp > > change > > >>> in > > >>> >> the > > >>> >> > > >> message format, but it is one less thing to worry about. > > >>> >> > > >> > > >>> >> > > >> Versioning: I’m not sure everyone is saying the same thing > > wrt > > >>> the > > >>> >> > > >> scope of this. There is the record format change, but I > also > > >>> think > > >>> >> > > >> this ties into all of the API versioning that we already > have > > >>> in > > >>> >> > > >> Kafka. The current API versioning approach works fine for > > >>> >> > > >> upgrades/downgrades across official Kafka releases, but not > > so > > >>> well > > >>> >> > > >> between releases. (We almost got bitten by this at LinkedIn > > >>> with the > > >>> >> > > >> recent changes to various requests but were able to work > > around > > >>> >> > > >> these.) We can clarify this in the follow-up KIP. > > >>> >> > > >> > > >>> >> > > >> Thanks, > > >>> >> > > >> > > >>> >> > > >> Joel > > >>> >> > > >> > > >>> >> > > >> > > >>> >> > > >> On Thu, Sep 10, 2015 at 3:00 PM, Jiangjie Qin > > >>> >> > <j...@linkedin.com.invalid > > >>> >> > > > > > >>> >> > > >> wrote: > > >>> >> > > >> > Hi Jay, > > >>> >> > > >> > > > >>> >> > > >> > I just changed the KIP title and updated the KIP page. > > >>> >> > > >> > > > >>> >> > > >> > And yes, we are working on a general version control > > >>> proposal to > > >>> >> > make > > >>> >> > > the > > >>> >> > > >> > protocol migration like this more smooth. I will also > > create > > >>> a KIP > > >>> >> > for > > >>> >> > > >> that > > >>> >> > > >> > soon. > > >>> >> > > >> > > > >>> >> > > >> > Thanks, > > >>> >> > > >> > > > >>> >> > > >> > Jiangjie (Becket) Qin > > >>> >> > > >> > > > >>> >> > > >> > > > >>> >> > > >> > On Thu, Sep 10, 2015 at 2:21 PM, Jay Kreps < > > j...@confluent.io > > >>> > > > >>> >> > wrote: > > >>> >> > > >> > > > >>> >> > > >> >> Great, can we change the name to something related to > the > > >>> >> > > >> change--"KIP-31: > > >>> >> > > >> >> Move to relative offsets in compressed message sets". > > >>> >> > > >> >> > > >>> >> > > >> >> Also you had mentioned before you were going to expand > on > > >>> the > > >>> >> > > mechanics > > >>> >> > > >> of > > >>> >> > > >> >> handling these log format changes, right? 
> > >>> >> > > >> >> > > >>> >> > > >> >> -Jay > > >>> >> > > >> >> > > >>> >> > > >> >> On Thu, Sep 10, 2015 at 12:42 PM, Jiangjie Qin > > >>> >> > > >> <j...@linkedin.com.invalid> > > >>> >> > > >> >> wrote: > > >>> >> > > >> >> > > >>> >> > > >> >> > Neha and Jay, > > >>> >> > > >> >> > > > >>> >> > > >> >> > Thanks a lot for the feedback. Good point about > > splitting > > >>> the > > >>> >> > > >> >> discussion. I > > >>> >> > > >> >> > have split the proposal to three KIPs and it does make > > >>> each > > >>> >> > > discussion > > >>> >> > > >> >> more > > >>> >> > > >> >> > clear: > > >>> >> > > >> >> > KIP-31 - Message format change (Use relative offset) > > >>> >> > > >> >> > KIP-32 - Add CreateTime and LogAppendTime to Kafka > > message > > >>> >> > > >> >> > KIP-33 - Build a time-based log index > > >>> >> > > >> >> > > > >>> >> > > >> >> > KIP-33 can be a follow up KIP for KIP-32, so we can > > >>> discuss > > >>> >> about > > >>> >> > > >> KIP-31 > > >>> >> > > >> >> > and KIP-32 first for now. I will create a separate > > >>> discussion > > >>> >> > > thread > > >>> >> > > >> for > > >>> >> > > >> >> > KIP-32 and reply the concerns you raised regarding the > > >>> >> timestamp. > > >>> >> > > >> >> > > > >>> >> > > >> >> > So far it looks there is no objection to KIP-31. > Since I > > >>> >> removed > > >>> >> > a > > >>> >> > > few > > >>> >> > > >> >> part > > >>> >> > > >> >> > from previous KIP and only left the relative offset > > >>> proposal, > > >>> >> it > > >>> >> > > >> would be > > >>> >> > > >> >> > great if people can take another look to see if there > is > > >>> any > > >>> >> > > concerns. > > >>> >> > > >> >> > > > >>> >> > > >> >> > Thanks, > > >>> >> > > >> >> > > > >>> >> > > >> >> > Jiangjie (Becket) Qin > > >>> >> > > >> >> > > > >>> >> > > >> >> > > > >>> >> > > >> >> > On Tue, Sep 8, 2015 at 1:28 PM, Neha Narkhede < > > >>> >> n...@confluent.io > > >>> >> > > > > >>> >> > > >> wrote: > > >>> >> > > >> >> > > > >>> >> > > >> >> > > Becket, > > >>> >> > > >> >> > > > > >>> >> > > >> >> > > Nice write-up. Few thoughts - > > >>> >> > > >> >> > > > > >>> >> > > >> >> > > I'd split up the discussion for simplicity. Note > that > > >>> you can > > >>> >> > > always > > >>> >> > > >> >> > group > > >>> >> > > >> >> > > several of these in one patch to reduce the protocol > > >>> changes > > >>> >> > > people > > >>> >> > > >> >> have > > >>> >> > > >> >> > to > > >>> >> > > >> >> > > deal with.This is just a suggestion, but I think the > > >>> >> following > > >>> >> > > split > > >>> >> > > >> >> > might > > >>> >> > > >> >> > > make it easier to tackle the changes being proposed > - > > >>> >> > > >> >> > > > > >>> >> > > >> >> > > - Relative offsets > > >>> >> > > >> >> > > - Introducing the concept of time > > >>> >> > > >> >> > > - Time-based indexing (separate the usage of the > > >>> timestamp > > >>> >> > > field > > >>> >> > > >> >> from > > >>> >> > > >> >> > > how/whether we want to include a timestamp in the > > >>> message) > > >>> >> > > >> >> > > > > >>> >> > > >> >> > > I'm a +1 on relative offsets, we should've done it > > back > > >>> when > > >>> >> we > > >>> >> > > >> >> > introduced > > >>> >> > > >> >> > > it. Other than reducing the CPU overhead, this will > > also > > >>> >> reduce > > >>> >> > > the > > >>> >> > > >> >> > garbage > > >>> >> > > >> >> > > collection overhead on the brokers. 
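To make the relative-offset point concrete: with the current format the broker must decompress a compressed message set, stamp an absolute offset on every inner message, and re-compress, whereas with relative offsets only the wrapper's offset field changes and consumers recover absolute offsets arithmetically. A minimal sketch of that consumer-side reconstruction, assuming the KIP-31 scheme in which the wrapper carries the absolute offset of the last inner message and inner messages carry offsets 0..n-1 (illustrative only, not Kafka source):

    // absolute offset of inner message i = wrapperOffset - lastRelative + relative[i]
    static long[] toAbsoluteOffsets(long wrapperOffset, int[] relativeOffsets) {
        int lastRelative = relativeOffsets[relativeOffsets.length - 1];
        long[] absolute = new long[relativeOffsets.length];
        for (int i = 0; i < relativeOffsets.length; i++) {
            absolute[i] = wrapperOffset - lastRelative + relativeOffsets[i];
        }
        return absolute;
    }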
> > >>> >> > > >> >> > > > > >>> >> > > >> >> > > On the timestamp field, I generally agree that we > > >>> should add > > >>> >> a > > >>> >> > > >> >> timestamp > > >>> >> > > >> >> > to > > >>> >> > > >> >> > > a Kafka message but I'm not quite sold on how this > KIP > > >>> >> suggests > > >>> >> > > the > > >>> >> > > >> >> > > timestamp be set. Will avoid repeating the downsides > > of > > >>> a > > >>> >> > broker > > >>> >> > > >> side > > >>> >> > > >> >> > > timestamp mentioned previously in this thread. I > think > > >>> the > > >>> >> > topic > > >>> >> > > of > > >>> >> > > >> >> > > including a timestamp in a Kafka message requires a > > lot > > >>> more > > >>> >> > > thought > > >>> >> > > >> >> and > > >>> >> > > >> >> > > details than what's in this KIP. I'd suggest we make > > it > > >>> a > > >>> >> > > separate > > >>> >> > > >> KIP > > >>> >> > > >> >> > that > > >>> >> > > >> >> > > includes a list of all the different use cases for > the > > >>> >> > timestamp > > >>> >> > > >> >> (beyond > > >>> >> > > >> >> > > log retention) including stream processing and > discuss > > >>> >> > tradeoffs > > >>> >> > > of > > >>> >> > > >> >> > > including client and broker side timestamps. > > >>> >> > > >> >> > > > > >>> >> > > >> >> > > Agree with the benefit of time-based indexing, but > > >>> haven't > > >>> >> had > > >>> >> > a > > >>> >> > > >> chance > > >>> >> > > >> >> > to > > >>> >> > > >> >> > > dive into the design details yet. > > >>> >> > > >> >> > > > > >>> >> > > >> >> > > Thanks, > > >>> >> > > >> >> > > Neha > > >>> >> > > >> >> > > > > >>> >> > > >> >> > > On Tue, Sep 8, 2015 at 10:57 AM, Jay Kreps < > > >>> j...@confluent.io > > >>> >> > > > >>> >> > > >> wrote: > > >>> >> > > >> >> > > > > >>> >> > > >> >> > > > Hey Beckett, > > >>> >> > > >> >> > > > > > >>> >> > > >> >> > > > I was proposing splitting up the KIP just for > > >>> simplicity of > > >>> >> > > >> >> discussion. > > >>> >> > > >> >> > > You > > >>> >> > > >> >> > > > can still implement them in one patch. I think > > >>> otherwise it > > >>> >> > > will > > >>> >> > > >> be > > >>> >> > > >> >> > hard > > >>> >> > > >> >> > > to > > >>> >> > > >> >> > > > discuss/vote on them since if you like the offset > > >>> proposal > > >>> >> > but > > >>> >> > > not > > >>> >> > > >> >> the > > >>> >> > > >> >> > > time > > >>> >> > > >> >> > > > proposal what do you do? > > >>> >> > > >> >> > > > > > >>> >> > > >> >> > > > Introducing a second notion of time into Kafka is > a > > >>> pretty > > >>> >> > > massive > > >>> >> > > >> >> > > > philosophical change so it kind of warrants it's > own > > >>> KIP I > > >>> >> > > think > > >>> >> > > >> it > > >>> >> > > >> >> > isn't > > >>> >> > > >> >> > > > just "Change message format". > > >>> >> > > >> >> > > > > > >>> >> > > >> >> > > > WRT time I think one thing to clarify in the > > proposal > > >>> is > > >>> >> how > > >>> >> > MM > > >>> >> > > >> will > > >>> >> > > >> >> > have > > >>> >> > > >> >> > > > access to set the timestamp? Presumably this will > be > > >>> a new > > >>> >> > > field > > >>> >> > > >> in > > >>> >> > > >> >> > > > ProducerRecord, right? If so then any user can set > > the > > >>> >> > > timestamp, > > >>> >> > > >> >> > right? 
> > >>> >> > > >> >> > > > I'm not sure you answered the questions around how > > >>> this > > >>> >> will > > >>> >> > > work > > >>> >> > > >> for > > >>> >> > > >> >> > MM > > >>> >> > > >> >> > > > since when MM retains timestamps from multiple > > >>> partitions > > >>> >> > they > > >>> >> > > >> will > > >>> >> > > >> >> > then > > >>> >> > > >> >> > > be > > >>> >> > > >> >> > > > out of order and in the past (so the > > >>> >> > max(lastAppendedTimestamp, > > >>> >> > > >> >> > > > currentTimeMillis) override you proposed will not > > >>> work, > > >>> >> > > right?). > > >>> >> > > >> If > > >>> >> > > >> >> we > > >>> >> > > >> >> > > > don't do this then when you set up mirroring the > > data > > >>> will > > >>> >> > all > > >>> >> > > be > > >>> >> > > >> new > > >>> >> > > >> >> > and > > >>> >> > > >> >> > > > you have the same retention problem you described. > > >>> Maybe I > > >>> >> > > missed > > >>> >> > > >> >> > > > something...? > > >>> >> > > >> >> > > > > > >>> >> > > >> >> > > > My main motivation is that given that both Samza > and > > >>> Kafka > > >>> >> > > streams > > >>> >> > > >> >> are > > >>> >> > > >> >> > > > doing work that implies a mandatory client-defined > > >>> notion > > >>> >> of > > >>> >> > > >> time, I > > >>> >> > > >> >> > > really > > >>> >> > > >> >> > > > think introducing a different mandatory notion of > > >>> time in > > >>> >> > > Kafka is > > >>> >> > > >> >> > going > > >>> >> > > >> >> > > to > > >>> >> > > >> >> > > > be quite odd. We should think hard about how > > >>> client-defined > > >>> >> > > time > > >>> >> > > >> >> could > > >>> >> > > >> >> > > > work. I'm not sure if it can, but I'm also not > sure > > >>> that it > > >>> >> > > can't. > > >>> >> > > >> >> > Having > > >>> >> > > >> >> > > > both will be odd. Did you chat about this with > > >>> Yi/Kartik on > > >>> >> > the > > >>> >> > > >> Samza > > >>> >> > > >> >> > > side? > > >>> >> > > >> >> > > > > > >>> >> > > >> >> > > > When you are saying it won't work you are assuming > > >>> some > > >>> >> > > particular > > >>> >> > > >> >> > > > implementation? Maybe that the index is a > > >>> monotonically > > >>> >> > > increasing > > >>> >> > > >> >> set > > >>> >> > > >> >> > of > > >>> >> > > >> >> > > > pointers to the least record with a timestamp > larger > > >>> than > > >>> >> the > > >>> >> > > >> index > > >>> >> > > >> >> > time? > > >>> >> > > >> >> > > > In other words a search for time X gives the > largest > > >>> offset > > >>> >> > at > > >>> >> > > >> which > > >>> >> > > >> >> > all > > >>> >> > > >> >> > > > records are <= X? > > >>> >> > > >> >> > > > > > >>> >> > > >> >> > > > For retention, I agree with the problem you point > > >>> out, but > > >>> >> I > > >>> >> > > think > > >>> >> > > >> >> what > > >>> >> > > >> >> > > you > > >>> >> > > >> >> > > > are saying in that case is that you want a size > > limit > > >>> too. > > >>> >> If > > >>> >> > > you > > >>> >> > > >> use > > >>> >> > > >> >> > > > system time you actually hit the same problem: say > > >>> you do a > > >>> >> > > full > > >>> >> > > >> dump > > >>> >> > > >> >> > of > > >>> >> > > >> >> > > a > > >>> >> > > >> >> > > > DB table with a setting of 7 days retention, your > > >>> retention > > >>> >> > > will > > >>> >> > > >> >> > actually > > >>> >> > > >> >> > > > not get enforced for the first 7 days because the > > >>> data is > > >>> >> > "new > > >>> >> > > to > > >>> >> > > >> >> > Kafka". 
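The index semantics Jay describes above only hold if the indexed timestamps are non-decreasing, which is exactly what is being debated. A small sketch of how such a sparse index is typically consulted under that monotonic assumption: the index answers with a lower-bound position and the log is scanned forward from there. Names are illustrative, not the Kafka index implementation.

    import java.util.Map;
    import java.util.NavigableMap;
    import java.util.TreeMap;

    public class TimeIndexLookup {
        // timestamp -> offset of the first record carrying that timestamp; timestamps
        // are assumed to be appended in non-decreasing order
        private final NavigableMap<Long, Long> entries = new TreeMap<>();

        public void append(long timestampMs, long offset) {
            entries.put(timestampMs, offset);
        }

        // Offset recorded for the largest indexed timestamp <= targetMs: a safe position
        // from which to scan forward for the first record with timestamp >= targetMs.
        public long lowerBoundOffset(long targetMs, long logStartOffset) {
            Map.Entry<Long, Long> e = entries.floorEntry(targetMs);
            return e == null ? logStartOffset : e.getValue();
        }
    }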
> > >>> >> > > >> >> > > > > > >>> >> > > >> >> > > > -Jay > > >>> >> > > >> >> > > > > > >>> >> > > >> >> > > > > > >>> >> > > >> >> > > > On Mon, Sep 7, 2015 at 10:44 AM, Jiangjie Qin > > >>> >> > > >> >> > <j...@linkedin.com.invalid > > >>> >> > > >> >> > > > > > >>> >> > > >> >> > > > wrote: > > >>> >> > > >> >> > > > > > >>> >> > > >> >> > > > > Jay, > > >>> >> > > >> >> > > > > > > >>> >> > > >> >> > > > > Thanks for the comments. Yes, there are actually > > >>> three > > >>> >> > > >> proposals as > > >>> >> > > >> >> > you > > >>> >> > > >> >> > > > > pointed out. > > >>> >> > > >> >> > > > > > > >>> >> > > >> >> > > > > We will have a separate proposal for (1) - > version > > >>> >> control > > >>> >> > > >> >> mechanism. > > >>> >> > > >> >> > > We > > >>> >> > > >> >> > > > > actually thought about whether we want to > separate > > >>> 2 and > > >>> >> 3 > > >>> >> > > >> >> internally > > >>> >> > > >> >> > > > > before creating the KIP. The reason we put 2 > and 3 > > >>> >> together > > >>> >> > > is > > >>> >> > > >> it > > >>> >> > > >> >> > will > > >>> >> > > >> >> > > > > saves us another cross board wire protocol > change. > > >>> Like > > >>> >> you > > >>> >> > > >> said, > > >>> >> > > >> >> we > > >>> >> > > >> >> > > have > > >>> >> > > >> >> > > > > to migrate all the clients in all languages. To > > some > > >>> >> > extent, > > >>> >> > > the > > >>> >> > > >> >> > effort > > >>> >> > > >> >> > > > to > > >>> >> > > >> >> > > > > spend on upgrading the clients can be even > bigger > > >>> than > > >>> >> > > >> implementing > > >>> >> > > >> >> > the > > >>> >> > > >> >> > > > new > > >>> >> > > >> >> > > > > feature itself. So there are some attractions if > > we > > >>> can > > >>> >> do > > >>> >> > 2 > > >>> >> > > >> and 3 > > >>> >> > > >> >> > > > together > > >>> >> > > >> >> > > > > instead of separately. Maybe after (1) is done > it > > >>> will be > > >>> >> > > >> easier to > > >>> >> > > >> >> > do > > >>> >> > > >> >> > > > > protocol migration. But if we are able to come > to > > an > > >>> >> > > agreement > > >>> >> > > >> on > > >>> >> > > >> >> the > > >>> >> > > >> >> > > > > timestamp solution, I would prefer to have it > > >>> together > > >>> >> with > > >>> >> > > >> >> relative > > >>> >> > > >> >> > > > offset > > >>> >> > > >> >> > > > > in the interest of avoiding another wire > protocol > > >>> change > > >>> >> > (the > > >>> >> > > >> >> process > > >>> >> > > >> >> > > to > > >>> >> > > >> >> > > > > migrate to relative offset is exactly the same > as > > >>> migrate > > >>> >> > to > > >>> >> > > >> >> message > > >>> >> > > >> >> > > with > > >>> >> > > >> >> > > > > timestamp). > > >>> >> > > >> >> > > > > > > >>> >> > > >> >> > > > > In terms of timestamp. I completely agree that > > >>> having > > >>> >> > client > > >>> >> > > >> >> > timestamp > > >>> >> > > >> >> > > is > > >>> >> > > >> >> > > > > more useful if we can make sure the timestamp is > > >>> good. > > >>> >> But > > >>> >> > in > > >>> >> > > >> >> reality > > >>> >> > > >> >> > > > that > > >>> >> > > >> >> > > > > can be a really big *IF*. I think the problem is > > >>> exactly > > >>> >> as > > >>> >> > > Ewen > > >>> >> > > >> >> > > > mentioned, > > >>> >> > > >> >> > > > > if we let the client to set the timestamp, it > > would > > >>> be > > >>> >> very > > >>> >> > > hard > > >>> >> > > >> >> for > > >>> >> > > >> >> > > the > > >>> >> > > >> >> > > > > broker to utilize it. 
If broker apply retention > > >>> policy > > >>> >> > based > > >>> >> > > on > > >>> >> > > >> the > > >>> >> > > >> >> > > > client > > >>> >> > > >> >> > > > > timestamp. One misbehave producer can > potentially > > >>> >> > completely > > >>> >> > > >> mess > > >>> >> > > >> >> up > > >>> >> > > >> >> > > the > > >>> >> > > >> >> > > > > retention policy on the broker. Although people > > >>> don't > > >>> >> care > > >>> >> > > about > > >>> >> > > >> >> > server > > >>> >> > > >> >> > > > > side timestamp. People do care a lot when > > timestamp > > >>> >> breaks. > > >>> >> > > >> >> Searching > > >>> >> > > >> >> > > by > > >>> >> > > >> >> > > > > timestamp is a really important use case even > > >>> though it > > >>> >> is > > >>> >> > > not > > >>> >> > > >> used > > >>> >> > > >> >> > as > > >>> >> > > >> >> > > > > often as searching by offset. It has significant > > >>> direct > > >>> >> > > impact > > >>> >> > > >> on > > >>> >> > > >> >> RTO > > >>> >> > > >> >> > > > when > > >>> >> > > >> >> > > > > there is a cross cluster failover as Todd > > mentioned. > > >>> >> > > >> >> > > > > > > >>> >> > > >> >> > > > > The trick using max(lastAppendedTimestamp, > > >>> >> > currentTimeMillis) > > >>> >> > > >> is to > > >>> >> > > >> >> > > > > guarantee monotonic increase of the timestamp. > > Many > > >>> >> > > commercial > > >>> >> > > >> >> system > > >>> >> > > >> >> > > > > actually do something similar to this to solve > the > > >>> time > > >>> >> > skew. > > >>> >> > > >> About > > >>> >> > > >> >> > > > > changing the time, I am not sure if people use > NTP > > >>> like > > >>> >> > > using a > > >>> >> > > >> >> watch > > >>> >> > > >> >> > > to > > >>> >> > > >> >> > > > > just set it forward/backward by an hour or so. > The > > >>> time > > >>> >> > > >> adjustment > > >>> >> > > >> >> I > > >>> >> > > >> >> > > used > > >>> >> > > >> >> > > > > to do is typically to adjust something like a > > >>> minute / > > >>> >> > > week. So > > >>> >> > > >> >> for > > >>> >> > > >> >> > > each > > >>> >> > > >> >> > > > > second, there might be a few mircoseconds > > >>> slower/faster > > >>> >> but > > >>> >> > > >> should > > >>> >> > > >> >> > not > > >>> >> > > >> >> > > > > break the clock completely to make sure all the > > >>> >> time-based > > >>> >> > > >> >> > transactions > > >>> >> > > >> >> > > > are > > >>> >> > > >> >> > > > > not affected. The one minute change will be done > > >>> within a > > >>> >> > > week > > >>> >> > > >> but > > >>> >> > > >> >> > not > > >>> >> > > >> >> > > > > instantly. > > >>> >> > > >> >> > > > > > > >>> >> > > >> >> > > > > Personally, I think having client side timestamp > > >>> will be > > >>> >> > > useful > > >>> >> > > >> if > > >>> >> > > >> >> we > > >>> >> > > >> >> > > > don't > > >>> >> > > >> >> > > > > need to put the broker and data integrity under > > >>> risk. If > > >>> >> we > > >>> >> > > >> have to > > >>> >> > > >> >> > > > choose > > >>> >> > > >> >> > > > > from one of them but not both. I would prefer > > >>> server side > > >>> >> > > >> timestamp > > >>> >> > > >> >> > > > because > > >>> >> > > >> >> > > > > for client side timestamp there is always a > plan B > > >>> which > > >>> >> is > > >>> >> > > >> putting > > >>> >> > > >> >> > the > > >>> >> > > >> >> > > > > timestamp into payload. 
> > >>> >> > > >> >> > > > > > > >>> >> > > >> >> > > > > Another reason I am reluctant to use the client > > side > > >>> >> > > timestamp > > >>> >> > > >> is > > >>> >> > > >> >> > that > > >>> >> > > >> >> > > it > > >>> >> > > >> >> > > > > is always dangerous to mix the control plane > with > > >>> data > > >>> >> > > plane. IP > > >>> >> > > >> >> did > > >>> >> > > >> >> > > this > > >>> >> > > >> >> > > > > and it has caused so many different breaches so > > >>> people > > >>> >> are > > >>> >> > > >> >> migrating > > >>> >> > > >> >> > to > > >>> >> > > >> >> > > > > something like MPLS. An example in Kafka is that > > any > > >>> >> client > > >>> >> > > can > > >>> >> > > >> >> > > > construct a > > >>> >> > > >> >> > > > > > > >>> >> > > >> > > >>> LeaderAndIsrRequest/UpdateMetadataRequest/ContorlledShutdownRequest > > >>> >> > > >> >> > > (you > > >>> >> > > >> >> > > > > name it) and send it to the broker to mess up > the > > >>> entire > > >>> >> > > >> cluster, > > >>> >> > > >> >> > also > > >>> >> > > >> >> > > as > > >>> >> > > >> >> > > > > we already noticed a busy cluster can respond > > quite > > >>> slow > > >>> >> to > > >>> >> > > >> >> > controller > > >>> >> > > >> >> > > > > messages. So it would really be nice if we can > > avoid > > >>> >> giving > > >>> >> > > the > > >>> >> > > >> >> power > > >>> >> > > >> >> > > to > > >>> >> > > >> >> > > > > clients to control the log retention. > > >>> >> > > >> >> > > > > > > >>> >> > > >> >> > > > > Thanks, > > >>> >> > > >> >> > > > > > > >>> >> > > >> >> > > > > Jiangjie (Becket) Qin > > >>> >> > > >> >> > > > > > > >>> >> > > >> >> > > > > > > >>> >> > > >> >> > > > > On Sun, Sep 6, 2015 at 9:54 PM, Todd Palino < > > >>> >> > > tpal...@gmail.com> > > >>> >> > > >> >> > wrote: > > >>> >> > > >> >> > > > > > > >>> >> > > >> >> > > > > > So, with regards to why you want to search by > > >>> >> timestamp, > > >>> >> > > the > > >>> >> > > >> >> > biggest > > >>> >> > > >> >> > > > > > problem I've seen is with consumers who want > to > > >>> reset > > >>> >> > their > > >>> >> > > >> >> > > timestamps > > >>> >> > > >> >> > > > > to a > > >>> >> > > >> >> > > > > > specific point, whether it is to replay a > > certain > > >>> >> amount > > >>> >> > of > > >>> >> > > >> >> > messages, > > >>> >> > > >> >> > > > or > > >>> >> > > >> >> > > > > to > > >>> >> > > >> >> > > > > > rewind to before some problem state existed. > > This > > >>> >> happens > > >>> >> > > more > > >>> >> > > >> >> > often > > >>> >> > > >> >> > > > than > > >>> >> > > >> >> > > > > > anyone would like. > > >>> >> > > >> >> > > > > > > > >>> >> > > >> >> > > > > > To handle this now we need to constantly > export > > >>> the > > >>> >> > > broker's > > >>> >> > > >> >> offset > > >>> >> > > >> >> > > for > > >>> >> > > >> >> > > > > > every partition to a time-series database and > > >>> then use > > >>> >> > > >> external > > >>> >> > > >> >> > > > processes > > >>> >> > > >> >> > > > > > to query this. I know we're not the only ones > > >>> doing > > >>> >> this. > > >>> >> > > The > > >>> >> > > >> way > > >>> >> > > >> >> > the > > >>> >> > > >> >> > > > > > broker handles requests for offsets by > timestamp > > >>> is a > > >>> >> > > little > > >>> >> > > >> >> obtuse > > >>> >> > > >> >> > > > > > (explain it to anyone without intimate > knowledge > > >>> of the > > >>> >> > > >> internal > > >>> >> > > >> >> > > > workings > > >>> >> > > >> >> > > > > > of the broker - every time I do I see this). 
> In > > >>> >> addition, > > >>> >> > > as > > >>> >> > > >> >> Becket > > >>> >> > > >> >> > > > > pointed > > >>> >> > > >> >> > > > > > out, it causes problems specifically with > > >>> retention of > > >>> >> > > >> messages > > >>> >> > > >> >> by > > >>> >> > > >> >> > > time > > >>> >> > > >> >> > > > > > when you move partitions around. > > >>> >> > > >> >> > > > > > > > >>> >> > > >> >> > > > > > I'm deliberately avoiding the discussion of > what > > >>> >> > timestamp > > >>> >> > > to > > >>> >> > > >> >> use. > > >>> >> > > >> >> > I > > >>> >> > > >> >> > > > can > > >>> >> > > >> >> > > > > > see the argument either way, though I tend to > > lean > > >>> >> > towards > > >>> >> > > the > > >>> >> > > >> >> idea > > >>> >> > > >> >> > > > that > > >>> >> > > >> >> > > > > > the broker timestamp is the only viable source > > of > > >>> truth > > >>> >> > in > > >>> >> > > >> this > > >>> >> > > >> >> > > > > situation. > > >>> >> > > >> >> > > > > > > > >>> >> > > >> >> > > > > > -Todd > > >>> >> > > >> >> > > > > > > > >>> >> > > >> >> > > > > > > > >>> >> > > >> >> > > > > > On Sun, Sep 6, 2015 at 7:08 PM, Ewen > > >>> Cheslack-Postava < > > >>> >> > > >> >> > > > e...@confluent.io > > >>> >> > > >> >> > > > > > > > >>> >> > > >> >> > > > > > wrote: > > >>> >> > > >> >> > > > > > > > >>> >> > > >> >> > > > > > > On Sun, Sep 6, 2015 at 4:57 PM, Jay Kreps < > > >>> >> > > j...@confluent.io > > >>> >> > > >> > > > >>> >> > > >> >> > > wrote: > > >>> >> > > >> >> > > > > > > > > >>> >> > > >> >> > > > > > > > > > >>> >> > > >> >> > > > > > > > 2. Nobody cares what time it is on the > > server. > > >>> >> > > >> >> > > > > > > > > > >>> >> > > >> >> > > > > > > > > >>> >> > > >> >> > > > > > > This is a good way of summarizing the issue > I > > >>> was > > >>> >> > trying > > >>> >> > > to > > >>> >> > > >> get > > >>> >> > > >> >> > at, > > >>> >> > > >> >> > > > > from > > >>> >> > > >> >> > > > > > an > > >>> >> > > >> >> > > > > > > app's perspective. Of the 3 stated goals of > > the > > >>> KIP, > > >>> >> #2 > > >>> >> > > (lot > > >>> >> > > >> >> > > > retention) > > >>> >> > > >> >> > > > > > is > > >>> >> > > >> >> > > > > > > reasonably handled by a server-side > > timestamp. I > > >>> >> really > > >>> >> > > just > > >>> >> > > >> >> care > > >>> >> > > >> >> > > > that > > >>> >> > > >> >> > > > > a > > >>> >> > > >> >> > > > > > > message is there long enough that I have a > > >>> chance to > > >>> >> > > process > > >>> >> > > >> >> it. > > >>> >> > > >> >> > #3 > > >>> >> > > >> >> > > > > > > (searching by timestamp) only seems useful > if > > >>> we can > > >>> >> > > >> guarantee > > >>> >> > > >> >> > the > > >>> >> > > >> >> > > > > > > server-side timestamp is close enough to the > > >>> original > > >>> >> > > >> >> client-side > > >>> >> > > >> >> > > > > > > timestamp, and any mirror maker step seems > to > > >>> break > > >>> >> > that > > >>> >> > > >> (even > > >>> >> > > >> >> > > > ignoring > > >>> >> > > >> >> > > > > > any > > >>> >> > > >> >> > > > > > > issues with broker availability). 
> > >>> >> > > >> >> > > > > > > > > >>> >> > > >> >> > > > > > > I'm also wondering whether optimizing for > > >>> >> > > >> search-by-timestamp > > >>> >> > > >> >> on > > >>> >> > > >> >> > > the > > >>> >> > > >> >> > > > > > broker > > >>> >> > > >> >> > > > > > > is really something we want to do given that > > >>> messages > > >>> >> > > aren't > > >>> >> > > >> >> > really > > >>> >> > > >> >> > > > > > > guaranteed to be ordered by > application-level > > >>> >> > timestamps > > >>> >> > > on > > >>> >> > > >> the > > >>> >> > > >> >> > > > broker. > > >>> >> > > >> >> > > > > > Is > > >>> >> > > >> >> > > > > > > part of the need for this just due to the > > >>> current > > >>> >> > > consumer > > >>> >> > > >> APIs > > >>> >> > > >> >> > > being > > >>> >> > > >> >> > > > > > > difficult to work with? For example, could > you > > >>> >> > implement > > >>> >> > > >> this > > >>> >> > > >> >> > > pretty > > >>> >> > > >> >> > > > > > easily > > >>> >> > > >> >> > > > > > > client side just the way you would > > broker-side? > > >>> I'd > > >>> >> > > imagine > > >>> >> > > >> a > > >>> >> > > >> >> > > couple > > >>> >> > > >> >> > > > of > > >>> >> > > >> >> > > > > > > random seeks + reads during very rare > > occasions > > >>> (i.e. > > >>> >> > > when > > >>> >> > > >> the > > >>> >> > > >> >> > app > > >>> >> > > >> >> > > > > starts > > >>> >> > > >> >> > > > > > > up) wouldn't be a problem performance-wise. > Or > > >>> is it > > >>> >> > also > > >>> >> > > >> that > > >>> >> > > >> >> > you > > >>> >> > > >> >> > > > need > > >>> >> > > >> >> > > > > > the > > >>> >> > > >> >> > > > > > > broker to enforce things like monotonically > > >>> >> increasing > > >>> >> > > >> >> timestamps > > >>> >> > > >> >> > > > since > > >>> >> > > >> >> > > > > > you > > >>> >> > > >> >> > > > > > > can't do the query properly and efficiently > > >>> without > > >>> >> > that > > >>> >> > > >> >> > guarantee, > > >>> >> > > >> >> > > > and > > >>> >> > > >> >> > > > > > > therefore what applications are actually > > >>> looking for > > >>> >> > *is* > > >>> >> > > >> >> > > broker-side > > >>> >> > > >> >> > > > > > > timestamps? > > >>> >> > > >> >> > > > > > > > > >>> >> > > >> >> > > > > > > -Ewen > > >>> >> > > >> >> > > > > > > > > >>> >> > > >> >> > > > > > > > > >>> >> > > >> >> > > > > > > > > >>> >> > > >> >> > > > > > > > Consider cases where data is being copied > > >>> from a > > >>> >> > > database > > >>> >> > > >> or > > >>> >> > > >> >> > from > > >>> >> > > >> >> > > > log > > >>> >> > > >> >> > > > > > > > files. In steady-state the server time is > > very > > >>> >> close > > >>> >> > to > > >>> >> > > >> the > > >>> >> > > >> >> > > client > > >>> >> > > >> >> > > > > time > > >>> >> > > >> >> > > > > > > if > > >>> >> > > >> >> > > > > > > > their clocks are sync'd (see 1) but there > > >>> will be > > >>> >> > > times of > > >>> >> > > >> >> > large > > >>> >> > > >> >> > > > > > > divergence > > >>> >> > > >> >> > > > > > > > when the copying process is stopped or > falls > > >>> >> behind. > > >>> >> > > When > > >>> >> > > >> >> this > > >>> >> > > >> >> > > > occurs > > >>> >> > > >> >> > > > > > it > > >>> >> > > >> >> > > > > > > is > > >>> >> > > >> >> > > > > > > > clear that the time the data arrived on > the > > >>> server > > >>> >> is > > >>> >> > > >> >> > irrelevant, > > >>> >> > > >> >> > > > it > > >>> >> > > >> >> > > > > is > > >>> >> > > >> >> > > > > > > the > > >>> >> > > >> >> > > > > > > > source timestamp that matters. 
This is the > > >>> problem > > >>> >> > you > > >>> >> > > are > > >>> >> > > >> >> > trying > > >>> >> > > >> >> > > > to > > >>> >> > > >> >> > > > > > fix > > >>> >> > > >> >> > > > > > > by > > >>> >> > > >> >> > > > > > > > retaining the mm timestamp but really the > > >>> client > > >>> >> > should > > >>> >> > > >> >> always > > >>> >> > > >> >> > > set > > >>> >> > > >> >> > > > > the > > >>> >> > > >> >> > > > > > > time > > >>> >> > > >> >> > > > > > > > with the use of server-side time as a > > >>> fallback. It > > >>> >> > > would > > >>> >> > > >> be > > >>> >> > > >> >> > worth > > >>> >> > > >> >> > > > > > talking > > >>> >> > > >> >> > > > > > > > to the Samza folks and reading through > this > > >>> blog > > >>> >> > post ( > > >>> >> > > >> >> > > > > > > > > > >>> >> > > >> >> > > > > > > > > >>> >> > > >> >> > > > > > > > >>> >> > > >> >> > > > > > > >>> >> > > >> >> > > > > > >>> >> > > >> >> > > > > >>> >> > > >> >> > > > >>> >> > > >> >> > > >>> >> > > >> > > >>> >> > > > > >>> >> > > > >>> >> > > >>> > > > http://radar.oreilly.com/2015/08/the-world-beyond-batch-streaming-101.html > > >>> >> > > >> >> > > > > > > > ) > > >>> >> > > >> >> > > > > > > > on this subject since we went through > > similar > > >>> >> > > learnings on > > >>> >> > > >> >> the > > >>> >> > > >> >> > > > stream > > >>> >> > > >> >> > > > > > > > processing side. > > >>> >> > > >> >> > > > > > > > > > >>> >> > > >> >> > > > > > > > I think the implication of these two is > that > > >>> we > > >>> >> need > > >>> >> > a > > >>> >> > > >> >> proposal > > >>> >> > > >> >> > > > that > > >>> >> > > >> >> > > > > > > > handles potentially very out-of-order > > >>> timestamps in > > >>> >> > > some > > >>> >> > > >> kind > > >>> >> > > >> >> > of > > >>> >> > > >> >> > > > > sanish > > >>> >> > > >> >> > > > > > > way > > >>> >> > > >> >> > > > > > > > (buggy clients will set something totally > > >>> wrong as > > >>> >> > the > > >>> >> > > >> time). > > >>> >> > > >> >> > > > > > > > > > >>> >> > > >> >> > > > > > > > -Jay > > >>> >> > > >> >> > > > > > > > > > >>> >> > > >> >> > > > > > > > On Sun, Sep 6, 2015 at 4:22 PM, Jay Kreps > < > > >>> >> > > >> j...@confluent.io> > > >>> >> > > >> >> > > > wrote: > > >>> >> > > >> >> > > > > > > > > > >>> >> > > >> >> > > > > > > > > The magic byte is used to version > message > > >>> format > > >>> >> so > > >>> >> > > >> we'll > > >>> >> > > >> >> > need > > >>> >> > > >> >> > > to > > >>> >> > > >> >> > > > > > make > > >>> >> > > >> >> > > > > > > > > sure that check is in place--I actually > > >>> don't see > > >>> >> > it > > >>> >> > > in > > >>> >> > > >> the > > >>> >> > > >> >> > > > current > > >>> >> > > >> >> > > > > > > > > consumer code which I think is a bug we > > >>> should > > >>> >> fix > > >>> >> > > for > > >>> >> > > >> the > > >>> >> > > >> >> > next > > >>> >> > > >> >> > > > > > release > > >>> >> > > >> >> > > > > > > > > (filed KAFKA-2523). The purpose of that > > >>> field is > > >>> >> so > > >>> >> > > >> there > > >>> >> > > >> >> is > > >>> >> > > >> >> > a > > >>> >> > > >> >> > > > > clear > > >>> >> > > >> >> > > > > > > > check > > >>> >> > > >> >> > > > > > > > > on the format rather than the scrambled > > >>> scenarios > > >>> >> > > Becket > > >>> >> > > >> >> > > > describes. 
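For illustration, the kind of up-front magic-byte check being discussed might look like the sketch below. The layout constants are assumptions made for the example (the magic byte is taken to follow the 4-byte CRC), not values read from the Kafka source:

    import java.nio.ByteBuffer;

    public class MessageFormatCheck {
        static final byte MAGIC_V0 = 0;     // existing format
        static final byte MAGIC_V1 = 1;     // proposed format (relative offsets + timestamp)
        static final int MAGIC_OFFSET = 4;  // assumption: magic byte follows the 4-byte CRC

        // Reject unknown versions before parsing the rest of the message, instead of
        // mis-reading a new field as a key length and failing later with a confusing error.
        static void checkVersion(ByteBuffer message) {
            byte magic = message.get(message.position() + MAGIC_OFFSET);
            if (magic != MAGIC_V0 && magic != MAGIC_V1)
                throw new IllegalArgumentException("Unsupported message format version: " + magic);
        }
    }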
> > >>> >> > > >> >> > > > > > > > > > > >>> >> > > >> >> > > > > > > > > Also, Becket, I don't think just fixing > > the > > >>> java > > >>> >> > > client > > >>> >> > > >> is > > >>> >> > > >> >> > > > > sufficient > > >>> >> > > >> >> > > > > > > as > > >>> >> > > >> >> > > > > > > > > that would break other clients--i.e. if > > >>> anyone > > >>> >> > > writes a > > >>> >> > > >> v1 > > >>> >> > > >> >> > > > > messages, > > >>> >> > > >> >> > > > > > > even > > >>> >> > > >> >> > > > > > > > > by accident, any non-v1-capable consumer > > >>> will > > >>> >> > break. > > >>> >> > > I > > >>> >> > > >> >> think > > >>> >> > > >> >> > we > > >>> >> > > >> >> > > > > > > probably > > >>> >> > > >> >> > > > > > > > > need a way to have the server ensure a > > >>> particular > > >>> >> > > >> message > > >>> >> > > >> >> > > format > > >>> >> > > >> >> > > > > > either > > >>> >> > > >> >> > > > > > > > at > > >>> >> > > >> >> > > > > > > > > read or write time. > > >>> >> > > >> >> > > > > > > > > > > >>> >> > > >> >> > > > > > > > > -Jay > > >>> >> > > >> >> > > > > > > > > > > >>> >> > > >> >> > > > > > > > > On Thu, Sep 3, 2015 at 3:47 PM, Jiangjie > > Qin > > >>> >> > > >> >> > > > > > <j...@linkedin.com.invalid > > >>> >> > > >> >> > > > > > > > > > >>> >> > > >> >> > > > > > > > > wrote: > > >>> >> > > >> >> > > > > > > > > > > >>> >> > > >> >> > > > > > > > >> Hi Guozhang, > > >>> >> > > >> >> > > > > > > > >> > > >>> >> > > >> >> > > > > > > > >> I checked the code again. Actually CRC > > >>> check > > >>> >> > > probably > > >>> >> > > >> >> won't > > >>> >> > > >> >> > > > fail. > > >>> >> > > >> >> > > > > > The > > >>> >> > > >> >> > > > > > > > >> newly > > >>> >> > > >> >> > > > > > > > >> added timestamp field might be treated > as > > >>> >> > keyLength > > >>> >> > > >> >> instead, > > >>> >> > > >> >> > > so > > >>> >> > > >> >> > > > we > > >>> >> > > >> >> > > > > > are > > >>> >> > > >> >> > > > > > > > >> likely to receive an > > >>> IllegalArgumentException > > >>> >> when > > >>> >> > > try > > >>> >> > > >> to > > >>> >> > > >> >> > read > > >>> >> > > >> >> > > > the > > >>> >> > > >> >> > > > > > > key. > > >>> >> > > >> >> > > > > > > > >> I'll update the KIP. > > >>> >> > > >> >> > > > > > > > >> > > >>> >> > > >> >> > > > > > > > >> Thanks, > > >>> >> > > >> >> > > > > > > > >> > > >>> >> > > >> >> > > > > > > > >> Jiangjie (Becket) Qin > > >>> >> > > >> >> > > > > > > > >> > > >>> >> > > >> >> > > > > > > > >> On Thu, Sep 3, 2015 at 12:48 PM, > Jiangjie > > >>> Qin < > > >>> >> > > >> >> > > > j...@linkedin.com> > > >>> >> > > >> >> > > > > > > > wrote: > > >>> >> > > >> >> > > > > > > > >> > > >>> >> > > >> >> > > > > > > > >> > Hi, Guozhang, > > >>> >> > > >> >> > > > > > > > >> > > > >>> >> > > >> >> > > > > > > > >> > Thanks for reading the KIP. By "old > > >>> >> consumer", I > > >>> >> > > >> meant > > >>> >> > > >> >> the > > >>> >> > > >> >> > > > > > > > >> > ZookeeperConsumerConnector in trunk > > now, > > >>> i.e. > > >>> >> > > without > > >>> >> > > >> >> this > > >>> >> > > >> >> > > bug > > >>> >> > > >> >> > > > > > > fixed. 
> > >>> >> > > >> >> > > > > > > > >> If we > > >>> >> > > >> >> > > > > > > > >> > fix the ZookeeperConsumerConnector > then > > >>> it > > >>> >> will > > >>> >> > > throw > > >>> >> > > >> >> > > > exception > > >>> >> > > >> >> > > > > > > > >> complaining > > >>> >> > > >> >> > > > > > > > >> > about the unsupported version when it > > >>> sees > > >>> >> > message > > >>> >> > > >> >> format > > >>> >> > > >> >> > > V1. > > >>> >> > > >> >> > > > > > What I > > >>> >> > > >> >> > > > > > > > was > > >>> >> > > >> >> > > > > > > > >> > trying to say is that if we have some > > >>> >> > > >> >> > > > ZookeeperConsumerConnector > > >>> >> > > >> >> > > > > > > > running > > >>> >> > > >> >> > > > > > > > >> > without the fix, the consumer will > > >>> complain > > >>> >> > about > > >>> >> > > CRC > > >>> >> > > >> >> > > mismatch > > >>> >> > > >> >> > > > > > > instead > > >>> >> > > >> >> > > > > > > > >> of > > >>> >> > > >> >> > > > > > > > >> > unsupported version. > > >>> >> > > >> >> > > > > > > > >> > > > >>> >> > > >> >> > > > > > > > >> > Thanks, > > >>> >> > > >> >> > > > > > > > >> > > > >>> >> > > >> >> > > > > > > > >> > Jiangjie (Becket) Qin > > >>> >> > > >> >> > > > > > > > >> > > > >>> >> > > >> >> > > > > > > > >> > On Thu, Sep 3, 2015 at 12:15 PM, > > Guozhang > > >>> >> Wang < > > >>> >> > > >> >> > > > > > wangg...@gmail.com> > > >>> >> > > >> >> > > > > > > > >> wrote: > > >>> >> > > >> >> > > > > > > > >> > > > >>> >> > > >> >> > > > > > > > >> >> Thanks for the write-up Jiangjie. > > >>> >> > > >> >> > > > > > > > >> >> > > >>> >> > > >> >> > > > > > > > >> >> One comment about migration plan: > "For > > >>> old > > >>> >> > > >> consumers, > > >>> >> > > >> >> if > > >>> >> > > >> >> > > they > > >>> >> > > >> >> > > > > see > > >>> >> > > >> >> > > > > > > the > > >>> >> > > >> >> > > > > > > > >> new > > >>> >> > > >> >> > > > > > > > >> >> protocol the CRC check will fail".. > > >>> >> > > >> >> > > > > > > > >> >> > > >>> >> > > >> >> > > > > > > > >> >> Do you mean this bug in the old > > consumer > > >>> >> cannot > > >>> >> > > be > > >>> >> > > >> >> fixed > > >>> >> > > >> >> > > in a > > >>> >> > > >> >> > > > > > > > >> >> backward-compatible way? > > >>> >> > > >> >> > > > > > > > >> >> > > >>> >> > > >> >> > > > > > > > >> >> Guozhang > > >>> >> > > >> >> > > > > > > > >> >> > > >>> >> > > >> >> > > > > > > > >> >> > > >>> >> > > >> >> > > > > > > > >> >> On Thu, Sep 3, 2015 at 8:35 AM, > > >>> Jiangjie Qin > > >>> >> > > >> >> > > > > > > > <j...@linkedin.com.invalid > > >>> >> > > >> >> > > > > > > > >> > > > >>> >> > > >> >> > > > > > > > >> >> wrote: > > >>> >> > > >> >> > > > > > > > >> >> > > >>> >> > > >> >> > > > > > > > >> >> > Hi, > > >>> >> > > >> >> > > > > > > > >> >> > > > >>> >> > > >> >> > > > > > > > >> >> > We just created KIP-31 to propose > a > > >>> message > > >>> >> > > format > > >>> >> > > >> >> > change > > >>> >> > > >> >> > > > in > > >>> >> > > >> >> > > > > > > Kafka. 
> > >>> >> > > >> >> > > > > > > > >> >> > > > >>> >> > > >> >> > > > > > > > >> >> > > > >>> >> > > >> >> > > > > > > > >> >> > > > >>> >> > > >> >> > > > > > > > >> >> > > >>> >> > > >> >> > > > > > > > >> > > >>> >> > > >> >> > > > > > > > > > >>> >> > > >> >> > > > > > > > > >>> >> > > >> >> > > > > > > > >>> >> > > >> >> > > > > > > >>> >> > > >> >> > > > > > >>> >> > > >> >> > > > > >>> >> > > >> >> > > > >>> >> > > >> >> > > >>> >> > > >> > > >>> >> > > > > >>> >> > > > >>> >> > > >>> > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-31+-+Message+format+change+proposal > > >>> >> > > >> >> > > > > > > > >> >> > > > >>> >> > > >> >> > > > > > > > >> >> > As a summary, the motivations are: > > >>> >> > > >> >> > > > > > > > >> >> > 1. Avoid server side message > > >>> re-compression > > >>> >> > > >> >> > > > > > > > >> >> > 2. Honor time-based log roll and > > >>> retention > > >>> >> > > >> >> > > > > > > > >> >> > 3. Enable offset search by > timestamp > > >>> at a > > >>> >> > finer > > >>> >> > > >> >> > > > granularity. > > >>> >> > > >> >> > > > > > > > >> >> > > > >>> >> > > >> >> > > > > > > > >> >> > Feedback and comments are welcome! > > >>> >> > > >> >> > > > > > > > >> >> > > > >>> >> > > >> >> > > > > > > > >> >> > Thanks, > > >>> >> > > >> >> > > > > > > > >> >> > > > >>> >> > > >> >> > > > > > > > >> >> > Jiangjie (Becket) Qin > > >>> >> > > >> >> > > > > > > > >> >> > > > >>> >> > > >> >> > > > > > > > >> >> > > >>> >> > > >> >> > > > > > > > >> >> > > >>> >> > > >> >> > > > > > > > >> >> > > >>> >> > > >> >> > > > > > > > >> >> -- > > >>> >> > > >> >> > > > > > > > >> >> -- Guozhang > > >>> >> > > >> >> > > > > > > > >> >> > > >>> >> > > >> >> > > > > > > > >> > > > >>> >> > > >> >> > > > > > > > >> > > > >>> >> > > >> >> > > > > > > > >> > > >>> >> > > >> >> > > > > > > > > > > >>> >> > > >> >> > > > > > > > > > > >>> >> > > >> >> > > > > > > > > > >>> >> > > >> >> > > > > > > > > >>> >> > > >> >> > > > > > > > > >>> >> > > >> >> > > > > > > > > >>> >> > > >> >> > > > > > > -- > > >>> >> > > >> >> > > > > > > Thanks, > > >>> >> > > >> >> > > > > > > Ewen > > >>> >> > > >> >> > > > > > > > > >>> >> > > >> >> > > > > > > > >>> >> > > >> >> > > > > > > >>> >> > > >> >> > > > > > >>> >> > > >> >> > > > > >>> >> > > >> >> > > > > >>> >> > > >> >> > > > > >>> >> > > >> >> > > -- > > >>> >> > > >> >> > > Thanks, > > >>> >> > > >> >> > > Neha > > >>> >> > > >> >> > > > > >>> >> > > >> >> > > > >>> >> > > >> >> > > >>> >> > > >> > > >>> >> > > > > >>> >> > > > >>> >> > > >>> >> > > >>> >> > > >>> >> -- > > >>> >> Thanks, > > >>> >> Ewen > > >>> >> > > >>> > > >>> > > >> > > > > > > > > > -- > -Regards, > Mayuresh R. Gharat > (862) 250-7125 >