Luis,

Just sharing my two cents here: we can extend the OffsetMap trait, making its "offset" type a generic type parameter, and have an integer-typed OffsetMap (for offset), a long-typed OffsetMap (for timestamp), and a byte-array-typed OffsetMap (for record header fields).
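A minimal sketch of that idea follows, written in Java for brevity rather than in the Scala of OffsetMap.scala. All names in it (`VersionMap`, `HashVersionMap`, `ByteArrays`) are hypothetical and not part of Kafka, and it deliberately ignores the memory budget that the real class enforces.

```java
import java.nio.ByteBuffer;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;

// Hypothetical generic counterpart of the log cleaner's OffsetMap.
// V is the "compaction value" type: Long for offset/timestamp, byte[] for headers.
interface VersionMap<V> {
    void put(ByteBuffer key, V version); // keeps the version unless a higher one is stored
    V get(ByteBuffer key);               // null when the key was never seen
    int size();
}

// Plain HashMap-backed sketch; assumption: no memory bound is applied here.
final class HashVersionMap<V> implements VersionMap<V> {
    private final Map<ByteBuffer, V> entries = new HashMap<>();
    private final Comparator<V> order;

    HashVersionMap(Comparator<V> order) {
        this.order = order;
    }

    @Override
    public void put(ByteBuffer key, V version) {
        // Read-only duplicate, so later position changes by the caller do not affect the map key.
        entries.merge(key.asReadOnlyBuffer(), version,
                (stored, incoming) -> order.compare(incoming, stored) >= 0 ? incoming : stored);
    }

    @Override
    public V get(ByteBuffer key) {
        return entries.get(key);
    }

    @Override
    public int size() {
        return entries.size();
    }
}

final class ByteArrays {
    private ByteArrays() { }

    // Unsigned lexicographic order; an array that is a prefix of a longer one sorts first.
    static final Comparator<byte[]> LEXICOGRAPHIC = (a, b) -> {
        int n = Math.min(a.length, b.length);
        for (int i = 0; i < n; i++) {
            int cmp = Integer.compare(a[i] & 0xFF, b[i] & 0xFF);
            if (cmp != 0) {
                return cmp;
            }
        }
        return Integer.compare(a.length, b.length);
    };
}
```

With this shape, the numeric cases could be built as `new HashVersionMap<Long>(Comparator.naturalOrder())` and the header case as `new HashVersionMap<>(ByteArrays.LEXICOGRAPHIC)`; on equal values the sketch simply lets the later `put` win.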
For the byte-array-typed offset map, we can use a general hashmap, where the hashmap's CAPACITY is derived from the given "val memory: Int" parameter, for example using estimates like these:

http://java-performance.info/memory-consumption-of-java-data-types-2/

Guozhang

On Tue, Apr 24, 2018 at 1:40 PM, Luis Cabral <luis_cab...@yahoo.com.invalid> wrote:

> Hi Guozhang,
>
> I mean here:
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/OffsetMap.scala
>
> It seems that this class was designed with a great focus on preventing memory starvation, and this class is also very central to the changes required for this KIP.
>
> So far this has not been of great concern, as enhancing that logic with an extra 8 bytes is quite easy (have a look at the pull request for more details).
> Is it alright to enhance this with a variable amount of bytes, though? Or to replace it with a direct record cache?
>
> Kind Regards
> Luis
>
> On 24 Apr 2018, at 20:30, Guozhang Wang <wangg...@gmail.com> wrote:
> >
> > Not sure if I fully understand your question, but here's my understanding:
> > In LogCleaner we call:
> >
> > "val records = MemoryRecords.readableRecords(readBuffer)"
> >
> > which returns a MemoryRecords object, and then call filterInto with a given customized RecordFilter that implements "checkBatchRetention(batch: RecordBatch)". Note that `RecordBatch` is just an iterable of `Record`, which contains the headers, so we can just access that header there.
> >
> > Guozhang
> >
> > On Tue, Apr 24, 2018 at 12:41 AM, Luís Cabral <luis_cab...@yahoo.com.invalid> wrote:
> >
> >> Hi Guozhang,
> >>
> >> As much as I would like to move on from this topic, I've now tried to implement it in the pull request, and could not find a viable way to store a variable-size byte array in the current concept of the log cleaner (with a long, the current approach simply always treats it as 8 bytes).
> >>
> >> Do you have any suggestions on how to handle this issue there?
> >>
> >> Kind Regards,
> >> Luis
> >>
> >> On Tuesday, April 24, 2018, 1:11:11 AM GMT+2, Luís Cabral <luis_cab...@yahoo.com> wrote:
> >>
> >> That is definitely clearer, KIP updated!
> >>
> >> From: Guozhang Wang
> >> Sent: 23 April 2018 23:44
> >> To: dev@kafka.apache.org
> >> Subject: Re: RE: [DISCUSS] KIP-280: Enhanced log compaction
> >>
> >> Thanks Luís. The KIP looks good to me.
> >> Just the one point I had left as a minor comment:
> >>
> >> `When both records being compared contain a matching "compaction value", then the record with the highest offset will be kept;`
> >>
> >> I understand your intent; it's just that the sentence itself is a bit misleading. I think what you actually meant to say is:
> >>
> >> `When both records being compared contain a matching "compaction value" and their corresponding byte arrays are considered equal, then the record with the highest offset will be kept;`
> >>
> >> Guozhang
> >>
> >> On Mon, Apr 23, 2018 at 1:54 PM, Luís Cabral <luis_cab...@yahoo.com.invalid> wrote:
> >>
> >>> Hello Guozhang,
> >>>
> >>> The KIP is now updated to reflect this choice in strategy.
> >>> Please let me know your thoughts there.
> >>>
> >>> Kind Regards,
> >>> Luís
> >>>
> >>> From: Guozhang Wang
> >>> Sent: 23 April 2018 19:32
> >>> To: dev@kafka.apache.org
> >>> Subject: Re: RE: [DISCUSS] KIP-280: Enhanced log compaction
> >>>
> >>> Hi Luis,
> >>>
> >>> I think by "generalizing it" we could go beyond numerical values, and that's why I suggested we do not need to require that the type serialized to the bytes has any numerical semantics, since it has to be serialized to a byte array anyway.
> >>> I understand that for your use case, the intended record header compaction value is a number, but imagine if someone else wants to compact the same-keyed messages based on some record header key-value pair whose value types, before serializing to bytes, are not numbers at all, but just some strings:
> >>>
> >>> key: "A", value: "a1", header: ["bar" -> "a".bytes()],
> >>> key: "A", value: "a2", header: ["bar" -> "c".bytes()],
> >>> key: "A", value: "a3", header: ["bar" -> "b".bytes()],
> >>>
> >>> Could we allow them to use that header for compaction as well?
> >>>
> >>> Now going back to your use case: for numbers that could be negative, as long as users are aware of the requirement and change the default encoding scheme when they set the headers on the producer record, so that the serialized bytes still preserve the ordering of the values, that should be OK (again, as I said, we push the responsibility of defining the right serde mechanism to users, but that seems to be more flexible). For example: -INF serialized to 0x00000000, -INF+1 serialized to 0x00000001, etc.
> >>>
> >>> Guozhang
> >>>
> >>> On Mon, Apr 23, 2018 at 10:19 AM, Luís Cabral <luis_cab...@yahoo.com.invalid> wrote:
> >>>
> >>>> Hello Guozhang,
> >>>>
> >>>> Thanks for the fast reply!
> >>>>
> >>>> As for the matter of the timestamp, it’s now added to the KIP, so I hope this is correctly addressed.
> >>>> Kindly let me know if you would like some adaptations to the concept.
> >>>>
> >>>> bq.
> >>>> The issue that I do not completely understand is why you keep saying that we need to convert it to a String first, and then convert it to any other field.
> >>>>
> >>>> Maybe I’m over-engineering it again, and the problem can be simplified to restricting this to values greater than or equal to zero, which ends up being ok for my own use case...
> >>>> This would then generally guarantee the lexicographic ordering, as you say.
> >>>> Is this what you mean? Should I then add this restriction to the KIP?
> >>>>
> >>>> Cheers,
> >>>> Luis
> >>>>
> >>>> From: Guozhang Wang
> >>>> Sent: 23 April 2018 17:55
> >>>> To: dev@kafka.apache.org
> >>>> Subject: Re: RE: [DISCUSS] KIP-280: Enhanced log compaction
> >>>>
> >>>> Hello Luis,
> >>>>
> >>>> Thanks for your email. I am replying to your points below:
> >>>>
> >>>>> I don't personally see advantages in it, but also the only disadvantage that I can think of is putting multiple meanings on this field.
> >>>>
> >>>> If we do not treat timestamp as a special value of the config, then I cannot use the timestamp field of the record as the compaction value, since we will only look into the record header (other than the default offset), right? Then users wanting to use the timestamp as the compaction value have to put that timestamp into the record header with a name, which duplicates the field unnecessarily. So to me, without treating it as a special value we are doomed to have a duplicate record field.
> >>>>
> >>>>> Having it this way would jeopardize my own particular use case, as I need to have an incremental number representing the version (e.g. 1, 2, 3, 5, 52, et cetera)
> >>>>
> >>>> The issue that I do not completely understand is why you keep saying that we need to convert it to a String first, and then convert it to any other field. The header is organized as:
> >>>>
> >>>> public interface Header {
> >>>>     String key();
> >>>>     byte[] value();
> >>>> }
> >>>>
> >>>> This means that the header value can be of any type. So for your use case, why can't you just serialize your incremental version number directly into a byte array whose lexicographic order follows the version number value? I think the default byte serialization of an integer is sufficient for this purpose (assuming the incremental number is an int).
> >>>>
> >>>> Guozhang
> >>>>
> >>>> On Mon, Apr 23, 2018 at 2:30 AM, Luís Cabral <luis_cab...@yahoo.com.invalid> wrote:
> >>>>
> >>>>> Hi Guozhang,
> >>>>>
> >>>>> Thank you very much for the patience in explaining your points, I've learnt quite a bit in researching and experimenting after your replies.
> >>>>>
> >>>>> bq.
> >>>>> I still think it is worth defining `timestamp` as a special compaction value
> >>>>>
> >>>>> I don't personally see advantages in it, but also the only disadvantage that I can think of is putting multiple meanings on this field, which does not seem enough to dissuade anyone, so I've added it to the KIP as a compromise.
> >>>>> (please also see the pull request in case you want to confirm the implementation matches your idea)
> >>>>>
> >>>>> bq. Should it be "the record with the highest value will be kept"?
> >>>>>
> >>>>> That is describing a scenario where the records being compared have the same value, in which case the offset is used as a tie-breaker.
> >>>>> In trying to cover as much as possible, the "Proposed Changes" may have become confusing to read, sorry for that...
> >>>>>
> >>>>> bq. Users are then responsible for encoding their compaction field according to the byte array lexicographic ordering to fulfill their ordering semantics. It is more flexible than forcing users to always encode their compaction field as a long type.
> >>>>>
> >>>>> This was indeed my focus in the previous replies, since I am not sure how this would work without adding a lot of responsibility on the client side.
> >>>>> So, rather than trying to debate best practices, since I don't know which ones are being followed in this project, I will instead debate my own selfish need for this feature:
> >>>>> Having it this way would jeopardize my own particular use case, as I need to have an incremental number representing the version (e.g. 1, 2, 3, 5, 52, et cetera). It does not totally invalidate it, since we can always convert it to String on the client side and left-pad with 0's to the max length of a long, but it seems a shame to have to do this as it would increase the data transfer size (I'm trying to avoid it becoming a bottleneck during high throughput periods). This would likely mean that I would start abusing the "timestamp" approach discussed above, as it keeps the messages nimble, but it would again be a shame to be forced into such a hacky solution.
> >>>>> This is how I see it, and why I would like to avoid it. But maybe there is some smarter way that you know of to handle it on the client side that would invalidate these concerns?
> >>>>> Please let me know, and I would also greatly value some more feedback from other people regarding this topic, so please don't be shy!
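One possible answer to the size concern above, sketched under stated assumptions: a fixed-width big-endian encoding keeps a long at 8 bytes (a zero-padded decimal string needs 19 characters to cover the largest long) and still sorts correctly under unsigned byte-wise comparison. Flipping the sign bit additionally makes negative values sort before positive ones. `VersionCodec` and its methods are hypothetical names, not Kafka API, and the unsigned byte-wise comparison is only assumed to be what a cleaner would apply to header values.

```java
import java.nio.ByteBuffer;

final class VersionCodec {
    private VersionCodec() { }

    // 8 big-endian bytes with the sign bit flipped, so that unsigned
    // byte-wise order matches the numeric order of the original longs.
    static byte[] encode(long version) {
        return ByteBuffer.allocate(Long.BYTES).putLong(version ^ Long.MIN_VALUE).array();
    }

    // Inverse of encode: read the big-endian long and flip the sign bit back.
    static long decode(byte[] bytes) {
        return ByteBuffer.wrap(bytes).getLong() ^ Long.MIN_VALUE;
    }

    // Unsigned lexicographic comparison of two byte arrays.
    static int compareUnsigned(byte[] a, byte[] b) {
        int n = Math.min(a.length, b.length);
        for (int i = 0; i < n; i++) {
            int cmp = Integer.compare(a[i] & 0xFF, b[i] & 0xFF);
            if (cmp != 0) {
                return cmp;
            }
        }
        return Integer.compare(a.length, b.length);
    }
}
```

A producer would then set the header value to `VersionCodec.encode(version)`. For versions that are never negative, the plain big-endian bytes (without the sign-bit flip) already sort the same way.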
> >>>>>
> >>>>> Kind Regards,
> >>>>> Luis
> >>>>>
> >>>>> On Friday, April 20, 2018, 7:41:30 PM GMT+2, Guozhang Wang <wangg...@gmail.com> wrote:
> >>>>>
> >>>>> Hi Luís,
> >>>>>
> >>>>> What I'm thinking primarily is that we only need to compare the compaction values as LONG for the offset and timestamp "type" (I still think it is worth defining `timestamp` as a special compaction value, for the reasons below).
> >>>>>
> >>>>> Not sure if you've seen my other comment earlier regarding the offset / timestamp; I'm pasting / editing it here to illustrate my idea:
> >>>>>
> >>>>> --------------
> >>>>>
> >>>>> I think maybe we have a miscommunication here: I'm not against the idea of using headers, but just trying to argue that we could make `timestamp` a special config value that refers to the timestamp field in the metadata. So from the log cleaner's point of view:
> >>>>>
> >>>>> 1. if the config value is "offset", look into the offset field, *comparing their value as long*
> >>>>> 2. if the config value is "timestamp", look into the timestamp field, *comparing their value as long*
> >>>>> 3. otherwise, say the config value is "foo", search for key "foo" in the message header, comparing the value as *byte arrays*
> >>>>>
> >>>>> I.e. "offset" and "timestamp" are treated as special cases, distinct from case 3) above.
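The three cases above can be sketched roughly as follows. This is an illustration only: `CompactionOrder` and `Rec` are hypothetical stand-ins and not Kafka classes, treating a record that lacks the header as lower than one that has it is an assumption the thread does not settle, and the offset tie-break follows the behaviour described elsewhere in this discussion.

```java
import java.util.Map;

final class CompactionOrder {
    private CompactionOrder() { }

    // Hypothetical, minimal stand-in for a log record.
    static final class Rec {
        final long offset;
        final long timestamp;
        final Map<String, byte[]> headers;

        Rec(long offset, long timestamp, Map<String, byte[]> headers) {
            this.offset = offset;
            this.timestamp = timestamp;
            this.headers = headers;
        }
    }

    // A positive result means record a should be kept over record b.
    static int compare(String strategy, Rec a, Rec b) {
        int cmp;
        switch (strategy) {
            case "offset":      // case 1: compare the offsets as longs
                return Long.compare(a.offset, b.offset);
            case "timestamp":   // case 2: compare the timestamps as longs
                cmp = Long.compare(a.timestamp, b.timestamp);
                break;
            default:            // case 3: compare the named header as byte arrays
                cmp = compareBytes(a.headers.get(strategy), b.headers.get(strategy));
                break;
        }
        // Equal compaction values: the higher offset wins.
        return cmp != 0 ? cmp : Long.compare(a.offset, b.offset);
    }

    // Unsigned lexicographic comparison; assumption: a missing header sorts lowest.
    static int compareBytes(byte[] x, byte[] y) {
        if (x == null || y == null) {
            return Boolean.compare(x != null, y != null);
        }
        int n = Math.min(x.length, y.length);
        for (int i = 0; i < n; i++) {
            int cmp = Integer.compare(x[i] & 0xFF, y[i] & 0xFF);
            if (cmp != 0) {
                return cmp;
            }
        }
        return Integer.compare(x.length, y.length);
    }
}
```

Only the `default` branch depends on how the producer serialized the header, which is where the ordering responsibility shifts to the client.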
> >>>>>
> >>>>> --------------
> >>>>>
> >>>>> I think your main concern is that "Although the byte[] can be compared, it is not actually comparable as the versioning is based on a long", while I'm thinking we can indeed generalize it: there is no hard reason that the "compaction value" has to be a long, and since the goal of this KIP is to generalize the log compaction logic to consider header fields, why not allow it to be of any type rather than still forcing it to be a long? Users are then responsible for encoding their compaction field according to the byte array lexicographic ordering to fulfill their ordering semantics. It is more flexible than forcing users to always encode their compaction field as a long type. Let me know WDYT.
> >>>>>
> >>>>> Also I have some minor comments on the wiki itself:
> >>>>>
> >>>>> 1) "When both records being compared contain a matching "compaction value", then the record with the highest offset will be kept;"
> >>>>>
> >>>>> Should it be "the record with the highest value will be kept"?
> >>>>>
> >>>>> Guozhang
> >>>>>
> >>>>> On Fri, Apr 20, 2018 at 1:05 AM, Luís Cabral <luis_cab...@yahoo.com.invalid> wrote:
> >>>>>
> >>>>>> Guozhang, is this reply ok with you?
> >>>>>>
> >>>>>> If you insist on the byte[] comparison directly, then I would need some suggestions on how to represent a "version" with it, and then the KIP could be changed to that.
> >>>>>> On Tuesday, April 17, 2018, 2:44:16 PM GMT+2, Luís Cabral <luis_cab...@yahoo.com> wrote:
> >>>>>>
> >>>>>> Oops, missed that email...
> >>>>>>
> >>>>>> bq. It is because when we compare the bytes we do not treat them as longs at all, so we just compare them based on bytes; I admit that if users' header types have some semantic meaning (e.g. it is encoded from a long) then we are forcing them to choose the encoder that obeys key lexicographic ordering; but I felt it is more general than enforcing any fields that may be used for log cleaner to be defined as a special type.
> >>>>>>
> >>>>>> Yes, you can compare bytes with each other (it's what that code does). You can then assume (or infer) that the encoding used allows for lexicographic ordering, which I hope you do not do a lot of. This is (logically) the same as converting to String and then comparing the strings, except that it allows for abstracting from the String encoding (again, either with assumptions or with inferred knowledge).
> >>>>>> This is purely academic, however, as the versioning is based on a long, which is not compatible with this approach. So, is this comment a fact-check stating that it is possible to compare byte[] overall, or is it about trying to use it in this KIP?
> >>>>>>
> >>>>>> Cheers
> >>>>>>
> >>>>>> PS (because I'm stubborn): It is still not comparable, this comparison is all based on assumptions about the content of the byte array, but I hope we can leave this stuff to Stack Overflow instead of debating it here :)
> >>>>>
> >>>>> --
> >>>>> -- Guozhang
> >>>>
> >>>> --
> >>>> -- Guozhang
> >>>
> >>> --
> >>> -- Guozhang
> >>
> >> --
> >> -- Guozhang
> >
> > --
> > -- Guozhang

--
-- Guozhang