Thanks. To reverse the question: if this argument holds, why does it not apply to the case when the header key is used as the compaction attribute?
I am not against keeping both records in case the timestamps are equal, but shouldn't we apply the same strategy for all cases and not use the offset as a tie-breaker at all?

-Matthias

On 5/6/18 8:47 PM, Guozhang Wang wrote:
> Hello Matthias,
>
> The related discussion was in the PR:
> https://github.com/apache/kafka/pull/4822#discussion_r184588037
>
> The concern is that, to use the offset as a tie breaker we would need to double the size of each entry in the bounded compaction cache, and hence largely reduce the effectiveness of the compaction itself. Since with millisecond timestamps the scenario of ties with the same key is expected to be rare, I think it would be a reasonable tradeoff to make.
>
> Guozhang
>
> On Sun, May 6, 2018 at 9:37 AM, Matthias J. Sax <matth...@confluent.io> wrote:
>
>> Hi,
>>
>> I just updated myself on this KIP. One question (maybe it was discussed and I missed it). What is the motivation to not use the offset as tie breaker for the "timestamp" case? Isn't this inconsistent behavior?
>>
>> -Matthias
>>
>> On 5/2/18 2:07 PM, Guozhang Wang wrote:
>>> Hello Luís,
>>>
>>> Sorry for the late reply.
>>>
>>> My understanding is that such duplicates will only happen if the non-offset version value, either the timestamp or some long-typed header key, is the same (i.e. we cannot break ties).
>>>
>>> 1. For timestamp, which is in milliseconds, I think in practice the likelihood of records with the same key and the same millisecond timestamp is very small. And hence the duplicate amount should be very small.
>>>
>>> 2. For a long-typed header key, it is arguably out of Kafka's control, and indeed users may (mistakenly) generate many records with the same key and the same header value.
>>>
>>> So I'd like to propose a counter-offer: for 1), we still use only 8 bytes and allow for potential duplicates due to ties; for 2), we use 16 bytes to always break ties. The motivation for distinguishing 1) and 2) is that I expect 1) to be much more common, and hence worth special handling to make the cleaning more effective. WDYT?
>>>
>>> Guozhang
>>>
>>> On Wed, May 2, 2018 at 2:36 AM, Luís Cabral <luis_cab...@yahoo.com.invalid> wrote:
>>>
>>>> Hi Guozhang,
>>>>
>>>> Have you managed to have a look at my reply? How do you feel about this?
>>>>
>>>> Kind Regards,
>>>> Luís Cabral
>>>>
>>>> On Monday, April 30, 2018, 9:27:15 AM GMT+2, Luís Cabral <luis_cab...@yahoo.com> wrote:
>>>>
>>>> Hi Guozhang,
>>>>
>>>> I understand the argument, but this is a hazardous compromise for using Kafka as an event store (as is my original intention).
>>>>
>>>> I expect to have many duplicated messages in Kafka, as the overall architecture being used allows the producer to re-send a fresh state of the backed data into Kafka. This scenario is not common, since the intention is for Kafka to bear the weight of replaying all the records for new consumers, but it will occasionally happen.
>>>>
>>>> As there are plenty of records which are not updated frequently, this would leave the topic with a surplus of quite a few million duplicate records (increasing every time the above-mentioned function is applied).
>>>>
>>>> I would prefer to have the temporary memory footprint of 8 bytes per record whenever the compaction is run (only when not in 'offset' mode), rather than allowing the topic to run into this state.
>>>>
>>>> What do you think?
>>>> Is this scenario too specific to me, or do you believe that it could happen to other clients as well?
>>>>
>>>> Thanks again for the continued discussion!
>>>>
>>>> Cheers,
>>>> Luis
>>>>
>>>> On Friday, April 27, 2018, 8:21:13 PM GMT+2, Guozhang Wang <wangg...@gmail.com> wrote:
>>>>
>>>> Hello Luis,
>>>>
>>>> When the version comparison returns `equal`, the original proposal is to use the offset as the tie breaker. My previous comment is that:
>>>>
>>>> 1) when we build the map calling `put`, if there is already an entry for the key, compare its stored version, and replace it if the put record's version is "no smaller than" the stored record's: this is because when building the map we are always going from smaller offsets to larger ones.
>>>>
>>>> 2) when making a second pass to determine if each record should be retained based on the map, we do not try to break the tie if the map's returned version is the same but always treat it as "keep". In this case, when we are comparing a record with itself stored in the offset map, the version comparison would return `equals`. As I mentioned in the PR, one caveat is that we may indeed have multiple records with the same key and the same version, but once a new versioned record is appended they will be deleted.
>>>>
>>>> Does that make sense?
>>>>
>>>> Guozhang
>>>>
>>>> On Fri, Apr 27, 2018 at 4:20 AM, Luís Cabral <luis_cab...@yahoo.com.invalid> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> I was updating the PR to match the latest decisions and noticed (or rather, the integration tests noticed) that without storing the offset, the cache doesn't know when to keep the record itself.
>>>>>
>>>>> This is because, after the cache is populated, all the records are compared against the stored ones, so "Record{key:A, offset:1, version:1}" will compare against itself and be flagged as "don't keep", since we only compared based on the version and didn't check whether the offset was the same or not.
>>>>>
>>>>> This sort of invalidates not storing the offset in the cache, unfortunately, and the binary footprint increases two-fold when "offset" is not used as the compaction strategy.
>>>>>
>>>>> Guozhang: Is it ok with you if we go back on this decision and leave the offset as a tie-breaker?
>>>>>
>>>>> Kind Regards,
>>>>> Luis
>>>>>
>>>>> On Friday, April 27, 2018, 11:11:55 AM GMT+2, Luís Cabral <luis_cab...@yahoo.com.INVALID> wrote:
>>>>>
>>>>> Hi,
>>>>>
>>>>> The KIP is now updated with the results of the byte array discussion.
>>>>>
>>>>> This is my first contribution to Kafka, so I'm not sure what the processes are. Is it now acceptable to take this into a vote, or should I ask for more contributors to join the discussion first?
>>>>>
>>>>> Kind Regards,
>>>>> Luis
>>>>>
>>>>> On Friday, April 27, 2018, 6:12:03 AM GMT+2, Guozhang Wang <wangg...@gmail.com> wrote:
>>>>>
>>>>> Hello Luís,
>>>>>
>>>>>> Offset is an integer? I've only noticed it being resolved as a long so far.
>>>>>
>>>>> You are right, offset is a long.
>>>>>
>>>>> As for timestamp / other types, I left a comment in your PR about handling tie breakers.
>>>>>
>>>>>> Given these arguments, is this point something that you absolutely must have?
>>>>>
>>>>> No, I do not have a strong use case in mind to go with arbitrary byte arrays; I was just thinking that if we are going to enhance log compaction, why not generalize it more :)
>>>>>
>>>>> Your concern about the memory usage makes sense. I'm happy to take my suggestion back and enforce only long-typed fields.
>>>>>
>>>>> Guozhang
>>>>>
>>>>> On Thu, Apr 26, 2018 at 1:44 AM, Luís Cabral <luis_cab...@yahoo.com.invalid> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> bq. have a integer typed OffsetMap (for offset)
>>>>>>
>>>>>> Offset is an integer? I've only noticed it being resolved as a long so far.
>>>>>>
>>>>>> bq. long typed OffsetMap (for timestamp)
>>>>>>
>>>>>> We would still need to store the offset, as it is functioning as a tie-breaker. Not that this is a big deal, we can easily have both (as currently done on the PR).
>>>>>>
>>>>>> bq. For the byte array typed offset map, we can use a general hashmap, where the hashmap's CAPACITY will be reasoned from the given "val memory: Int" parameter
>>>>>>
>>>>>> If you have a map with 128 bytes of capacity, then store a value with 16 bytes and another with 32 bytes, how many free slots do you have left in this map?
>>>>>>
>>>>>> You can make this work, but I think you would need to re-design the whole log cleaner approach, which implies changing some of the already existing configurations (like "log.cleaner.io.buffer.load.factor"). I would rather maintain backwards compatibility as much as possible in this KIP, and if this means that using "foo" / "bar" or "2.1-a" / "3.20-b" as record versions is not viable, then so be it.
>>>>>>
>>>>>> Given these arguments, is this point something that you absolutely must have? I'm still sort of hoping that you are just entertaining the idea and are ok with having a long (now conceded to be unsigned, so the byte arrays can be compared directly).
>>>>>>
>>>>>> Kind Regards,
>>>>>> Luis
>>>>>
>>>>> --
>>>>> -- Guozhang
>>>>
>>>> --
>>>> -- Guozhang
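
(For reference, below is a minimal sketch, in plain Scala, of the tie-breaking behaviour described in the quoted thread. It is not the actual Kafka log cleaner / offset map code; the Record, VersionMap, put and shouldRetain names are made up for illustration. It only shows the two-pass idea: the build pass keeps the greatest version per key, and the retention pass treats an equal version as "keep", which is why a record never loses against its own map entry but genuine same-version duplicates also survive.)

import scala.collection.mutable

// Hypothetical stand-in for a log record and the bounded compaction cache.
final case class Record(key: String, offset: Long, version: Long)

final class VersionMap {
  private val latest = mutable.Map.empty[String, Long] // key -> greatest version seen

  // First pass (smaller offsets to larger): replace if the incoming version
  // is "no smaller than" the stored one.
  def put(r: Record): Unit =
    if (latest.get(r.key).forall(r.version >= _)) latest(r.key) = r.version

  // Second pass: keep the record unless a strictly greater version was recorded.
  // An equal version is kept, which covers the record matching its own entry,
  // but also keeps duplicates that share the same key and version.
  def shouldRetain(r: Record): Boolean =
    latest.get(r.key).forall(r.version >= _)
}

object TieBreakSketch extends App {
  val records = Seq(
    Record("A", offset = 1, version = 1),
    Record("A", offset = 2, version = 1), // duplicate version: both copies survive
    Record("B", offset = 3, version = 5),
    Record("B", offset = 4, version = 2)  // older version: removed
  )
  val map = new VersionMap
  records.foreach(map.put)
  records.foreach(r => println(s"$r retained=${map.shouldRetain(r)}"))
}

Storing the offset alongside the version (an extra 8 bytes per cache entry) would let the retention pass keep only the exact record that produced the map entry, which is the trade-off being debated above.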