Not sure if I fully understand your question, but here's my understanding:
In LogCleaner we call:

"val records = MemoryRecords.readableRecords(readBuffer)"

Which returns a MemoryRecords object, and then call filterInto with a given
customized RecordFilter that instantiates "checkBatchRetention(batch:
RecordBatch)". Note that `RecordBatch` is just an iterator of `Record`,
which contains the headers, so we can just access that header there.


Guozhang


On Tue, Apr 24, 2018 at 12:41 AM, Luís Cabral <luis_cab...@yahoo.com.invalid
> wrote:

>
> Hi Guozhang,
>
> As much as I would like to move on from this topic, I've now tried to
> implement it into the pull request, and could not find a viable way to
> store a variable size byte array into the current concept of the log
> cleaner (with long the current approach just always considers it to be 8
> bytes).
>
> Do you have any suggestions on how to handle this issue there?
>
> Kind Regards,
> Luis
>
>     On Tuesday, April 24, 2018, 1:11:11 AM GMT+2, Luís Cabral <
> luis_cab...@yahoo.com> wrote:
>
>  #yiv6853119978 #yiv6853119978 -- _filtered #yiv6853119978 {panose-1:2 4 5
> 3 5 4 6 3 2 4;} _filtered #yiv6853119978 {font-family:Calibri;panose-1:2
> 15 5 2 2 2 4 3 2 4;}#yiv6853119978 #yiv6853119978 p.yiv6853119978MsoNormal,
> #yiv6853119978 li.yiv6853119978MsoNormal, #yiv6853119978
> div.yiv6853119978MsoNormal {margin:0cm;margin-bottom:.
> 0001pt;font-size:11.0pt;font-family:sans-serif;}#yiv6853119978 a:link,
> #yiv6853119978 span.yiv6853119978MsoHyperlink 
> {color:blue;text-decoration:underline;}#yiv6853119978
> a:visited, #yiv6853119978 span.yiv6853119978MsoHyperlinkFollowed
> {color:#954F72;text-decoration:underline;}#yiv6853119978
> .yiv6853119978MsoChpDefault {} _filtered #yiv6853119978 {margin:72.0pt
> 72.0pt 72.0pt 72.0pt;}#yiv6853119978 div.yiv6853119978WordSection1
> {}#yiv6853119978
> That is definitely clearer, KIP updated!
>
>
>
> From: Guozhang Wang
> Sent: 23 April 2018 23:44
> To: dev@kafka.apache.org
> Subject: Re: RE: [DISCUSS] KIP-280: Enhanced log compaction
>
>
>
> Thanks Luís. The KIP looks good to me. Just that what I left as a minor:
>
>
>
> `When both records being compared contain a matching "compaction value",
>
> then the record with the highest offset will be kept;`
>
>
>
> I understand your intent, it's just that the sentence itself is a bit
>
> misleading, I think what you actually meant to say:
>
>
>
> `When both records being compared contain a matching "compaction value" and
>
> their corresponding byte arrays are considered equal, then the record with
>
> the highest offset will be kept;`
>
>
>
>
>
>
>
> Guozhang
>
>
>
>
>
>
>
> On Mon, Apr 23, 2018 at 1:54 PM, Luís Cabral <luis_cab...@yahoo.com.invalid
> >
>
> wrote:
>
>
>
> > Hello Guozhang,
>
> >
>
> > The KIP is now updated to reflect this choice in strategy.
>
> > Please let me know your thoughts there.
>
> >
>
> > Kind Regards,
>
> > Luís
>
> >
>
> > From: Guozhang Wang
>
> > Sent: 23 April 2018 19:32
>
> > To: dev@kafka.apache.org
>
> > Subject: Re: RE: [DISCUSS] KIP-280: Enhanced log compaction
>
> >
>
> > Hi Luis,
>
> >
>
> > I think by "generalizing it" we could go beyond numerical values, and
>
> > that's why I suggested we do not need to require that the type serialized
>
> > to the bytes have any numerical semantics since it has to ben serialized
> to
>
> > a byte array anyways. I understand that for your use case, the intended
>
> > record header compaction value is a number, but imagine if someone else
>
> > wants to compact the same-keyed messages based on some record header
>
> > key-value pair whose value types before serializing to bytes are not
>
> > numbers at all, but just some strings:
>
> >
>
> > key: "A", value: "a1", header: ["bar" -> "a".bytes()],
>
> > key: "A", value: "a2", header: ["bar" -> "c".bytes()],
>
> > key: "A", value: "a3", header: ["bar" -> "b".bytes()],
>
> >
>
> >
>
> > Could we allow them to use that header for compaction as well?
>
> >
>
> >
>
> > Now going back to your use case, for numbers that could be negative
> values,
>
> > as long as users are aware of the requirement and change the default
>
> > encoding schemes when they generate the producer record while setting the
>
> > headers so that the serialized bytes still obey the value that should be
> OK
>
> > (again, as I said, we push this responsibility to users to define the
> right
>
> > serde mechanism, but that seems to be more flexible). For example: -INF
>
> > serialized to 0x00000000, -INF+1 serialized to 0x00000001, etc.
>
> >
>
> >
>
> >
>
> > Guozhang
>
> >
>
> >
>
> >
>
> >
>
> >
>
> > On Mon, Apr 23, 2018 at 10:19 AM, Luís Cabral
>
> > <luis_cab...@yahoo.com.invalid
>
> > > wrote:
>
> >
>
> > > Hello Guozhang,
>
> > >
>
> > > Thanks for the fast reply!
>
> > >
>
> > > As for the matter of the timestamp, it’s now added to the KIP, so I
> hope
>
> > > this is correctly addressed.
>
> > > Kindly let me know if you would like some adaptions to the concept.
>
> > >
>
> > >
>
> > > bq. The issue that I do not understand completely is why you'd keep
>
> > saying
>
> > > that why we need to convert it to a String, first then converting to
> any
>
> > > other fields.
>
> > >
>
> > > Maybe I’m over-engineering it again, and the problem can be simplified
> to
>
> > > restricting this to values greater than or equal to zero, which ends up
>
> > > being ok for my own use case...
>
> > > This would then generally guarantee the lexicographic ordering, as you
>
> > say.
>
> > > Is this what you mean? Should I then add this restriction to the KIP?
>
> > >
>
> > > Cheers,
>
> > > Luis
>
> > >
>
> > > From: Guozhang Wang
>
> > > Sent: 23 April 2018 17:55
>
> > > To: dev@kafka.apache.org
>
> > > Subject: Re: RE: [DISCUSS] KIP-280: Enhanced log compaction
>
> > >
>
> > > Hello Luis,
>
> > >
>
> > > Thanks for your email, replying to your points in the following:
>
> > >
>
> > > > I don't personally see advantages in it, but also the only
> disadvantage
>
> > > that I can think of is putting multiple meanings on this field.
>
> > >
>
> > > If we do not treat timestamp as a special value of the config, then I
>
> > > cannot use the timestamp field of the record as the compaction value,
>
> > since
>
> > > we will only look into the record header other than the default offset,
>
> > > right? Then users wanting to use the timestamp as the compaction value
>
> > have
>
> > > to put that timestamp into the record header with a name, which
>
> > duplicates
>
> > > the field unnecessary. So to me without treating it as a special value
> we
>
> > > are doomed to have duplicate record field.
>
> > >
>
> > > > Having it this way would jeopardize my own particular use case, as I
>
> > need
>
> > > to have an incremental number representing the version (i.e.: 1, 2, 3,
> 5,
>
> > > 52, et cetera)
>
> > >
>
> > > The issue that I do not understand completely is why you'd keep saying
>
> > that
>
> > > why we need to convert it to a String, first then converting to any
> other
>
> > > fields. Since the header is organized in:
>
> > >
>
> > > public interface Header {
>
> > >
>
> > >     String key();
>
> > >
>
> > >     byte[] value();
>
> > >
>
> > > }
>
> > >
>
> > >
>
> > > Which means that the header value can be of any types. So with your use
>
> > > case why can't you just serialize your incremental version number into
> a
>
> > > byte array directly, whose lexico-order obeys the version number
> value??
>
> > I
>
> > > think the default byte serialization mechanism of the integer is
>
> > sufficient
>
> > > for this purpose (assuming that increment number is int).
>
> > >
>
> > >
>
> > >
>
> > > Guozhang
>
> > >
>
> > >
>
> > >
>
> > >
>
> > > On Mon, Apr 23, 2018 at 2:30 AM, Luís Cabral
>
> > <luis_cab...@yahoo.com.invalid
>
> > > >
>
> > > wrote:
>
> > >
>
> > > >  Hi Guozhang,
>
> > > >
>
> > > > Thank you very much for the patience in explaining your points, I've
>
> > > > learnt quite a bit in researching and experimenting after your
> replies.
>
> > > >
>
> > > >
>
> > > > bq. I still think it is worth defining `timestamp` as a special
>
> > > compaction
>
> > > > value
>
> > > >
>
> > > > I don't personally see advantages in it, but also the only
> disadvantage
>
> > > > that I can think of is putting multiple meanings on this field, which
>
> > > does
>
> > > > not seem enough to dissuade anyone, so I've added it to the KIP as a
>
> > > > compromise.
>
> > > > (please also see the pull request in case you want to confirm the
>
> > > > implementation matches your idea)
>
> > > >
>
> > > >
>
> > > > bq. Should it be "the record with the highest value will be kept"?
>
> > > >
>
> > > >
>
> > > > That is describing a scenario where the records being compared have
> the
>
> > > > same value, in which case the offset is used as a tie-breaker.
>
> > > > With trying to cover as much as possible, the "Proposed Changes" may
>
> > have
>
> > > > became confusing to read, sorry for that...
>
> > > >
>
> > > >
>
> > > > bq. Users are then responsible to encode their compaction field
>
> > according
>
> > > > to the byte array lexico-ordering to full fill their ordering
>
> > semantics.
>
> > > It
>
> > > > is more flexible to enforce users to encode their compaction field
>
> > always
>
> > > > as a long type.
>
> > > >
>
> > > > This was indeed my focus on the previous replies, since I am not sure
>
> > how
>
> > > > this would work without adding a lot of responsibility on the client
>
> > > side.
>
> > > > So, rather than trying to debate best practices, since I don't know
>
> > which
>
> > > > ones are being followed in this project, I will instead debate my own
>
> > > > selfish need for this feature:
>
> > > > Having it this way would jeopardize my own particular use case, as I
>
> > need
>
> > > > to have an incremental number representing the version (i.e.: 1, 2,
> 3,
>
> > 5,
>
> > > > 52, et cetera). It does not totally invalidate it, since we can
> always
>
> > > > convert it to String on the client side and left-pad with 0's to the
>
> > max
>
> > > > length of a long, but it seems a shame to have to do this as it would
>
> > > > increase the data transfer size (I'm trying to avoid it becoming a
>
> > > > bottleneck during high throughput periods). This would likely mean
>
> > that I
>
> > > > would start abusing the "timestamp" approach discussed above, as it
>
> > keeps
>
> > > > the messages nimble, but it would again be a shame to be forced into
>
> > > such a
>
> > > > hacky solution.
>
> > > > This is how I see it, and why I would like to avoid it. But maybe
> there
>
> > > is
>
> > > > some smarter way that you know of on how to handle it on the client
>
> > side
>
> > > > that would invalidate these concerns?
>
> > > > Please let me know, and I would also greatly value some more feedback
>
> > > from
>
> > > > other people regarding this topic, so please don't be shy!
>
> > > >
>
> > > > Kind Regards,Luis    On Friday, April 20, 2018, 7:41:30 PM GMT+2,
>
> > > Guozhang
>
> > > > Wang <wangg...@gmail.com> wrote:
>
> > > >
>
> > > >  Hi Luís,
>
> > > >
>
> > > > What I'm thinking primarily is that we only need to compare the
>
> > > compaction
>
> > > > values as LONG for the offset and timestmap "type" (I still think it
> is
>
> > > > worth defining `timestamp` as a special compaction value, with the
>
> > > reasons
>
> > > > below).
>
> > > >
>
> > > > Not sure if you've seen my other comment earlier regarding the
> offset /
>
> > > > timestmap, I'm pasting / editing them here to illustrate my idea:
>
> > > >
>
> > > > --------------
>
> > > >
>
> > > > I think maybe we have a mis-communication here: I'm not against the
>
> > idea
>
> > > of
>
> > > > using headers, but just trying to argue that we could make
> `timestamp`
>
> > > > field a special config value that is referring to the timestamp field
>
> > in
>
> > > > the metadata. So from log cleaner's pov:
>
> > > >
>
> > > > 1. if the config value is "offset", look into the offset field,
>
> > > *comparing
>
> > > > their value as long*
>
> > > > 2. if the config value is "timestamp", look into the timestamp field,
>
> > > > *comparing
>
> > > > their value as long*
>
> > > > 3. otherwise, say the config value is "foo", search for key "foo" in
>
> > the
>
> > > > message header, comparing the value as *byte arrays*
>
> > > >
>
> > > > I.e. "offset" and "timestamp" are treated as special cases other than
>
> > > case
>
> > > > 3) above.
>
> > > >
>
> > > > --------------
>
> > > >
>
> > > > I think your main concern is that "Although the byte[] can be
> compared,
>
> > > it
>
> > > > is not actually comparable as the versioning is based on a long",
> while
>
> > > I'm
>
> > > > thinking we can indeed generalize it: there is not hard reasons that
>
> > the
>
> > > > "compaction value" has to be a long, and since the goal of this KIP
> is
>
> > to
>
> > > > generalize the log compaction logic to consider header fields, why
> not
>
> > > > allowing it to be of any types than enforcing them still to be a long
>
> > > type?
>
> > > > Users are then responsible to encode their compaction field according
>
> > to
>
> > > > the byte array lexico-ordering to full fill their ordering semantics.
>
> > It
>
> > > is
>
> > > > more flexible to enforce users to encode their compaction field
> always
>
> > > as a
>
> > > > long type. Let me know WDYT.
>
> > > >
>
> > > >
>
> > > >
>
> > > > Also I have some minor comments on the wiki itself:
>
> > > >
>
> > > > 1) "When both records being compared contain a matching "compaction
>
> > > value",
>
> > > > then the record with the highest offset will be kept;"
>
> > > >
>
> > > > Should it be "the record with the highest value will be kept"?
>
> > > >
>
> > > >
>
> > > >
>
> > > >
>
> > > > Guozhang
>
> > > >
>
> > > >
>
> > > > On Fri, Apr 20, 2018 at 1:05 AM, Luís Cabral
>
> > > <luis_cab...@yahoo.com.invalid
>
> > > > >
>
> > > > wrote:
>
> > > >
>
> > > > >  Guozhang, is this reply ok with you?
>
> > > > >
>
> > > > >
>
> > > > > If you insist on the byte[] comparison directly, then I would need
>
> > some
>
> > > > > suggestions on how to represent a "version" with it, and then the
> KIP
>
> > > > could
>
> > > > > be changed to that.
>
> > > > >    On Tuesday, April 17, 2018, 2:44:16 PM GMT+2, Luís Cabral <
>
> > > > > luis_cab...@yahoo.com> wrote:
>
> > > > >
>
> > > > >  Oops, missed that email...
>
> > > > >
>
> > > > > bq. It is because when we compare the bytes we do not treat them as
>
> > > longs
>
> > > > > atall, so we just compare them based on bytes; I admit that if
>
> > users's
>
> > > > > headertypes have some semantic meanings (e.g. it is encoded from a
>
> > > long)
>
> > > > > they weare forcing them to choose the encoder that obeys key
>
> > > > > lexicographicordering; but I felt it is more general than enforcing
>
> > any
>
> > > > > fields that maybe used for log cleaner to be defined as a special
>
> > type.
>
> > > > >
>
> > > > > Yes, you can compare bytes between each other (its what that code
>
> > > does).
>
> > > > > You can then assume (or infer) that the encoding used allows for
>
> > > > > lexicographic ordering, which I hope you do not do a lot of. This
> is
>
> > > > > (logically) the same as converting to String and then comparing the
>
> > > > > strings, except that it allows for abstracting from the String
>
> > encoding
>
> > > > > (again, either with assumptions or with inferred knowledge).
>
> > > > > This is purely academic, however, as the versioning is based on a
>
> > long,
>
> > > > > which is not compatible with this approach. So, is this comment a
>
> > > > > fact-check stating that it is possible to compare byte[] overall,
> or
>
> > is
>
> > > > it
>
> > > > > about trying to use it in this KIP?
>
> > > > >
>
> > > > > Cheers
>
> > > > >
>
> > > > > PS (because I'm stubborn): It is still not comparable, this
>
> > comparison
>
> > > is
>
> > > > > all based on assumptions about the content of the byte array, but I
>
> > > hope
>
> > > > we
>
> > > > > can leave this stuff to Stack Overflow instead of debating it here
> :)
>
> > > > >
>
> > > > >
>
> > > >
>
> > > >
>
> > > >
>
> > > > --
>
> > > > -- Guozhang
>
> > > >
>
> > >
>
> > >
>
> > >
>
> > > --
>
> > > -- Guozhang
>
> > >
>
> > >
>
> >
>
> >
>
> > --
>
> > -- Guozhang
>
> >
>
> >
>
>
>
>
>
> --
>
> -- Guozhang
>
>
>
>



-- 
-- Guozhang

Reply via email to