here are my proposed changes:
https://github.com/radai-rosenblatt/kafka/commit/8d7744ab8a6c660c4749b495b033b948a68efd3c
at this point i've run this code on a test cluster under load that OOMs
"vanilla" 0.10.1.0 and verified that my code deployed under the same
condition remains stable.
what i've
Hi Jun,
the benchmarks just spawn 16 threads where each thread allocates a chunk of
memory from the pool and immediately releases it. 16 was chosen because it's
typical for LinkedIn setups. the benchmarks never "consume" more than 16 *
[single allocation size] and so do not test out-of-memory perfo
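The allocate-then-immediately-release loop described above can be sketched roughly as follows. This is an illustrative stand-in, not the actual benchmark code: the pool is modeled as a bare AtomicLong counter, and all names and sizes are made up for the example.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch of the benchmark shape described above: N threads
// each allocate a fixed-size chunk from the pool and release it right away,
// so at most N * chunk bytes are ever "consumed" at once.
public class PoolBenchmark {
    public static long run(int threads, int iterations, long poolBytes, int chunk)
            throws InterruptedException {
        AtomicLong available = new AtomicLong(poolBytes); // the "pool"
        AtomicLong ops = new AtomicLong();
        ExecutorService exec = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < threads; t++) {
            exec.execute(() -> {
                for (int i = 0; i < iterations; i++) {
                    if (available.getAndAdd(-chunk) > 0) { // allocate
                        ops.incrementAndGet();
                    }
                    available.addAndGet(chunk);            // immediately release
                }
            });
        }
        exec.shutdown();
        exec.awaitTermination(1, TimeUnit.MINUTES);
        return ops.get(); // total successful allocate/release cycles
    }
}
```

With a pool far larger than threads * chunk, no allocation ever fails, which matches the point above that the benchmark does not exercise out-of-memory behavior.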
Hi, Radai,
Sorry for the late response. How should the benchmark results be
interpreted? The higher the ops/s, the better? It would also be useful to
test this out on LinkedIn's traffic with enough socket connections to see
if there is any performance degradation.
Also, there is a separate propos
Hi Jun,
10 - mute/unmute functionality has been added in
https://github.com/radai-rosenblatt/kafka/tree/broker-memory-pool-with-muting.
I have yet to run stress tests to see how it behaves versus without muting
11 - I've added a SimplePool implementation (nothing more than an
AtomicLong really) a
Hi, Radai,
For 10, yes, we don't want the buffer pool to wake up the selector every
time some memory is freed up. We only want to do that when there are
pending requests to the buffer pool not honored due to not enough memory.
For 11, we probably want to be a bit careful with Weak References. In
ht
Hi Jun,
10 - I'll add this functionality to the mute/unmute branch. as every
mute/unmute operation is O(#connections / #selectorThreads) maybe a
watermark approach is better than waking when _any_ mem is available?
11 - "gc notifications" are done by using a ReferenceQueue (
https://docs.oracle.c
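The ReferenceQueue mechanism mentioned above can be sketched like this. The class names are hypothetical, not from the actual branch: a weak reference subclass remembers the size of the buffer it tracked, and the pool periodically drains the queue to credit back memory for buffers the GC collected without an explicit release.

```java
import java.lang.ref.Reference;
import java.lang.ref.ReferenceQueue;
import java.lang.ref.WeakReference;
import java.nio.ByteBuffer;

// Illustrative only: a weak reference that remembers the capacity of the
// buffer it tracked, so the accounting can be corrected once the GC
// enqueues the cleared reference on the ReferenceQueue.
class BufferReference extends WeakReference<ByteBuffer> {
    final int size;

    BufferReference(ByteBuffer buffer, ReferenceQueue<ByteBuffer> queue) {
        super(buffer, queue);
        this.size = buffer.capacity();
    }
}

class GcNotifyingAccounting {
    private final ReferenceQueue<ByteBuffer> reclaimed = new ReferenceQueue<>();
    private long leakedBytesRecovered = 0;

    BufferReference track(ByteBuffer buffer) {
        return new BufferReference(buffer, reclaimed);
    }

    /** Drain pending GC notifications; could be called on every allocate. */
    long drain() {
        Reference<? extends ByteBuffer> ref;
        while ((ref = reclaimed.poll()) != null) {
            leakedBytesRecovered += ((BufferReference) ref).size;
        }
        return leakedBytesRecovered;
    }
}
```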
Hi, Radai,
Thanks for the updated KIP. A few more questions/comments below.
10. For "the mute/unmute happens just before poll(), which means as a worst
case there will be no reads for 300ms if memory was unavailable", I am
thinking that memory-pool could track if there is any pending request and
As discussed in the KIP call, I have updated the kip-72 page (
https://cwiki.apache.org/confluence/display/KAFKA/KIP-72%3A+Allow+putting+a+bound+on+memory+consumed+by+Incoming+requests)
to record both configuration validations and implementation concerns.
I've also implemented channel muting/unmuti
the specific memory pool implementation i wrote will allocate _any_ amount
you request if it has _any_ memory available (so if it has 1 byte available
and you ask for 1MB you will get 1MB and the counter will go negative).
this was done to avoid issues with starvation of large requests. other
imple
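The "grant any request while any memory is available" policy described above could look roughly like this. This is a hedged sketch under the stated assumptions, not the actual patch; the class and method names are illustrative.

```java
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicLong;

// Sketch of the policy described above: if *any* memory is available the
// full requested amount is granted, even if that drives the counter
// negative -- this is what avoids starving large requests.
public class SimpleMemoryPool {
    private final AtomicLong available;

    public SimpleMemoryPool(long capacityBytes) {
        this.available = new AtomicLong(capacityBytes);
    }

    /** Non-blocking: returns a buffer, or null if the pool was exhausted. */
    public ByteBuffer tryAllocate(int sizeBytes) {
        long prev = available.getAndAdd(-sizeBytes);
        if (prev <= 0) {                    // pool was already exhausted
            available.addAndGet(sizeBytes); // undo the reservation
            return null;
        }
        // prev > 0: grant the request even if the counter is now negative.
        // Note this is best-effort accounting; concurrent callers may both
        // observe a positive counter and temporarily overshoot the bound.
        return ByteBuffer.allocate(sizeBytes);
    }

    public void release(ByteBuffer buffer) {
        available.addAndGet(buffer.capacity());
    }

    public long availableMemory() {
        return available.get();
    }
}
```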
Is there any value in allowing the 4-byte size to be read even when the
request memory limit has been reached? If not, you can disable OP_READ
interest for all channels that are ready inside Selector.poll() when memory
limit has been reached and re-enable before returning from poll(). Perhaps
a lis
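The interest-ops toggling suggested above is standard NIO; a minimal sketch (helper names are hypothetical, not actual Kafka code) might look like:

```java
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;

// Sketch of the suggestion above: when the memory limit is hit, drop
// OP_READ interest on the affected keys so the next select() does not
// spin on channels that cannot be serviced yet.
public class ReadInterest {
    public static void mute(SelectionKey key) {
        key.interestOps(key.interestOps() & ~SelectionKey.OP_READ);
    }

    public static void unmute(SelectionKey key) {
        key.interestOps(key.interestOps() | SelectionKey.OP_READ);
    }

    /** Mute every ready key, e.g. inside poll() when the pool is exhausted. */
    public static void muteAll(Selector selector) {
        for (SelectionKey key : selector.selectedKeys()) {
            if (key.isValid()) {
                mute(key);
            }
        }
    }
}
```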
Hi Jun,
Yes, you're right - right now the next select() call will return immediately
with the same set of keys as earlier (at least) as they were not previously
handled (no memory).
My assumption is that this happens under considerable load - something has
to be occupying all this memory. also, thi
Hi, Radai,
Thanks for the reply. I still have a followup question on #2.
My understanding is that in your proposal, selector will now first read the
size of the Receive. If there is not enough memory, it has to turn off the
READ interest bit for the corresponding KafkaChannel. Otherwise, subseque
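The size-first read that Jun describes can be sketched as below. This is illustrative only (the names are not the actual Selector/KafkaChannel code): the 4-byte size header is read into a small fixed buffer, and only once the size is known would the broker ask the memory pool for a payload buffer of that size.

```java
import java.io.EOFException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;

// Hedged sketch of reading the size prefix of a Receive before any
// payload memory is requested from the pool.
public class SizePrefixedReader {
    /** Blocks until the 4-byte big-endian size prefix has been fully read. */
    public static int readSize(ReadableByteChannel channel) throws IOException {
        ByteBuffer sizeBuf = ByteBuffer.allocate(4); // tiny, pool not involved
        while (sizeBuf.hasRemaining()) {
            if (channel.read(sizeBuf) < 0) {
                throw new EOFException("connection closed mid-header");
            }
        }
        sizeBuf.flip();
        return sizeBuf.getInt();
    }
}
```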
Hi Jun,
1. yes, it is my own personal opinion that people use queued.max.requests
as an indirect way to bound memory consumption. once a more direct memory
bound mechanism exists (and works) i don't think queued.max.requests would
be required. having said that I was not planning on making any chang
Hi, Radai,
Thanks for the update. At the high level, this looks promising. A few
comments below.
1. If we can bound the requests by bytes, it seems that we don't need
queued.max.requests
any more? Could we just deprecate the config and make the queue size
unbounded?
2. How do we communicate back t
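Bounding the queue by bytes instead of request count, as raised in point 1, can be sketched with a semaphore that holds the byte budget. This is a hypothetical illustration of the accounting only, not the KIP-72 design:

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.function.ToIntFunction;

// Illustrative byte-bounded queue: permits in the semaphore represent
// bytes; an item may only be enqueued if its size fits the remaining budget.
public class ByteBoundedQueue<T> {
    private final LinkedBlockingQueue<T> queue = new LinkedBlockingQueue<>();
    private final Semaphore bytes;
    private final ToIntFunction<T> sizeOf;

    public ByteBoundedQueue(int maxBytes, ToIntFunction<T> sizeOf) {
        this.bytes = new Semaphore(maxBytes);
        this.sizeOf = sizeOf;
    }

    /** Non-blocking: false if adding the item would exceed the byte bound. */
    public boolean offer(T item) {
        if (!bytes.tryAcquire(sizeOf.applyAsInt(item))) {
            return false;
        }
        queue.add(item);
        return true;
    }

    public T take() throws InterruptedException {
        T item = queue.take();
        bytes.release(sizeOf.applyAsInt(item)); // free the bytes on dequeue
        return item;
    }
}
```

A blocking put() on a full queue is exactly the processor-blocking concern discussed later in the thread, which is why this sketch only exposes a non-blocking offer().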
My apologies for the delay in response.
I agree with the concerns about OOM reading from the actual sockets and
blocking the network threads - messing with the request queue itself would
not do.
I propose instead a memory pool approach - the broker would have a
non-blocking memory pool. upon read
Radai,
Yes, I got the benefit of bounding the request queue by bytes. My concern
is the following if we don't change the behavior of processor blocking on
queue full.
If the broker truly doesn't have enough memory for buffering outstanding
requests from all connections, we have to either hit OOM o
I like the simplicity of the approach and can see that it is an improvement
over the current implementation in typical scenarios. But I would like to
see Jun's proposal to mute sockets explored further. With the proposal in
the KIP to limit queue size, I am not sure how to calculate the total
memor
Thought about this again. If I understand correctly Jun's concern is about
the cascading effect. Currently the processor will try to put all the
requests received in one poll() call into the RequestChannel. This could
potentially be long if the queue is moving really really slowly. If we
don't mute
I agree that filling up the request queue can cause clients to time out
(and presumably retry?). However, for the workloads where we expect this
configuration to be useful the alternative is currently an OOM crash.
In my opinion an initial implementation of this feature could be
constrained to a si
Nice write up Radai.
I think what Jun said is a valid concern.
If I am not wrong, as per the proposal we are depending on the entire
pipeline to flow smoothly from accepting requests to handling them, calling
KafkaApis and handing back the responses.
Thanks,
Mayuresh
On Mon, Aug 8, 2016 at 12:22
> Hi Becket,
>
> I don't think progress can be made in the processor's run loop if the
> queue fills up. i.e., I think Jun's point is that if the queue is full
> (either due to the proposed max.bytes or today due to max.requests hitting
> the limit) then processCompletedReceives will bl
>
> 2. Good point about the consequence when the processor threads are
> blocking. I agree it would be important to keep the processor thread
> running, but I am not sure if it would be a problem of the current
> proposal. In most of the time, the request queue should be close to empty,
> so th
Hi Jun,
1. The requests in the queue are RequestChannel.Request. It contains the
raw ByteBuffer from the socket, so we can probably just use that. This might
not be 100% accurate about the memory taken, but probably is good enough.
2. Good point about the consequence when the processor threads are
b
Radai,
Thanks for the proposal. A couple of comments on this.
1. Since we store request objects in the request queue, how do we get an
accurate size estimate for those requests?
2. Currently, it's bad if the processor blocks on adding a request to the
request queue. Once blocked, the processor c
Supporting both configs at the same time to begin with would allow enough
time for users to experiment with the settings. If in the future it's deemed
that a memory bound is far superior to a #req bound (which is my opinion
but needs evidence to support) the migration discussion could be deferred
to
Thanks for the proposal, Radai. +1
On Fri, Aug 5, 2016 at 8:38 AM, Ismael Juma wrote:
> Hi Radai,
>
> Thanks for the proposal. I think it makes sense to be able to limit the
> request queue by bytes. I haven't made up my mind on whether having both
> limits is better than having a single one yet
Hi Radai,
Thanks for the proposal. I think it makes sense to be able to limit the
request queue by bytes. I haven't made up my mind on whether having both
limits is better than having a single one yet, but we probably need to make
a call on that before we can start a vote.
Just a quick point with
The proposal makes sense to me. I like the plan to support both limits
simultaneously:
> queued.max.requests is supported in addition to queued.max.bytes (both
> respected at the same time). In this case a default value of
> queued.max.bytes = -1 would maintain backwards compatible behavior.
Esp
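The quoted both-limits-at-once semantics, with queued.max.bytes = -1 disabling the byte bound for backwards compatibility, reduce to a simple check. A minimal sketch (illustrative names, not actual broker code):

```java
// Sketch of honoring both queued.max.requests and queued.max.bytes at the
// same time; -1 for maxBytes means "no byte bound" (backwards compatible).
public class QueueLimits {
    public static boolean mayEnqueue(int queuedRequests, long queuedBytes,
                                     long nextRequestBytes,
                                     int maxRequests, long maxBytes) {
        boolean underCount = queuedRequests < maxRequests;
        boolean underBytes = maxBytes == -1
                || queuedBytes + nextRequestBytes <= maxBytes;
        return underCount && underBytes;
    }
}
```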
This makes good sense to me, and seems to have very low amounts of downside
with large amounts of upside. +1
On Thursday, 4 August 2016, radai wrote:
> Hello,
>
> I'd like to initiate a discussion about
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> 72%3A+Allow+Sizing+Incoming+Reques
Hello,
I'd like to initiate a discussion about
https://cwiki.apache.org/confluence/display/KAFKA/KIP-72%3A+Allow+Sizing+Incoming+Request+Queue+in+Bytes
The goal of the KIP is to allow configuring a bound on the capacity (as in
bytes of memory used) of the incoming request queue, in addition to th