Jiangjie,

Good point on the time index format as it relates to uncompressed messages. It does seem that indexing based on file position requires a bit more complexity. Since the time index is going to be used infrequently, having a level of indirection doesn't seem like a big concern. So, we can leave the logic as it is.
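To make that indirection concrete, here is a minimal sketch, assuming simplified in-memory index structures; the class and method names are illustrative stand-ins, not the actual Kafka classes:

```scala
// A minimal sketch of the two-step lookup being discussed, assuming simplified in-memory
// index structures. TimeIndexEntry, OffsetIndexEntry and the method names are illustrative
// stand-ins, not the actual Kafka classes.
object TimeIndexLookupSketch {
  // (timestamp, offset): messages with a timestamp greater than this one live at or after this offset.
  final case class TimeIndexEntry(timestamp: Long, offset: Long)
  // (offset, position): the message with this offset starts at this byte position in the segment file.
  final case class OffsetIndexEntry(offset: Long, position: Int)

  // Hop 1: the time index yields an offset, not a file position ...
  def offsetForTimestamp(timeIndex: Seq[TimeIndexEntry], target: Long): Option[Long] =
    timeIndex.find(_.timestamp >= target).map(_.offset)

  // Hop 2: ... so the offset index is consulted to translate that offset into a read position.
  def positionForOffset(offsetIndex: Seq[OffsetIndexEntry], offset: Long): Int =
    offsetIndex.takeWhile(_.offset <= offset).lastOption.map(_.position).getOrElse(0)

  def main(args: Array[String]): Unit = {
    val timeIndex   = Seq(TimeIndexEntry(100L, 100L), TimeIndexEntry(205L, 150L))
    val offsetIndex = Seq(OffsetIndexEntry(100L, 0), OffsetIndexEntry(150L, 4096))
    val target      = 200L
    // Storing a file position in the time index directly would collapse these two hops into one.
    val startPosition = offsetForTimestamp(timeIndex, target).map(positionForOffset(offsetIndex, _))
    println(s"search for t=$target starts at position $startPosition") // Some(4096)
  }
}
```

A lookup by timestamp therefore takes two hops: timestamp to offset in the time index, then offset to file position in the offset index. Storing the position directly would remove the second hop at the cost of tracking positions during append.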
Do you plan to submit a patch to fix the time-based rolling issue?

Thanks,

Jun

On Fri, Aug 26, 2016 at 3:23 PM, Becket Qin <becket....@gmail.com> wrote:
> Jun,
>
> Good point about the new log rolling behavior issue when moving replicas. Keeping the old behavior sounds reasonable to me.
>
> Currently the time index entry points to the exact shallow message with the indexed timestamp. Are you suggesting we change it to point to the starting offset of the appended batch (message set)? That doesn't seem to work for truncation. For example, imagine an uncompressed message set [(m1, offset=100, timestamp=100, position=1000), (m2, offset=101, timestamp=105, position=1100)]. If we build the time index based on the starting offset of this message set, the index entry would be (105, 1000). Later on, when the log is truncated and m2 is removed but m1 is not (this is possible for an uncompressed message set), we will not delete the time index entry because it is technically pointing to m1. Pointing to the end of the batch will not work either, because then search by timestamp would miss m2.
>
> I am not sure if it is worth doing, but if we are willing to change the semantics so that the time index entry does not point to the exact shallow message, I am thinking maybe we should just switch back to the very original semantics, i.e. a time index entry only means "max timestamp up until this offset", which is also aligned with the offset index entries.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
> On Fri, Aug 26, 2016 at 10:29 AM, Jun Rao <j...@confluent.io> wrote:
>
> > Jiangjie,
> >
> > I am not sure about changing the default to LogAppendTime since CreateTime is probably what most people want. It also doesn't solve the problem completely. For example, if you do a partition reassignment and need to copy a bunch of old log segments to a new broker, this may cause log rolling on every message.
> >
> > Another alternative is to just keep the old time-based rolling behavior, which is rolling based on the create time of the log segment. I had two use cases of time-based rolling in mind. The first one is for users who don't want to retain a message (say, sensitive data) in the log for too long. For this, one can set a time-based retention. If the log can roll periodically based on create time, it will freeze the largest timestamp in the rolled segment and cause it to be deleted when the time limit has been reached. Rolling based on the timestamp of the first message doesn't help much here since the retention is always based on the largest timestamp. The second one is for log cleaning to happen sooner. Rolling logs periodically based on create time will also work for that. So, it seems that if we preserve the old time-based rolling behavior, we won't lose much functionality, but we will avoid the corner case where the logs could be rolled on every message. What do you think?
> >
> > About storing the file position in the time index, I don't think it needs to incur overhead during append. At the beginning of append, we are already getting the end position of the log (for maintaining the offset index). We can just keep track of that together with the last seen offset. Storing the position has the slight benefit that it avoids another indirection and seems more consistent with the offset index. It's worth thinking through whether this is better. If we want to change it, it's better to change it now than later.
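For reference, a toy sketch of the truncation corner case described above (the [(m1, offset=100, ts=100), (m2, offset=101, ts=105)] message set), assuming a simplified in-memory time index; the names are illustrative and this is not the broker's actual truncation code:

```scala
// A toy model of the truncation corner case above: the uncompressed message set
// [(m1, offset=100, ts=100), (m2, offset=101, ts=105)]. Names are illustrative; this is not
// the broker's actual truncation code.
object TimeIndexTruncationSketch {
  final case class TimeIndexEntry(timestamp: Long, offset: Long)

  // Truncation keeps only the index entries that still point at offsets present in the log.
  def truncateTo(index: Seq[TimeIndexEntry], newEndOffset: Long): Seq[TimeIndexEntry] =
    index.filter(_.offset < newEndOffset)

  def main(args: Array[String]): Unit = {
    // Entry built from the batch's *starting* offset: it carries m2's timestamp (105) but m1's offset (100).
    val batchStartEntry = Seq(TimeIndexEntry(timestamp = 105L, offset = 100L))
    // Entry built from the exact shallow message that carries the max timestamp (m2 at offset 101).
    val exactEntry = Seq(TimeIndexEntry(timestamp = 105L, offset = 101L))

    // Truncate the log back to end offset 101: m2 (offset 101) is removed, m1 (offset 100) stays.
    println(truncateTo(batchStartEntry, newEndOffset = 101L)) // entry survives although m2 is gone -> stale
    println(truncateTo(exactEntry, newEndOffset = 101L))      // entry is removed, as expected
  }
}
```

With the entry keyed to the batch's starting offset, truncating away m2 leaves a stale entry for timestamp 105 behind; keyed to the exact shallow message, the entry is dropped as expected.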
> > Thanks,
> >
> > Jun
> >
> > On Thu, Aug 25, 2016 at 6:30 PM, Becket Qin <becket....@gmail.com> wrote:
> >
> > > Hi Jan,
> > >
> > > It seems your main concern is about the changed behavior of time-based log rolling and time-based retention. That is actually why we have two timestamp types. If users set log.message.timestamp.type to LogAppendTime, the broker will behave exactly as it did before, except that rolling and retention will be more accurate and independent of replica movements.
> > >
> > > log.message.timestamp.difference.max.ms is only useful when users are using CreateTime. It is kind of a protection on the broker, because an insanely large timestamp could ruin the time index due to the way we handle out-of-order timestamps when using CreateTime. But users who are using LogAppendTime do not need to worry about this at all.
> > >
> > > The first odd thing is a valid concern. In your case, because you have the timestamp in the message value, it is probably fine to just use LogAppendTime on the broker, so the timestamp will only be used to provide accurate log retention and log rolling based on when the message was produced to the broker, regardless of when the message was created. This should provide exactly the same behavior on the broker side as before. (Apologies for the stale wiki statement on the lines you quoted; as Jun said, log segment rolling is based on the timestamp of the first message, not the largest timestamp in the log segment. I sent a change notification to the mailing list but forgot to update the wiki page. I have just updated it.)
> > >
> > > On the second odd thing: as Jun mentioned, by design we do not keep a global timestamp order. During the search, we start from the oldest segment and scan until we find the first segment that contains a timestamp larger than the target timestamp. This guarantees that no message with a larger timestamp will be missed. For example, if we have 3 segments whose largest timestamps are 100, 300, 200, and we are looking for timestamp 250, we will start scanning at the first segment, stop at the second segment, and search inside that segment for the first timestamp greater than or equal to 250. So reordered largest timestamps across segments should not be an issue.
> > >
> > > The third odd thing is a good point. There are a few reasons we chose to store offsets instead of physical positions in the time index. Easier truncation is one of the reasons, but that may not be a big issue. Another reason is that in the early implementation, the time index and the offset index were aligned, i.e. each offset in the time index had a corresponding entry in the offset index (the reverse is not true), so the physical position was already stored in the offset index. Later on we switched to the current implementation, which has the time index pointing to the exact shallow message in the log segment. With this implementation, if the message with the largest timestamp appears in the middle of an uncompressed message set, we may need to calculate the physical position for that message. This is doable, but it could add overhead to each append as well as some complexity.
> > > Given that OffsetRequest is supposed to be a pretty infrequent request, it is probably OK to do the secondary lookup and save the work on each append.
> > >
> > > Jun has already mentioned a few use cases for searching by timestamp. At LinkedIn we also have several such use cases where people want to rewind the offsets to a certain time and reprocess the streams.
> > >
> > > @Jun, currently we are using CreateTime as the default value for log.message.timestamp.type. I am wondering whether it would be less surprising if we changed the default value to LogAppendTime so that the previous behavior is maintained, because it would be bad for users if upgrading caused their messages to get deleted due to the change in behavior. What do you think?
> > >
> > > Thanks,
> > >
> > > Jiangjie (Becket) Qin
> > >
> > > On Thu, Aug 25, 2016 at 2:36 PM, Jun Rao <j...@confluent.io> wrote:
> > >
> > > > Jan,
> > > >
> > > > Thanks a lot for the feedback. Now I understand your concern better. The following are my comments.
> > > >
> > > > The first odd thing that you pointed out could be a real concern. Basically, if a producer publishes messages with really old timestamps, our default log.roll.hours (7 days) will indeed cause the broker to roll a log on every message, which would be bad. Time-based rolling is actually used infrequently. The only use case that I am aware of is that for compacted topics, rolling logs based on time could allow the compaction to happen sooner (since the active segment is never cleaned). One option is to change the default log.roll.hours to infinite and also document the impact of changing log.roll.hours. Jiangjie, what do you think?
> > > >
> > > > For the second odd thing, the OffsetRequest is a legacy request. It's awkward to use and we plan to deprecate it over time. That's why we haven't changed the logic for serving OffsetRequest after KIP-33. The plan is to introduce a new OffsetRequest that will exploit the time-based index. It's possible to have log segments with non-increasing largest timestamps. As you can see in Log.fetchOffsetsByTimestamp(), we simply iterate the segments in offset order and stop when we see the target timestamp.
> > > >
> > > > For the third odd thing, one of the original reasons why the time-based index points to an offset instead of a file position is that it makes truncating the time index to an offset easier, since the offset is in the index. Looking at the code, we could also store the file position in the time index and do truncation based on position instead of offset. That probably has a slight advantage in keeping the two indexes consistent and in avoiding another level of indirection when looking up the time index. Jiangjie, have we ever considered that?
> > > >
> > > > The idea of log.message.timestamp.difference.max.ms is to prevent the timestamps in published messages from drifting too far away from the current timestamp. The default value is infinite, though.
> > > >
> > > > Lastly, regarding the usefulness of the time-based index, it's actually a feature that the community wanted and voted for, not just Confluent customers.
> > > > For example, being able to seek to an offset based on timestamp has been a frequently asked feature. This can be useful in at least the following scenarios: (1) If there is a bug in a consumer application, the user will want to rewind the consumption after fixing the logic. In this case, it's more convenient to rewind the consumption based on a timestamp. (2) In a multi-data-center setup, it's common for people to mirror data from a Kafka cluster in one data center to another cluster in a different data center. If one data center fails, people want to be able to resume the consumption in the other data center. Since offsets are not preserved between the two clusters through mirroring, being able to find a starting offset based on a timestamp allows the consumer to resume consumption without missing any messages and also without replaying too many messages.
> > > >
> > > > Thanks,
> > > >
> > > > Jun
> > > >
> > > > On Wed, Aug 24, 2016 at 5:05 PM, Jan Filipiak <jan.filip...@trivago.com> wrote:
> > > >
> > > > > Hey Jun,
> > > > >
> > > > > I'll go and try again :); I wrote the first one in quite a stressful environment. The bottom line is that, for our use cases, I see too small a use/effort ratio in this time index. We do not bootstrap new consumers for key-less logs that frequently, and when we do, they usually want everything (prod deployment) or just start at the end (during development). That caused quite some frustration. It would have been better if I could just have turned it off and not bothered any more. Anyhow, in the meantime I had to dig deeper into the inner workings, and the impacts are not as dramatic as I initially assumed. But it still carries along some oddities I want to list here.
> > > > >
> > > > > first odd thing:
> > > > > Quote
> > > > > ---
> > > > > Enforce time based log rolling
> > > > >
> > > > > Currently time based log rolling is based on the creating time of the log segment. With this KIP, the time based rolling would be changed to based on the largest timestamp ever seen in a log segment. A new log segment will be rolled out if current time is greater than largest timestamp ever seen in the log segment + log.roll.ms. When message.timestamp.type=CreateTime, user should set max.message.time.difference.ms appropriately together with log.roll.ms to avoid frequent log segment roll out.
> > > > > ---
> > > > > Imagine a MirrorMaker falls behind and has a delay of some time > log.roll.ms. From my understanding, when no one else is producing to this partition except the MirrorMaker, the broker will start rolling on every append? Just because you maybe are under a DOS attack and your application only works in the remote location (also a good occasion for MM to fall behind). But checking the default values indicates that it should indeed not become a problem, as log.roll.ms defaults to ~7 days.
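As a back-of-the-envelope illustration of this concern, here is a small sketch comparing the roll condition quoted from the wiki with the first-message-based condition Becket describes above; the names are simplified stand-ins, not the broker's actual Log.roll logic:

```scala
// A back-of-the-envelope comparison of the two roll conditions discussed here, with simplified
// stand-ins for the broker's state. Names are illustrative; this is not the actual Log.roll code.
object TimeBasedRollSketch {
  val logRollMs: Long = 7L * 24 * 60 * 60 * 1000 // default log.roll.hours = 168, i.e. ~7 days

  // Condition as quoted from the (stale) wiki text: roll when the wall clock is more than
  // log.roll.ms ahead of the largest timestamp seen in the segment. A mirror maker lagging by
  // more than log.roll.ms would then trigger a roll on every append.
  def shouldRollPerWikiText(largestTimestampMs: Long, nowMs: Long): Boolean =
    nowMs - largestTimestampMs > logRollMs

  // Condition per Becket's correction earlier in the thread: roll based on the timestamp of the
  // first message in the segment, so a lagging mirror maker rolls once per log.roll.ms of
  // replayed data rather than on every message.
  def shouldRollPerFirstMessage(firstTimestampMs: Long, incomingTimestampMs: Long): Boolean =
    incomingTimestampMs - firstTimestampMs > logRollMs

  def main(args: Array[String]): Unit = {
    val now   = System.currentTimeMillis()
    val lagMs = 8L * 24 * 60 * 60 * 1000 // a mirror maker that is 8 days behind, i.e. lag > log.roll.ms
    println(shouldRollPerWikiText(largestTimestampMs = now - lagMs, nowMs = now))   // true: rolls on every append
    println(shouldRollPerFirstMessage(firstTimestampMs = now - lagMs,
                                      incomingTimestampMs = now - lagMs + 60000L))  // false
  }
}
```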
> > > > > second odd thing:
> > > > > Quote
> > > > > ---
> > > > > A time index entry (*T*, *offset*) means that in this segment any message whose timestamp is greater than *T* come after *offset*.
> > > > >
> > > > > The OffsetRequest behaves almost the same as before. If timestamp *T* is set in the OffsetRequest, the first offset in the returned offset sequence means that if user want to consume from *T*, that is the offset to start with. The guarantee is that any message whose timestamp is greater than T has a bigger offset. i.e. Any message before this offset has a timestamp < *T*.
> > > > > ---
> > > > >
> > > > > Given how the index is maintained, with a little bit of bad luck (rolling upgrade/config change of MirrorMakers for different colocations) one ends up with segmentN.timeindex.maxtimestamp > segmentN+1.timeindex.maxtimestamp. If I do not overlook something here, then the fetch code does not seem to take that into account:
> > > > > https://github.com/apache/kafka/blob/79d3fd2bf0e5c89ff74a2988c403882ae8a9852e/core/src/main/scala/kafka/log/Log.scala#L604
> > > > > In this case, goal number 1, not to lose any messages, is not achieved. An easy fix seems to be to sort the segment array by maxtimestamp, but I can't wrap my head around it just now.
> > > > >
> > > > > third odd thing:
> > > > > Regarding the worry of increasing complexity, looking at the code:
> > > > > https://github.com/apache/kafka/blob/79d3fd2bf0e5c89ff74a2988c403882ae8a9852e/core/src/main/scala/kafka/log/LogSegment.scala#L193-196
> > > > > https://github.com/apache/kafka/blob/79d3fd2bf0e5c89ff74a2988c403882ae8a9852e/core/src/main/scala/kafka/log/LogSegment.scala#L227 & 230
> > > > > https://github.com/apache/kafka/blob/79d3fd2bf0e5c89ff74a2988c403882ae8a9852e/core/src/main/scala/kafka/log/LogSegment.scala#L265-266
> > > > > https://github.com/apache/kafka/blob/79d3fd2bf0e5c89ff74a2988c403882ae8a9852e/core/src/main/scala/kafka/log/LogSegment.scala#L305-307
> > > > > https://github.com/apache/kafka/blob/79d3fd2bf0e5c89ff74a2988c403882ae8a9852e/core/src/main/scala/kafka/log/LogSegment.scala#L408-410
> > > > > https://github.com/apache/kafka/blob/79d3fd2bf0e5c89ff74a2988c403882ae8a9852e/core/src/main/scala/kafka/log/LogSegment.scala#L432-435
> > > > > https://github.com/apache/kafka/blob/05d00b5aca2e1e59ad685a3f051d2ab022f75acc/core/src/main/scala/kafka/log/LogSegment.scala#L104-108
> > > > > and especially
> > > > > https://github.com/apache/kafka/blob/79d3fd2bf0e5c89ff74a2988c403882ae8a9852e/core/src/main/scala/kafka/log/Log.scala#L717
> > > > > it feels like the Log & LogSegment having detailed knowledge about the maintained indexes is not the ideal way to model the problem. Having the server maintain a set of indexes could reduce the code complexity, while also allowing an easy switch to turn it off. I think both indexes could point to the physical position; a client would do fetch(timestamp) and then continue with the offsets as usual. Is there any specific reason the timestamp index points into the offset index?
> > > > > For reading, one would need to branch earlier, maybe already in the ApiHandler, and decide which indexes to query, but this branching logic is there now anyhow.
> > > > >
> > > > > Further, I also can't think of a situation where one wants this log.message.timestamp.difference.max.ms to take effect. I think this defeats goal 1 again.
> > > > >
> > > > > ITE, having this index in the brokers now feels weird to me. It gives me a feeling of complexity that I don't need, and I have a hard time figuring out how much other people can benefit from it. I hope this feedback is useful and helps you understand my scepticism regarding this thing. There were some other oddities that I have a hard time recalling now. So I guess the index was built for a specific Confluent customer; will there be any blog post about their use case? Or can you share it?
> > > > >
> > > > > Best Jan
> > > > >
> > > > > On 24.08.2016 16:47, Jun Rao wrote:
> > > > >
> > > > > Jan,
> > > > >
> > > > > Thanks for the reply. I actually wasn't sure what your main concern on time-based rolling is. Just a couple of clarifications. (1) Time-based rolling doesn't control how long a segment will be retained for. For retention, if you use time-based retention, it will now be based on the timestamp in the message. If you use size-based retention, it works the same as before. Is your concern about time-based retention? If so, you can always configure the timestamp in all topics to be log append time, which will give you the same behavior as before. (2) The creation time of the segment is never exposed to the consumer and therefore is never preserved by MirrorMaker. In contrast, the timestamp in the message will be preserved by MirrorMaker. So, I'm not sure what your concern about MirrorMaker is.
> > > > >
> > > > > Jun
> > > > >
> > > > > On Wed, Aug 24, 2016 at 5:03 AM, Jan Filipiak <jan.filip...@trivago.com> wrote:
> > > > >
> > > > >> Hi Jun,
> > > > >>
> > > > >> I copy-pasted this mail from the archive, as I somehow didn't receive it by mail. I will still make some comments inline; hopefully you can find them quickly enough, my apologies.
> > > > >>
> > > > >> To make things more clear, you should also know that all messages in our Kafka setup already have a common way to access their timestamp (it is encoded in the value the same way always). Sometimes this is a logical time (e.g. the same timestamp across many different topics/partitions), say PHP request start time or the like. So Kafka's internal timestamps are not really attractive for us anyway currently.
> > > > >>
> > > > >> I hope I can make a point and not waste your time.
> > > > >>
> > > > >> Best Jan,
> > > > >>
> > > > >> hopefully everything makes sense
> > > > >>
> > > > >> --------
> > > > >>
> > > > >> Jan,
> > > > >>
> > > > >> Currently, there is no switch to disable the time based index.
> > > > >>
> > > > >> There are quite a few use cases of the time based index.
> > > > >>
> > > > >> 1. From KIP-33's wiki, it allows us to do time-based retention accurately.
> > > > >> Before KIP-33, the time-based retention is based on the last modified time of each log segment. The main issue is that the last modified time can change over time. For example, if a broker loses storage and has to re-replicate all data, those re-replicated segments will be retained much longer since their last modified time is more recent. Having a time-based index allows us to retain segments based on the message time, not the last modified time. This can also benefit KIP-71, where we want to combine time-based retention and compaction.
> > > > >>
> > > > >> /If you're sparse on disc space, one could try to get by with retention.bytes/ or, as we did, ssh into the box and rm it, which worked quite well when no one reads it. Chuckles a little when it is read, but readers usually do an auto.offset.reset (they are too slow anyway if they are reading the last segments hrhr).
> > > > >>
> > > > >> 2. In KIP-58, we want to delay log compaction based on a configurable amount of time. The time-based index allows us to do this more accurately.
> > > > >>
> > > > >> /good point, seems reasonable/
> > > > >>
> > > > >> 3. We plan to add an api in the consumer to allow seeking to an offset based on a timestamp. The time-based index allows us to do this more accurately and fast.
> > > > >>
> > > > >> /Sure, I personally feel that you rarely want to do this. For Camus, we used max.pull.historic.days (or similar) successfully quite often. We just gave it an extra day and got what we wanted, and for debugging my bisect tool works well enough. So these are the 2 use cases we have already experienced and found a decent way around./
> > > > >>
> > > > >> Now for the impact.
> > > > >>
> > > > >> a. There is a slight change in how time-based rolling works. Before KIP-33, rolling was based on the time when a segment was loaded in the broker. After KIP-33, rolling is based on the time of the first message of a segment. Not sure if this is your concern. In the common case, the two behave more or less the same. The latter is actually more deterministic since it's not sensitive to broker restarts.
> > > > >>
> > > > >> /This is part of my main concern indeed. This is what scares me, and I preferred to just opt out instead of reviewing all our pipelines to check what's gonna happen when we put it live. For example the MirrorMakers: if they want to preserve create time from the source cluster and publish the same create time (which they should do if you don't encode your own timestamps and want to have proper Kafka Streams windowing), then I am quite concerned we will have problems if our cross-ocean links fall behind, say a day or two. Then I can think of a very up-to-date MirrorMaker from one colocation and a very laggy MirrorMaker from another colocation. For me it's not 100% clear what's gonna happen. But I can't think of sane defaults there, and sane defaults are what I love Kafka for.
> > > > >> It is just tricky to be convinced that an upgrade is safe, which used to be easy./
> > > > >>
> > > > >> b. Time-based index potentially adds overhead to producing messages and loading segments. Our experiments show that the impact to producing is insignificant. The time to load segments when restarting a broker can be doubled. However, the absolute time is still reasonable. For example, loading 10K log segments with time-based index takes about 5 seconds.
> > > > >>
> > > > >> /Loading should be fine/, totally agree
> > > > >>
> > > > >> c. Because the time-based index is useful in several cases and the impact seems small, we didn't consider making the time-based index optional. Finally, although it's possible to make the time-based index optional, it will add more complexity to the code base. So, we probably should only consider it if it's truly needed. Thanks,
> > > > >>
> > > > >> /I think one can get away with an easier code base here. The trick is not to have the Log implement all the logic, but just to have the broker maintain a set of indexes that gets initialized at startup and passed to the Log. One could ask each individual index whether that log segment should be rolled, compacted, truncated, whatever. One could also give the LogSegment to each index and make it rebuild the index, for example. I didn't figure out the details. But this https://github.com/apache/kafka/blob/79d3fd2bf0e5c89ff74a2988c403882ae8a9852e/core/src/main/scala/kafka/log/Log.scala#L715 might end up as for (Index i : indexes) { i.shouldRoll(segment) }, which should already be easier. If users don't want time-based indexing, just don't put the time-based index in the set and everything should work like a charm. RPC calls that work on the specific indexes would need to throw an exception of some kind. Just an idea./
> > > > >>
> > > > >> Jun
> > > > >>
> > > > >> On 22.08.2016 09:24, Jan Filipiak wrote:
> > > > >>
> > > > >>> Hello everyone,
> > > > >>>
> > > > >>> I stumbled across KIP-33 and the time based index; while briefly checking the wiki and commits, I failed to find a way to opt out. I saw it having quite some impact on when logs are rolled and was hoping not to have to deal with all of that. Is there a disable switch I overlooked?
> > > > >>>
> > > > >>> Does anybody have a good use case where the time-based index comes in handy? I made a custom console consumer for myself that can bisect a log based on time. It's just a quick probabilistic shot into the log, but it is sometimes quite useful for some debugging.
> > > > >>>
> > > > >>> Best Jan
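For what it's worth, a rough sketch of the "set of indexes" idea from the quoted comment above, with entirely hypothetical trait and method names (nothing below exists in Kafka); leaving the time-based index out of the set would be the opt-out being asked for:

```scala
// A rough sketch of the "set of indexes" idea from the comment above. The trait and method names
// are entirely hypothetical (nothing here exists in Kafka); the point is only that the broker,
// not the Log, would own the collection, so leaving the time index out of the set disables it.
object PluggableIndexSketch {
  final case class SegmentInfo(baseOffset: Long, sizeInBytes: Long, firstTimestampMs: Long)

  trait LogIndex {
    def name: String
    def shouldRoll(segment: SegmentInfo, nowMs: Long): Boolean
  }

  final class SizeBasedIndex(maxSegmentBytes: Long) extends LogIndex {
    val name = "offset"
    def shouldRoll(segment: SegmentInfo, nowMs: Long): Boolean =
      segment.sizeInBytes >= maxSegmentBytes
  }

  final class TimeBasedIndex(rollMs: Long) extends LogIndex {
    val name = "time"
    def shouldRoll(segment: SegmentInfo, nowMs: Long): Boolean =
      nowMs - segment.firstTimestampMs > rollMs
  }

  def main(args: Array[String]): Unit = {
    val segment = SegmentInfo(baseOffset = 0L, sizeInBytes = 512L * 1024 * 1024, firstTimestampMs = 0L)
    // Omitting TimeBasedIndex from this sequence would be the "opt out" asked for in the thread.
    val indexes: Seq[LogIndex] =
      Seq(new SizeBasedIndex(1024L * 1024 * 1024), new TimeBasedIndex(7L * 24 * 60 * 60 * 1000))
    val now = System.currentTimeMillis()
    indexes.filter(_.shouldRoll(segment, now)).foreach(i => println(s"roll requested by ${i.name} index"))
  }
}
```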