Ah, gotcha, yeah, I couldn't figure that out. I see the the trouble now--if you are currently blocked that doesn't contribute to the metic. That makes sense.
-Jay On Sun, Oct 5, 2014 at 7:49 PM, Jun Rao <jun...@gmail.com> wrote: > Sorry, that's my bad. The intention was that in case the producer blocks > there forever (because memory never frees up for some reason), having a > timeout will allow us to get a more accurate stat on the waitTime sensor. > Filed and patched kafka-1673. Could you take a look? > > Thanks, > > Jun > > On Sun, Oct 5, 2014 at 3:35 PM, Jay Kreps <jay.kr...@gmail.com> wrote: > > > Maybe this was introduced in > > > > dcc88408c98a07cb9a816ab55cd81e55f1d2217d > > > > as part of KAFKA-1488. > > > > We seem to add a hard coded 300 ms timeout on the wait time. After this > 300 > > ms timeout the thread is allowed to allocate memory to itself > irrespective > > of whether it is next in line. This is not right at all and will cause > all > > kinds of bad things to happen including deadlocks and that error. Joel, > > Jun, Guozhang, any idea what the idea was here? This code is fairly > nuanced > > and we have to be pretty thoughtful in changing it. I actually don't know > > what the intention here was or why we would add this...? > > > > -Jay > > > > On Sun, Oct 5, 2014 at 2:59 PM, Bhavesh Mistry < > mistry.p.bhav...@gmail.com > > > > > wrote: > > > > > Hi Kafka Dev team, > > > > > > I am getting following exception occasionally, and data is lost due to > > this > > > exception. What could cause this error ? We have prod release > coming > > up > > > with New Kafka API end of this week. So I any help is greatly > > appreciated. > > > > > > * java.lang.IllegalStateException: Wrong condition: this shouldn't > > happen.* > > > at > > > > > > > > > org.apache.kafka.clients.producer.internals.BufferPool.allocate(BufferPool.java:156) > > > at > > > > > > > > > org.apache.kafka.clients.producer.internals.RecordAccumulator.append(RecordAccumulator.java:152) > > > at > > > > > > > > > org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:237) > > > ................... > > > at java.util.concurrent.FutureTask.run(FutureTask.java:262) > > > at > > > > > > > > > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) > > > at > > > > > > > > > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) > > > at java.lang.Thread.run(Thread.java:744) > > > > > > Here is Producer Config: > > > > > > bootstrap.servers=[List ] > > > acks=1 > > > buffer.memory=3145728 > > > compression.type=snappy > > > retries=3 > > > batch.size=49152 > > > max.request.size=2097152 > > > send.buffer.bytes=2097152 > > > block.on.buffer.full=true > > > send.buffer.bytes=2097152 > > > linger.ms=2000 > > > > > > Thanks, > > > > > > Bhavesh > > > > > >