Luis,

Just sharing my two cents here: we can extend the OffsetMap trait, making
its "offset" type a generic template, and have a integer typed OffsetMap
(for offset), and a long typed OffsetMap (for timestamp), and a byte array
typed OffsetMap (record header fields).

For the byte array typed offset map, we can use a general hashmap, where
the hashmap's CAPACITY will be reasoned from the given "val memory: Int"
parameter, for example:
http://java-performance.info/memory-consumption-of-java-data-types-2/


Guozhang


On Tue, Apr 24, 2018 at 1:40 PM, Luis Cabral <luis_cab...@yahoo.com.invalid>
wrote:

> Hi Guozhang,
>
> I mean here:
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/
> OffsetMap.scala
>
>
> It seems that this class was designed with a great focus on preventing
> memory starvation, and this class is also very central to the changes
> required for this KIP.
>
> So far this has not been of great concern, as enhancing that logic with an
> extra 8 bytes is quite easy (have a look at the pull request for more
> details).
> Is it alright to enhance this with a variable amount of bytes, though? Or
> to replace it with a direct record cache?
>
> Kind Regards
> Luis
>
>
> > On 24 Apr 2018, at 20:30, Guozhang Wang <wangg...@gmail.com> wrote:
> >
> > Not sure if I fully understand your question, but here's my
> understanding:
> > In LogCleaner we call:
> >
> > "val records = MemoryRecords.readableRecords(readBuffer)"
> >
> > Which returns a MemoryRecords object, and then call filterInto with a
> given
> > customized RecordFilter that instantiates "checkBatchRetention(batch:
> > RecordBatch)". Note that `RecordBatch` is just an iterator of `Record`,
> > which contains the headers, so we can just access that header there.
> >
> >
> > Guozhang
> >
> >
> > On Tue, Apr 24, 2018 at 12:41 AM, Luís Cabral
> <luis_cab...@yahoo.com.invalid
> >> wrote:
> >
> >>
> >> Hi Guozhang,
> >>
> >> As much as I would like to move on from this topic, I've now tried to
> >> implement it into the pull request, and could not find a viable way to
> >> store a variable size byte array into the current concept of the log
> >> cleaner (with long the current approach just always considers it to be 8
> >> bytes).
> >>
> >> Do you have any suggestions on how to handle this issue there?
> >>
> >> Kind Regards,
> >> Luis
> >>
> >>    On Tuesday, April 24, 2018, 1:11:11 AM GMT+2, Luís Cabral <
> >> luis_cab...@yahoo.com> wrote:
> >>
> >> #yiv6853119978 #yiv6853119978 -- _filtered #yiv6853119978 {panose-1:2 4
> 5
> >> 3 5 4 6 3 2 4;} _filtered #yiv6853119978 {font-family:Calibri;panose-1:
> 2
> >> 15 5 2 2 2 4 3 2 4;}#yiv6853119978 #yiv6853119978
> p.yiv6853119978MsoNormal,
> >> #yiv6853119978 li.yiv6853119978MsoNormal, #yiv6853119978
> >> div.yiv6853119978MsoNormal {margin:0cm;margin-bottom:.
> >> 0001pt;font-size:11.0pt;font-family:sans-serif;}#yiv6853119978 a:link,
> >> #yiv6853119978 span.yiv6853119978MsoHyperlink
> {color:blue;text-decoration:underline;}#yiv6853119978
> >> a:visited, #yiv6853119978 span.yiv6853119978MsoHyperlinkFollowed
> >> {color:#954F72;text-decoration:underline;}#yiv6853119978
> >> .yiv6853119978MsoChpDefault {} _filtered #yiv6853119978 {margin:72.0pt
> >> 72.0pt 72.0pt 72.0pt;}#yiv6853119978 div.yiv6853119978WordSection1
> >> {}#yiv6853119978
> >> That is definitely clearer, KIP updated!
> >>
> >>
> >>
> >> From: Guozhang Wang
> >> Sent: 23 April 2018 23:44
> >> To: dev@kafka.apache.org
> >> Subject: Re: RE: [DISCUSS] KIP-280: Enhanced log compaction
> >>
> >>
> >>
> >> Thanks Luís. The KIP looks good to me. Just that what I left as a minor:
> >>
> >>
> >>
> >> `When both records being compared contain a matching "compaction value",
> >>
> >> then the record with the highest offset will be kept;`
> >>
> >>
> >>
> >> I understand your intent, it's just that the sentence itself is a bit
> >>
> >> misleading, I think what you actually meant to say:
> >>
> >>
> >>
> >> `When both records being compared contain a matching "compaction value"
> and
> >>
> >> their corresponding byte arrays are considered equal, then the record
> with
> >>
> >> the highest offset will be kept;`
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >> Guozhang
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >> On Mon, Apr 23, 2018 at 1:54 PM, Luís Cabral
> <luis_cab...@yahoo.com.invalid
> >>>
> >>
> >> wrote:
> >>
> >>
> >>
> >>> Hello Guozhang,
> >>
> >>>
> >>
> >>> The KIP is now updated to reflect this choice in strategy.
> >>
> >>> Please let me know your thoughts there.
> >>
> >>>
> >>
> >>> Kind Regards,
> >>
> >>> Luís
> >>
> >>>
> >>
> >>> From: Guozhang Wang
> >>
> >>> Sent: 23 April 2018 19:32
> >>
> >>> To: dev@kafka.apache.org
> >>
> >>> Subject: Re: RE: [DISCUSS] KIP-280: Enhanced log compaction
> >>
> >>>
> >>
> >>> Hi Luis,
> >>
> >>>
> >>
> >>> I think by "generalizing it" we could go beyond numerical values, and
> >>
> >>> that's why I suggested we do not need to require that the type
> serialized
> >>
> >>> to the bytes have any numerical semantics since it has to ben
> serialized
> >> to
> >>
> >>> a byte array anyways. I understand that for your use case, the intended
> >>
> >>> record header compaction value is a number, but imagine if someone else
> >>
> >>> wants to compact the same-keyed messages based on some record header
> >>
> >>> key-value pair whose value types before serializing to bytes are not
> >>
> >>> numbers at all, but just some strings:
> >>
> >>>
> >>
> >>> key: "A", value: "a1", header: ["bar" -> "a".bytes()],
> >>
> >>> key: "A", value: "a2", header: ["bar" -> "c".bytes()],
> >>
> >>> key: "A", value: "a3", header: ["bar" -> "b".bytes()],
> >>
> >>>
> >>
> >>>
> >>
> >>> Could we allow them to use that header for compaction as well?
> >>
> >>>
> >>
> >>>
> >>
> >>> Now going back to your use case, for numbers that could be negative
> >> values,
> >>
> >>> as long as users are aware of the requirement and change the default
> >>
> >>> encoding schemes when they generate the producer record while setting
> the
> >>
> >>> headers so that the serialized bytes still obey the value that should
> be
> >> OK
> >>
> >>> (again, as I said, we push this responsibility to users to define the
> >> right
> >>
> >>> serde mechanism, but that seems to be more flexible). For example: -INF
> >>
> >>> serialized to 0x00000000, -INF+1 serialized to 0x00000001, etc.
> >>
> >>>
> >>
> >>>
> >>
> >>>
> >>
> >>> Guozhang
> >>
> >>>
> >>
> >>>
> >>
> >>>
> >>
> >>>
> >>
> >>>
> >>
> >>> On Mon, Apr 23, 2018 at 10:19 AM, Luís Cabral
> >>
> >>> <luis_cab...@yahoo.com.invalid
> >>
> >>>> wrote:
> >>
> >>>
> >>
> >>>> Hello Guozhang,
> >>
> >>>>
> >>
> >>>> Thanks for the fast reply!
> >>
> >>>>
> >>
> >>>> As for the matter of the timestamp, it’s now added to the KIP, so I
> >> hope
> >>
> >>>> this is correctly addressed.
> >>
> >>>> Kindly let me know if you would like some adaptions to the concept.
> >>
> >>>>
> >>
> >>>>
> >>
> >>>> bq. The issue that I do not understand completely is why you'd keep
> >>
> >>> saying
> >>
> >>>> that why we need to convert it to a String, first then converting to
> >> any
> >>
> >>>> other fields.
> >>
> >>>>
> >>
> >>>> Maybe I’m over-engineering it again, and the problem can be simplified
> >> to
> >>
> >>>> restricting this to values greater than or equal to zero, which ends
> up
> >>
> >>>> being ok for my own use case...
> >>
> >>>> This would then generally guarantee the lexicographic ordering, as you
> >>
> >>> say.
> >>
> >>>> Is this what you mean? Should I then add this restriction to the KIP?
> >>
> >>>>
> >>
> >>>> Cheers,
> >>
> >>>> Luis
> >>
> >>>>
> >>
> >>>> From: Guozhang Wang
> >>
> >>>> Sent: 23 April 2018 17:55
> >>
> >>>> To: dev@kafka.apache.org
> >>
> >>>> Subject: Re: RE: [DISCUSS] KIP-280: Enhanced log compaction
> >>
> >>>>
> >>
> >>>> Hello Luis,
> >>
> >>>>
> >>
> >>>> Thanks for your email, replying to your points in the following:
> >>
> >>>>
> >>
> >>>>> I don't personally see advantages in it, but also the only
> >> disadvantage
> >>
> >>>> that I can think of is putting multiple meanings on this field.
> >>
> >>>>
> >>
> >>>> If we do not treat timestamp as a special value of the config, then I
> >>
> >>>> cannot use the timestamp field of the record as the compaction value,
> >>
> >>> since
> >>
> >>>> we will only look into the record header other than the default
> offset,
> >>
> >>>> right? Then users wanting to use the timestamp as the compaction value
> >>
> >>> have
> >>
> >>>> to put that timestamp into the record header with a name, which
> >>
> >>> duplicates
> >>
> >>>> the field unnecessary. So to me without treating it as a special value
> >> we
> >>
> >>>> are doomed to have duplicate record field.
> >>
> >>>>
> >>
> >>>>> Having it this way would jeopardize my own particular use case, as I
> >>
> >>> need
> >>
> >>>> to have an incremental number representing the version (i.e.: 1, 2, 3,
> >> 5,
> >>
> >>>> 52, et cetera)
> >>
> >>>>
> >>
> >>>> The issue that I do not understand completely is why you'd keep saying
> >>
> >>> that
> >>
> >>>> why we need to convert it to a String, first then converting to any
> >> other
> >>
> >>>> fields. Since the header is organized in:
> >>
> >>>>
> >>
> >>>> public interface Header {
> >>
> >>>>
> >>
> >>>>    String key();
> >>
> >>>>
> >>
> >>>>    byte[] value();
> >>
> >>>>
> >>
> >>>> }
> >>
> >>>>
> >>
> >>>>
> >>
> >>>> Which means that the header value can be of any types. So with your
> use
> >>
> >>>> case why can't you just serialize your incremental version number into
> >> a
> >>
> >>>> byte array directly, whose lexico-order obeys the version number
> >> value??
> >>
> >>> I
> >>
> >>>> think the default byte serialization mechanism of the integer is
> >>
> >>> sufficient
> >>
> >>>> for this purpose (assuming that increment number is int).
> >>
> >>>>
> >>
> >>>>
> >>
> >>>>
> >>
> >>>> Guozhang
> >>
> >>>>
> >>
> >>>>
> >>
> >>>>
> >>
> >>>>
> >>
> >>>> On Mon, Apr 23, 2018 at 2:30 AM, Luís Cabral
> >>
> >>> <luis_cab...@yahoo.com.invalid
> >>
> >>>>>
> >>
> >>>> wrote:
> >>
> >>>>
> >>
> >>>>> Hi Guozhang,
> >>
> >>>>>
> >>
> >>>>> Thank you very much for the patience in explaining your points, I've
> >>
> >>>>> learnt quite a bit in researching and experimenting after your
> >> replies.
> >>
> >>>>>
> >>
> >>>>>
> >>
> >>>>> bq. I still think it is worth defining `timestamp` as a special
> >>
> >>>> compaction
> >>
> >>>>> value
> >>
> >>>>>
> >>
> >>>>> I don't personally see advantages in it, but also the only
> >> disadvantage
> >>
> >>>>> that I can think of is putting multiple meanings on this field, which
> >>
> >>>> does
> >>
> >>>>> not seem enough to dissuade anyone, so I've added it to the KIP as a
> >>
> >>>>> compromise.
> >>
> >>>>> (please also see the pull request in case you want to confirm the
> >>
> >>>>> implementation matches your idea)
> >>
> >>>>>
> >>
> >>>>>
> >>
> >>>>> bq. Should it be "the record with the highest value will be kept"?
> >>
> >>>>>
> >>
> >>>>>
> >>
> >>>>> That is describing a scenario where the records being compared have
> >> the
> >>
> >>>>> same value, in which case the offset is used as a tie-breaker.
> >>
> >>>>> With trying to cover as much as possible, the "Proposed Changes" may
> >>
> >>> have
> >>
> >>>>> became confusing to read, sorry for that...
> >>
> >>>>>
> >>
> >>>>>
> >>
> >>>>> bq. Users are then responsible to encode their compaction field
> >>
> >>> according
> >>
> >>>>> to the byte array lexico-ordering to full fill their ordering
> >>
> >>> semantics.
> >>
> >>>> It
> >>
> >>>>> is more flexible to enforce users to encode their compaction field
> >>
> >>> always
> >>
> >>>>> as a long type.
> >>
> >>>>>
> >>
> >>>>> This was indeed my focus on the previous replies, since I am not sure
> >>
> >>> how
> >>
> >>>>> this would work without adding a lot of responsibility on the client
> >>
> >>>> side.
> >>
> >>>>> So, rather than trying to debate best practices, since I don't know
> >>
> >>> which
> >>
> >>>>> ones are being followed in this project, I will instead debate my own
> >>
> >>>>> selfish need for this feature:
> >>
> >>>>> Having it this way would jeopardize my own particular use case, as I
> >>
> >>> need
> >>
> >>>>> to have an incremental number representing the version (i.e.: 1, 2,
> >> 3,
> >>
> >>> 5,
> >>
> >>>>> 52, et cetera). It does not totally invalidate it, since we can
> >> always
> >>
> >>>>> convert it to String on the client side and left-pad with 0's to the
> >>
> >>> max
> >>
> >>>>> length of a long, but it seems a shame to have to do this as it would
> >>
> >>>>> increase the data transfer size (I'm trying to avoid it becoming a
> >>
> >>>>> bottleneck during high throughput periods). This would likely mean
> >>
> >>> that I
> >>
> >>>>> would start abusing the "timestamp" approach discussed above, as it
> >>
> >>> keeps
> >>
> >>>>> the messages nimble, but it would again be a shame to be forced into
> >>
> >>>> such a
> >>
> >>>>> hacky solution.
> >>
> >>>>> This is how I see it, and why I would like to avoid it. But maybe
> >> there
> >>
> >>>> is
> >>
> >>>>> some smarter way that you know of on how to handle it on the client
> >>
> >>> side
> >>
> >>>>> that would invalidate these concerns?
> >>
> >>>>> Please let me know, and I would also greatly value some more feedback
> >>
> >>>> from
> >>
> >>>>> other people regarding this topic, so please don't be shy!
> >>
> >>>>>
> >>
> >>>>> Kind Regards,Luis    On Friday, April 20, 2018, 7:41:30 PM GMT+2,
> >>
> >>>> Guozhang
> >>
> >>>>> Wang <wangg...@gmail.com> wrote:
> >>
> >>>>>
> >>
> >>>>> Hi Luís,
> >>
> >>>>>
> >>
> >>>>> What I'm thinking primarily is that we only need to compare the
> >>
> >>>> compaction
> >>
> >>>>> values as LONG for the offset and timestmap "type" (I still think it
> >> is
> >>
> >>>>> worth defining `timestamp` as a special compaction value, with the
> >>
> >>>> reasons
> >>
> >>>>> below).
> >>
> >>>>>
> >>
> >>>>> Not sure if you've seen my other comment earlier regarding the
> >> offset /
> >>
> >>>>> timestmap, I'm pasting / editing them here to illustrate my idea:
> >>
> >>>>>
> >>
> >>>>> --------------
> >>
> >>>>>
> >>
> >>>>> I think maybe we have a mis-communication here: I'm not against the
> >>
> >>> idea
> >>
> >>>> of
> >>
> >>>>> using headers, but just trying to argue that we could make
> >> `timestamp`
> >>
> >>>>> field a special config value that is referring to the timestamp field
> >>
> >>> in
> >>
> >>>>> the metadata. So from log cleaner's pov:
> >>
> >>>>>
> >>
> >>>>> 1. if the config value is "offset", look into the offset field,
> >>
> >>>> *comparing
> >>
> >>>>> their value as long*
> >>
> >>>>> 2. if the config value is "timestamp", look into the timestamp field,
> >>
> >>>>> *comparing
> >>
> >>>>> their value as long*
> >>
> >>>>> 3. otherwise, say the config value is "foo", search for key "foo" in
> >>
> >>> the
> >>
> >>>>> message header, comparing the value as *byte arrays*
> >>
> >>>>>
> >>
> >>>>> I.e. "offset" and "timestamp" are treated as special cases other than
> >>
> >>>> case
> >>
> >>>>> 3) above.
> >>
> >>>>>
> >>
> >>>>> --------------
> >>
> >>>>>
> >>
> >>>>> I think your main concern is that "Although the byte[] can be
> >> compared,
> >>
> >>>> it
> >>
> >>>>> is not actually comparable as the versioning is based on a long",
> >> while
> >>
> >>>> I'm
> >>
> >>>>> thinking we can indeed generalize it: there is not hard reasons that
> >>
> >>> the
> >>
> >>>>> "compaction value" has to be a long, and since the goal of this KIP
> >> is
> >>
> >>> to
> >>
> >>>>> generalize the log compaction logic to consider header fields, why
> >> not
> >>
> >>>>> allowing it to be of any types than enforcing them still to be a long
> >>
> >>>> type?
> >>
> >>>>> Users are then responsible to encode their compaction field according
> >>
> >>> to
> >>
> >>>>> the byte array lexico-ordering to full fill their ordering semantics.
> >>
> >>> It
> >>
> >>>> is
> >>
> >>>>> more flexible to enforce users to encode their compaction field
> >> always
> >>
> >>>> as a
> >>
> >>>>> long type. Let me know WDYT.
> >>
> >>>>>
> >>
> >>>>>
> >>
> >>>>>
> >>
> >>>>> Also I have some minor comments on the wiki itself:
> >>
> >>>>>
> >>
> >>>>> 1) "When both records being compared contain a matching "compaction
> >>
> >>>> value",
> >>
> >>>>> then the record with the highest offset will be kept;"
> >>
> >>>>>
> >>
> >>>>> Should it be "the record with the highest value will be kept"?
> >>
> >>>>>
> >>
> >>>>>
> >>
> >>>>>
> >>
> >>>>>
> >>
> >>>>> Guozhang
> >>
> >>>>>
> >>
> >>>>>
> >>
> >>>>> On Fri, Apr 20, 2018 at 1:05 AM, Luís Cabral
> >>
> >>>> <luis_cab...@yahoo.com.invalid
> >>
> >>>>>>
> >>
> >>>>> wrote:
> >>
> >>>>>
> >>
> >>>>>> Guozhang, is this reply ok with you?
> >>
> >>>>>>
> >>
> >>>>>>
> >>
> >>>>>> If you insist on the byte[] comparison directly, then I would need
> >>
> >>> some
> >>
> >>>>>> suggestions on how to represent a "version" with it, and then the
> >> KIP
> >>
> >>>>> could
> >>
> >>>>>> be changed to that.
> >>
> >>>>>>   On Tuesday, April 17, 2018, 2:44:16 PM GMT+2, Luís Cabral <
> >>
> >>>>>> luis_cab...@yahoo.com> wrote:
> >>
> >>>>>>
> >>
> >>>>>> Oops, missed that email...
> >>
> >>>>>>
> >>
> >>>>>> bq. It is because when we compare the bytes we do not treat them as
> >>
> >>>> longs
> >>
> >>>>>> atall, so we just compare them based on bytes; I admit that if
> >>
> >>> users's
> >>
> >>>>>> headertypes have some semantic meanings (e.g. it is encoded from a
> >>
> >>>> long)
> >>
> >>>>>> they weare forcing them to choose the encoder that obeys key
> >>
> >>>>>> lexicographicordering; but I felt it is more general than enforcing
> >>
> >>> any
> >>
> >>>>>> fields that maybe used for log cleaner to be defined as a special
> >>
> >>> type.
> >>
> >>>>>>
> >>
> >>>>>> Yes, you can compare bytes between each other (its what that code
> >>
> >>>> does).
> >>
> >>>>>> You can then assume (or infer) that the encoding used allows for
> >>
> >>>>>> lexicographic ordering, which I hope you do not do a lot of. This
> >> is
> >>
> >>>>>> (logically) the same as converting to String and then comparing the
> >>
> >>>>>> strings, except that it allows for abstracting from the String
> >>
> >>> encoding
> >>
> >>>>>> (again, either with assumptions or with inferred knowledge).
> >>
> >>>>>> This is purely academic, however, as the versioning is based on a
> >>
> >>> long,
> >>
> >>>>>> which is not compatible with this approach. So, is this comment a
> >>
> >>>>>> fact-check stating that it is possible to compare byte[] overall,
> >> or
> >>
> >>> is
> >>
> >>>>> it
> >>
> >>>>>> about trying to use it in this KIP?
> >>
> >>>>>>
> >>
> >>>>>> Cheers
> >>
> >>>>>>
> >>
> >>>>>> PS (because I'm stubborn): It is still not comparable, this
> >>
> >>> comparison
> >>
> >>>> is
> >>
> >>>>>> all based on assumptions about the content of the byte array, but I
> >>
> >>>> hope
> >>
> >>>>> we
> >>
> >>>>>> can leave this stuff to Stack Overflow instead of debating it here
> >> :)
> >>
> >>>>>>
> >>
> >>>>>>
> >>
> >>>>>
> >>
> >>>>>
> >>
> >>>>>
> >>
> >>>>> --
> >>
> >>>>> -- Guozhang
> >>
> >>>>>
> >>
> >>>>
> >>
> >>>>
> >>
> >>>>
> >>
> >>>> --
> >>
> >>>> -- Guozhang
> >>
> >>>>
> >>
> >>>>
> >>
> >>>
> >>
> >>>
> >>
> >>> --
> >>
> >>> -- Guozhang
> >>
> >>>
> >>
> >>>
> >>
> >>
> >>
> >>
> >>
> >> --
> >>
> >> -- Guozhang
> >>
> >>
> >>
> >>
> >
> >
> >
> > --
> > -- Guozhang
>



-- 
-- Guozhang

Reply via email to