Not sure if I fully understand your question, but here's my understanding: In LogCleaner we call:
"val records = MemoryRecords.readableRecords(readBuffer)" Which returns a MemoryRecords object, and then call filterInto with a given customized RecordFilter that instantiates "checkBatchRetention(batch: RecordBatch)". Note that `RecordBatch` is just an iterator of `Record`, which contains the headers, so we can just access that header there. Guozhang On Tue, Apr 24, 2018 at 12:41 AM, Luís Cabral <luis_cab...@yahoo.com.invalid > wrote: > > Hi Guozhang, > > As much as I would like to move on from this topic, I've now tried to > implement it into the pull request, and could not find a viable way to > store a variable size byte array into the current concept of the log > cleaner (with long the current approach just always considers it to be 8 > bytes). > > Do you have any suggestions on how to handle this issue there? > > Kind Regards, > Luis > > On Tuesday, April 24, 2018, 1:11:11 AM GMT+2, Luís Cabral < > luis_cab...@yahoo.com> wrote: > > #yiv6853119978 #yiv6853119978 -- _filtered #yiv6853119978 {panose-1:2 4 5 > 3 5 4 6 3 2 4;} _filtered #yiv6853119978 {font-family:Calibri;panose-1:2 > 15 5 2 2 2 4 3 2 4;}#yiv6853119978 #yiv6853119978 p.yiv6853119978MsoNormal, > #yiv6853119978 li.yiv6853119978MsoNormal, #yiv6853119978 > div.yiv6853119978MsoNormal {margin:0cm;margin-bottom:. > 0001pt;font-size:11.0pt;font-family:sans-serif;}#yiv6853119978 a:link, > #yiv6853119978 span.yiv6853119978MsoHyperlink > {color:blue;text-decoration:underline;}#yiv6853119978 > a:visited, #yiv6853119978 span.yiv6853119978MsoHyperlinkFollowed > {color:#954F72;text-decoration:underline;}#yiv6853119978 > .yiv6853119978MsoChpDefault {} _filtered #yiv6853119978 {margin:72.0pt > 72.0pt 72.0pt 72.0pt;}#yiv6853119978 div.yiv6853119978WordSection1 > {}#yiv6853119978 > That is definitely clearer, KIP updated! > > > > From: Guozhang Wang > Sent: 23 April 2018 23:44 > To: dev@kafka.apache.org > Subject: Re: RE: [DISCUSS] KIP-280: Enhanced log compaction > > > > Thanks Luís. The KIP looks good to me. Just that what I left as a minor: > > > > `When both records being compared contain a matching "compaction value", > > then the record with the highest offset will be kept;` > > > > I understand your intent, it's just that the sentence itself is a bit > > misleading, I think what you actually meant to say: > > > > `When both records being compared contain a matching "compaction value" and > > their corresponding byte arrays are considered equal, then the record with > > the highest offset will be kept;` > > > > > > > > Guozhang > > > > > > > > On Mon, Apr 23, 2018 at 1:54 PM, Luís Cabral <luis_cab...@yahoo.com.invalid > > > > wrote: > > > > > Hello Guozhang, > > > > > > The KIP is now updated to reflect this choice in strategy. > > > Please let me know your thoughts there. > > > > > > Kind Regards, > > > Luís > > > > > > From: Guozhang Wang > > > Sent: 23 April 2018 19:32 > > > To: dev@kafka.apache.org > > > Subject: Re: RE: [DISCUSS] KIP-280: Enhanced log compaction > > > > > > Hi Luis, > > > > > > I think by "generalizing it" we could go beyond numerical values, and > > > that's why I suggested we do not need to require that the type serialized > > > to the bytes have any numerical semantics since it has to ben serialized > to > > > a byte array anyways. I understand that for your use case, the intended > > > record header compaction value is a number, but imagine if someone else > > > wants to compact the same-keyed messages based on some record header > > > key-value pair whose value types before serializing to bytes are not > > > numbers at all, but just some strings: > > > > > > key: "A", value: "a1", header: ["bar" -> "a".bytes()], > > > key: "A", value: "a2", header: ["bar" -> "c".bytes()], > > > key: "A", value: "a3", header: ["bar" -> "b".bytes()], > > > > > > > > > Could we allow them to use that header for compaction as well? > > > > > > > > > Now going back to your use case, for numbers that could be negative > values, > > > as long as users are aware of the requirement and change the default > > > encoding schemes when they generate the producer record while setting the > > > headers so that the serialized bytes still obey the value that should be > OK > > > (again, as I said, we push this responsibility to users to define the > right > > > serde mechanism, but that seems to be more flexible). For example: -INF > > > serialized to 0x00000000, -INF+1 serialized to 0x00000001, etc. > > > > > > > > > > > > Guozhang > > > > > > > > > > > > > > > > > > On Mon, Apr 23, 2018 at 10:19 AM, Luís Cabral > > > <luis_cab...@yahoo.com.invalid > > > > wrote: > > > > > > > Hello Guozhang, > > > > > > > > Thanks for the fast reply! > > > > > > > > As for the matter of the timestamp, it’s now added to the KIP, so I > hope > > > > this is correctly addressed. > > > > Kindly let me know if you would like some adaptions to the concept. > > > > > > > > > > > > bq. The issue that I do not understand completely is why you'd keep > > > saying > > > > that why we need to convert it to a String, first then converting to > any > > > > other fields. > > > > > > > > Maybe I’m over-engineering it again, and the problem can be simplified > to > > > > restricting this to values greater than or equal to zero, which ends up > > > > being ok for my own use case... > > > > This would then generally guarantee the lexicographic ordering, as you > > > say. > > > > Is this what you mean? Should I then add this restriction to the KIP? > > > > > > > > Cheers, > > > > Luis > > > > > > > > From: Guozhang Wang > > > > Sent: 23 April 2018 17:55 > > > > To: dev@kafka.apache.org > > > > Subject: Re: RE: [DISCUSS] KIP-280: Enhanced log compaction > > > > > > > > Hello Luis, > > > > > > > > Thanks for your email, replying to your points in the following: > > > > > > > > > I don't personally see advantages in it, but also the only > disadvantage > > > > that I can think of is putting multiple meanings on this field. > > > > > > > > If we do not treat timestamp as a special value of the config, then I > > > > cannot use the timestamp field of the record as the compaction value, > > > since > > > > we will only look into the record header other than the default offset, > > > > right? Then users wanting to use the timestamp as the compaction value > > > have > > > > to put that timestamp into the record header with a name, which > > > duplicates > > > > the field unnecessary. So to me without treating it as a special value > we > > > > are doomed to have duplicate record field. > > > > > > > > > Having it this way would jeopardize my own particular use case, as I > > > need > > > > to have an incremental number representing the version (i.e.: 1, 2, 3, > 5, > > > > 52, et cetera) > > > > > > > > The issue that I do not understand completely is why you'd keep saying > > > that > > > > why we need to convert it to a String, first then converting to any > other > > > > fields. Since the header is organized in: > > > > > > > > public interface Header { > > > > > > > > String key(); > > > > > > > > byte[] value(); > > > > > > > > } > > > > > > > > > > > > Which means that the header value can be of any types. So with your use > > > > case why can't you just serialize your incremental version number into > a > > > > byte array directly, whose lexico-order obeys the version number > value?? > > > I > > > > think the default byte serialization mechanism of the integer is > > > sufficient > > > > for this purpose (assuming that increment number is int). > > > > > > > > > > > > > > > > Guozhang > > > > > > > > > > > > > > > > > > > > On Mon, Apr 23, 2018 at 2:30 AM, Luís Cabral > > > <luis_cab...@yahoo.com.invalid > > > > > > > > > wrote: > > > > > > > > > Hi Guozhang, > > > > > > > > > > Thank you very much for the patience in explaining your points, I've > > > > > learnt quite a bit in researching and experimenting after your > replies. > > > > > > > > > > > > > > > bq. I still think it is worth defining `timestamp` as a special > > > > compaction > > > > > value > > > > > > > > > > I don't personally see advantages in it, but also the only > disadvantage > > > > > that I can think of is putting multiple meanings on this field, which > > > > does > > > > > not seem enough to dissuade anyone, so I've added it to the KIP as a > > > > > compromise. > > > > > (please also see the pull request in case you want to confirm the > > > > > implementation matches your idea) > > > > > > > > > > > > > > > bq. Should it be "the record with the highest value will be kept"? > > > > > > > > > > > > > > > That is describing a scenario where the records being compared have > the > > > > > same value, in which case the offset is used as a tie-breaker. > > > > > With trying to cover as much as possible, the "Proposed Changes" may > > > have > > > > > became confusing to read, sorry for that... > > > > > > > > > > > > > > > bq. Users are then responsible to encode their compaction field > > > according > > > > > to the byte array lexico-ordering to full fill their ordering > > > semantics. > > > > It > > > > > is more flexible to enforce users to encode their compaction field > > > always > > > > > as a long type. > > > > > > > > > > This was indeed my focus on the previous replies, since I am not sure > > > how > > > > > this would work without adding a lot of responsibility on the client > > > > side. > > > > > So, rather than trying to debate best practices, since I don't know > > > which > > > > > ones are being followed in this project, I will instead debate my own > > > > > selfish need for this feature: > > > > > Having it this way would jeopardize my own particular use case, as I > > > need > > > > > to have an incremental number representing the version (i.e.: 1, 2, > 3, > > > 5, > > > > > 52, et cetera). It does not totally invalidate it, since we can > always > > > > > convert it to String on the client side and left-pad with 0's to the > > > max > > > > > length of a long, but it seems a shame to have to do this as it would > > > > > increase the data transfer size (I'm trying to avoid it becoming a > > > > > bottleneck during high throughput periods). This would likely mean > > > that I > > > > > would start abusing the "timestamp" approach discussed above, as it > > > keeps > > > > > the messages nimble, but it would again be a shame to be forced into > > > > such a > > > > > hacky solution. > > > > > This is how I see it, and why I would like to avoid it. But maybe > there > > > > is > > > > > some smarter way that you know of on how to handle it on the client > > > side > > > > > that would invalidate these concerns? > > > > > Please let me know, and I would also greatly value some more feedback > > > > from > > > > > other people regarding this topic, so please don't be shy! > > > > > > > > > > Kind Regards,Luis On Friday, April 20, 2018, 7:41:30 PM GMT+2, > > > > Guozhang > > > > > Wang <wangg...@gmail.com> wrote: > > > > > > > > > > Hi Luís, > > > > > > > > > > What I'm thinking primarily is that we only need to compare the > > > > compaction > > > > > values as LONG for the offset and timestmap "type" (I still think it > is > > > > > worth defining `timestamp` as a special compaction value, with the > > > > reasons > > > > > below). > > > > > > > > > > Not sure if you've seen my other comment earlier regarding the > offset / > > > > > timestmap, I'm pasting / editing them here to illustrate my idea: > > > > > > > > > > -------------- > > > > > > > > > > I think maybe we have a mis-communication here: I'm not against the > > > idea > > > > of > > > > > using headers, but just trying to argue that we could make > `timestamp` > > > > > field a special config value that is referring to the timestamp field > > > in > > > > > the metadata. So from log cleaner's pov: > > > > > > > > > > 1. if the config value is "offset", look into the offset field, > > > > *comparing > > > > > their value as long* > > > > > 2. if the config value is "timestamp", look into the timestamp field, > > > > > *comparing > > > > > their value as long* > > > > > 3. otherwise, say the config value is "foo", search for key "foo" in > > > the > > > > > message header, comparing the value as *byte arrays* > > > > > > > > > > I.e. "offset" and "timestamp" are treated as special cases other than > > > > case > > > > > 3) above. > > > > > > > > > > -------------- > > > > > > > > > > I think your main concern is that "Although the byte[] can be > compared, > > > > it > > > > > is not actually comparable as the versioning is based on a long", > while > > > > I'm > > > > > thinking we can indeed generalize it: there is not hard reasons that > > > the > > > > > "compaction value" has to be a long, and since the goal of this KIP > is > > > to > > > > > generalize the log compaction logic to consider header fields, why > not > > > > > allowing it to be of any types than enforcing them still to be a long > > > > type? > > > > > Users are then responsible to encode their compaction field according > > > to > > > > > the byte array lexico-ordering to full fill their ordering semantics. > > > It > > > > is > > > > > more flexible to enforce users to encode their compaction field > always > > > > as a > > > > > long type. Let me know WDYT. > > > > > > > > > > > > > > > > > > > > Also I have some minor comments on the wiki itself: > > > > > > > > > > 1) "When both records being compared contain a matching "compaction > > > > value", > > > > > then the record with the highest offset will be kept;" > > > > > > > > > > Should it be "the record with the highest value will be kept"? > > > > > > > > > > > > > > > > > > > > > > > > > Guozhang > > > > > > > > > > > > > > > On Fri, Apr 20, 2018 at 1:05 AM, Luís Cabral > > > > <luis_cab...@yahoo.com.invalid > > > > > > > > > > > wrote: > > > > > > > > > > > Guozhang, is this reply ok with you? > > > > > > > > > > > > > > > > > > If you insist on the byte[] comparison directly, then I would need > > > some > > > > > > suggestions on how to represent a "version" with it, and then the > KIP > > > > > could > > > > > > be changed to that. > > > > > > On Tuesday, April 17, 2018, 2:44:16 PM GMT+2, Luís Cabral < > > > > > > luis_cab...@yahoo.com> wrote: > > > > > > > > > > > > Oops, missed that email... > > > > > > > > > > > > bq. It is because when we compare the bytes we do not treat them as > > > > longs > > > > > > atall, so we just compare them based on bytes; I admit that if > > > users's > > > > > > headertypes have some semantic meanings (e.g. it is encoded from a > > > > long) > > > > > > they weare forcing them to choose the encoder that obeys key > > > > > > lexicographicordering; but I felt it is more general than enforcing > > > any > > > > > > fields that maybe used for log cleaner to be defined as a special > > > type. > > > > > > > > > > > > Yes, you can compare bytes between each other (its what that code > > > > does). > > > > > > You can then assume (or infer) that the encoding used allows for > > > > > > lexicographic ordering, which I hope you do not do a lot of. This > is > > > > > > (logically) the same as converting to String and then comparing the > > > > > > strings, except that it allows for abstracting from the String > > > encoding > > > > > > (again, either with assumptions or with inferred knowledge). > > > > > > This is purely academic, however, as the versioning is based on a > > > long, > > > > > > which is not compatible with this approach. So, is this comment a > > > > > > fact-check stating that it is possible to compare byte[] overall, > or > > > is > > > > > it > > > > > > about trying to use it in this KIP? > > > > > > > > > > > > Cheers > > > > > > > > > > > > PS (because I'm stubborn): It is still not comparable, this > > > comparison > > > > is > > > > > > all based on assumptions about the content of the byte array, but I > > > > hope > > > > > we > > > > > > can leave this stuff to Stack Overflow instead of debating it here > :) > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > -- > > > > > -- Guozhang > > > > > > > > > > > > > > > > > > > > > -- > > > > -- Guozhang > > > > > > > > > > > > > > > > > -- > > > -- Guozhang > > > > > > > > > > > > -- > > -- Guozhang > > > > -- -- Guozhang