Just to clarify, the implementation is essentially what I described above (split/resend plus the adaptive compression ratio estimation algorithm), with the compression ratio estimation changed to be per topic.

Thanks,

Jiangjie (Becket) Qin
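[A minimal sketch of the estimation scheme described here, assuming a per-topic estimate that starts at 1.0 and uses the 0.001/0.05 step sizes mentioned in the quoted discussion below; class, method names, and constants are illustrative and are not necessarily those in the actual patch.]

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    // Illustrative sketch only; not the actual producer internals.
    public class PerTopicCompressionRatioEstimator {
        private static final float INITIAL_RATIO = 1.0f;   // conservative: assume no compression
        private static final float DECREASE_STEP = 0.001f; // creep down slowly when we over-estimate
        private static final float INCREASE_STEP = 0.05f;  // back off quickly when we under-estimate

        private final Map<String, Float> estimated = new ConcurrentHashMap<>();

        // Used when sizing a batch: estimatedCompressedSize = uncompressedSize * ratio(topic).
        public float ratio(String topic) {
            return estimated.getOrDefault(topic, INITIAL_RATIO);
        }

        // Called with the observed ratio after a batch has actually been compressed.
        public void observe(String topic, float actualRatio) {
            float current = ratio(topic);
            float updated = actualRatio > current
                ? Math.min(INITIAL_RATIO, current + INCREASE_STEP)  // ACR > ECR: bump the estimate up
                : Math.max(0.0f, current - DECREASE_STEP);          // ACR < ECR: lower it gradually
            estimated.put(topic, updated);
        }

        // Called when the broker rejects a batch with RecordTooLargeException:
        // reset to the conservative estimate; the producer splits and resends the batch.
        public void onRecordTooLarge(String topic) {
            estimated.put(topic, INITIAL_RATIO);
        }
    }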
On Fri, Mar 3, 2017 at 6:36 PM, Becket Qin <becket....@gmail.com> wrote:

I went ahead and submitted a patch here: https://github.com/apache/kafka/pull/2638

Per Joel's suggestion, I changed the compression ratio to be per topic as well. It seems to work well. Since there is an important behavior change and a new sensor is added, I'll keep the KIP and update it accordingly.

Thanks,

Jiangjie (Becket) Qin

On Mon, Feb 27, 2017 at 3:50 PM, Joel Koshy <jjkosh...@gmail.com> wrote:

> Let's say we sent the batch over the wire and received a RecordTooLargeException; how do we split it, since once we add the message to the batch we lose the message-level granularity? We will have to decompress, do deep iteration, split, and compress again, right? This looks like a performance bottleneck in the case of multi-topic producers like mirror maker.

Yes, but these should be outliers if we do estimation on a per-topic basis and if we target a conservative-enough compression ratio. The producer should also avoid sending over the wire if it can be made aware of the max-message-size limit on the broker, and split if it determines that a record exceeds the broker's config. Ideally this should be part of the topic metadata but is not, so it could be driven off a periodic describe-configs <https://cwiki.apache.org/confluence/display/KAFKA/KIP-4+-+Command+line+and+centralized+administrative+operations#KIP-4-Commandlineandcentralizedadministrativeoperations-DescribeConfigsRequest> (which isn't available yet). This doesn't remove the need to split and recompress though.

On Mon, Feb 27, 2017 at 10:51 AM, Becket Qin <becket....@gmail.com> wrote:

Hey Mayuresh,

1) The batch would be split when a RecordTooLargeException is received.
2) Not lower the actual compression ratio, but lower the estimated compression ratio "according to" the Actual Compression Ratio (ACR).

An example: let's start with an Estimated Compression Ratio (ECR) of 1.0. Say the actual compression ratio is ~0.8. Instead of letting the ECR drop to 0.8 very quickly, we only drop it by 0.001 each time ACR < ECR. However, once we see an ACR > ECR, we increment the ECR by 0.05. If a RecordTooLargeException is received, we reset the ECR back to 1.0 and split the batch.

Thanks,

Jiangjie (Becket) Qin
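[To make the split step concrete, here is a rough sketch of what splitting a rejected batch could look like, assuming the repeated-halving approach Jay describes further down the thread; the names and signature are illustrative, not the producer's internal API.]

    import java.util.ArrayDeque;
    import java.util.ArrayList;
    import java.util.Deque;
    import java.util.List;
    import java.util.function.ToIntFunction;

    // Illustrative sketch only; not the actual producer internals.
    public class BatchSplitter {
        // On RecordTooLargeException: decompress the rejected batch, halve it until every
        // piece fits under maxBatchSize once recompressed, then re-enqueue the pieces.
        // Because each pass halves the batch, the number of splits grows only
        // logarithmically with how far the estimate was off.
        public static <R> List<List<R>> splitUntilFits(List<R> records,
                                                       ToIntFunction<List<R>> compressedSize,
                                                       int maxBatchSize) {
            List<List<R>> fits = new ArrayList<>();
            Deque<List<R>> pending = new ArrayDeque<>();
            pending.add(records);
            while (!pending.isEmpty()) {
                List<R> piece = pending.poll();
                if (piece.size() <= 1 || compressedSize.applyAsInt(piece) <= maxBatchSize) {
                    fits.add(piece); // small enough (or a single record): ready to resend
                } else {
                    int mid = piece.size() / 2; // halve and check each half again
                    pending.add(new ArrayList<>(piece.subList(0, mid)));
                    pending.add(new ArrayList<>(piece.subList(mid, piece.size())));
                }
            }
            return fits;
        }
    }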
On Mon, Feb 27, 2017 at 10:30 AM, Mayuresh Gharat <gharatmayures...@gmail.com> wrote:

Hi Becket,

Seems like an interesting idea. I had a couple of questions:
1) How do we decide when the batch should be split?
2) What do you mean by slowly lowering the "actual" compression ratio? An example would really help here.

Thanks,

Mayuresh

On Fri, Feb 24, 2017 at 3:17 PM, Becket Qin <becket....@gmail.com> wrote:

Hi Jay,

Yeah, I got your point.

I think there might be a solution which does not require adding a new configuration. We can start from a very conservative compression ratio, say 1.0, and lower it very slowly according to the actual compression ratio until we hit a point where we have to split a batch. At that point, we exponentially back off on the compression ratio. The idea is somewhat like TCP. This should help avoid frequent splits.

The upper bound of the batch size is also a little awkward today because we say the batch size is based on the compressed size, but users cannot set it to the max message size because that would result in oversized messages. With this change we will be able to allow users to set the batch size close to the max message size.

However, the downside is that there could be latency spikes in the system in this case due to the splitting, especially when there are many messages that need to be split at the same time. That could potentially be an issue for some users.

What do you think about this approach?

Thanks,

Jiangjie (Becket) Qin

On Thu, Feb 23, 2017 at 1:31 PM, Jay Kreps <j...@confluent.io> wrote:

Hey Becket,

Yeah that makes sense.

I agree that you'd really have to both fix the estimation (i.e. make it per topic or make it better estimate the high percentiles) AND have the recovery mechanism. If you are underestimating often and then paying a high recovery price, that won't fly.

I think you take my main point though, which is just that I hate to expose these super low level options to users because it is so hard to explain to people what they mean and how they should set them. So if it is possible to make either some combination of better estimation and splitting, or better tolerance of overage, that would be preferable.

-Jay

On Thu, Feb 23, 2017 at 11:51 AM, Becket Qin <becket....@gmail.com> wrote:

@Dong,

Thanks for the comments. The default behavior of the producer won't change. If users want to use the uncompressed message size, they probably will also bump up the batch size to somewhere close to the max message size. This would be in the documentation. BTW the default batch size is 16K, which is pretty small.

@Jay,

Yeah, we actually debated quite a bit internally what the best solution to this is.

I completely agree it is a bug. In practice we usually leave some headroom to allow the compressed size to grow a little if the original messages are not compressible, for example, 1000 KB instead of exactly 1 MB. It is likely safe enough.

The major concern for the rejected alternative is performance. It largely depends on how frequently we need to split a batch, i.e. how likely the estimation is to go off. If we only need to do the split work occasionally, the cost would be amortized, so we don't need to worry about it too much. However, it looks like for a producer with shared topics, the estimation is always off. As an example, consider two topics, one with compression ratio 0.6 and the other 0.2. Assuming exactly the same traffic, the average compression ratio would be roughly 0.4, which is not right for either of the topics. So almost half of the batches (those of the topic with 0.6 compression ratio) will end up larger than the configured batch size. When it comes to more topics, such as in mirror maker, this becomes more unpredictable. To avoid frequent rejection/splitting of batches, we need to configure the batch size pretty conservatively. This could actually hurt performance, because we are shoehorning the messages that are highly compressible into a small batch so that the other topics that are not that compressible will not become too large with the same batch size. At LinkedIn, our batch size is configured to 64 KB because of this. I think we may actually have better batching if we just use the uncompressed message size and an 800 KB batch size.
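[To put rough numbers on the two-topic example above - illustrative arithmetic only, assuming a batch is closed once estimated ratio x uncompressed bytes reaches the configured batch size:]

    shared estimate 0.4, batch.size 64 KB  ->  batch closes at ~64 KB / 0.4 = 160 KB uncompressed
    topic compressing at 0.6               ->  ~160 KB * 0.6 = 96 KB compressed, well over the 64 KB target
    topic compressing at 0.2               ->  ~160 KB * 0.2 = 32 KB compressed, only half the target

A per-topic estimate avoids both the overshoot and the undershoot.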
We did not think about loosening the message size restriction, but that sounds like a viable solution given that the consumer can now fetch oversized messages. One concern would be that on the broker side oversized messages will bring more memory pressure. With KIP-92 we may mitigate that, but the memory allocation for large messages may not be very GC friendly. I need to think about this a little more.

Thanks,

Jiangjie (Becket) Qin

On Wed, Feb 22, 2017 at 8:57 PM, Jay Kreps <j...@confluent.io> wrote:

Hey Becket,

I get the problem we want to solve with this, but I don't think this is something that makes sense as a user-controlled knob that everyone sending data to Kafka has to think about. It is basically a bug, right?

First, as a technical question: is it true that using the uncompressed size for batching actually guarantees that you observe the limit? I think that implies that compression always makes the messages smaller, which I think is usually true but is not guaranteed, right? E.g. if someone encrypts their data, which tends to randomize it, and then enables compression, it could get slightly bigger?
I also wonder if the rejected alternatives you describe couldn't be made to work: basically try to be a bit better at estimation and recover when we guess wrong. I don't think the memory usage should be a problem: isn't it the same memory usage the consumer of that topic would need? And can't you do the splitting and recompression in a streaming fashion? If we can make the mis-estimation rate low and the recovery cost is just ~2x the normal cost for that batch, that should be totally fine, right? (It's technically true you might have to split more than once, but since you halve it each time, you should get a number of halvings that is logarithmic in the miss size, which, with better estimation, you'd hope would be super duper small.)

Alternatively, maybe we could work on the other side of the problem and try to make it so that a small miss on message size isn't a big problem. I think the original issue was that max size and fetch size were tightly coupled, and the way memory in the consumer worked you really wanted the fetch size to be as small as possible, because you'd use that much memory per fetched partition and the consumer would get stuck if its fetch size wasn't big enough. I think we made some progress on that issue, and maybe more could be done there so that a small bit of fuzziness around the size would not be an issue?

-Jay

On Tue, Feb 21, 2017 at 12:30 PM, Becket Qin <becket....@gmail.com> wrote:

Hi folks,

I would like to start the discussion thread on KIP-126. The KIP proposes adding a new configuration to KafkaProducer to allow batching based on uncompressed message size.

Comments are welcome.

The KIP wiki is the following:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-126+-+Allow+KafkaProducer+to+batch+based+on+uncompressed+size

Thanks,

Jiangjie (Becket) Qin