I haven't explained why only 10 partitions: more partitions don't speed up the producer, I already have this memory-monitoring problem, and I have no problems on the consumer side at the moment (10 should be enough, even if I haven't fully tested it yet). Since the solution I'm heading towards is the partition-separated queues described below, 10 seemed a fair number to me.
I'm also considering setting up some kind of producer-proxy instances: something a producer can push a batch of messages to when it sees communication problems towards a leader, so the proxy forwards that batch to the leader on its behalf. This could help mitigate producer latency during such networking problems. There isn't something like this yet, right? An option in the producer to use a non-leader broker as a proxy for the partition-leader broker would be optimal :) maybe together with producer-to-single-broker latency stats :)
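To make the latency-stats idea concrete, something as small as this (just a sketch) would already let me spot a degraded path: it measures send()-to-ack time from the callback, which includes the time spent queued in the accumulator, i.e. exactly what grows when the path to one leader is slow. Mapping a partition to its leader broker could then be done from producer.partitionsFor(topic).

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Future;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

/** Sketch: remember the last send()-to-ack latency observed per partition. */
public class SendLatencyStats {

    private final ConcurrentMap<Integer, Long> lastAckMillis =
            new ConcurrentHashMap<Integer, Long>();

    public Future<RecordMetadata> send(KafkaProducer<byte[], byte[]> producer,
                                       ProducerRecord<byte[], byte[]> record) {
        final long start = System.currentTimeMillis();
        return producer.send(record, new Callback() {
            public void onCompletion(RecordMetadata metadata, Exception exception) {
                if (metadata != null) {
                    // includes queueing time in the accumulator, so a slow path to
                    // one leader shows up as a growing value for its partitions
                    lastAckMillis.put(metadata.partition(),
                            System.currentTimeMillis() - start);
                }
            }
        });
    }

    public Long lastAckMillisFor(int partition) {
        return lastAckMillis.get(partition);
    }
}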
Thanks again, and sorry for the verbosity :)

On Tue, Jul 14, 2015 at 10:14 PM, francesco vigotti <vigotti.france...@gmail.com> wrote:

> Hi,
> I'm playing with the Kafka new producer to see if it can fit my use case, Kafka version 0.8.2.1.
> I'll probably end up with a Kafka cluster of 5 nodes spread over multiple datacenters, with one topic, a replication factor of 2, and at least 10 partitions, which are required for consumer performance (I'll explain why only 10 later).
> My producers are 10 or more, also distributed over multiple datacenters around the globe, each producing about 2k messages/sec with a message size of about 500 bytes per message.
>
> During my tests I have seen that when there is a network issue between one of the producers and one of the partition leaders, the producer starts to accumulate much more data than the configured buffer.memory (about 40 MB), which crashes the JVM because the garbage collector cannot make space for the application to work (JVM memory is about 2 GB, normally at about 1 GB usage except when this problem occurs). What I see is that about 500 MB is used by char[].
>
> During these events the buffer usage does increase from about 5 MB to about 30 MB, and batches reach their maximum size, which helps speed up transmission, but it seems not to be enough for my memory usage / performance / resilience requirements.
>
> kafka.producer.acks=1
> kafka.producer.buffer.memory=40000000
> kafka.producer.compression.type=gzip
> kafka.producer.retries=1
> kafka.producer.batch.size=10000
> kafka.producer.max.request.size=10000000
> kafka.producer.send.buffer.bytes=1000000
> kafka.producer.timeout.ms=10000
> kafka.producer.reconnect.backoff.ms=500
> kafka.producer.retry.backoff.ms=500
> kafka.producer.block.on.buffer.full=false
> kafka.producer.linger.ms=1000
> kafka.producer.maxinflight=2
> kafka.producer.serializer.class=kafka.serializer.DefaultEncoder
>
> The main issue is that when one producer has connection-path issues towards one broker (packet loss / connection slowdown), it crashes the whole application due to JVM RAM usage. This doesn't mean the broker is down: other producers can still reach it, it's just a networking problem between those two hosts, which can last an hour during peak time on some routing paths.
> I have found no solution playing with the available params.
>
> Reading around, this is how I understand the producer works internally: it accepts a message and returns an async future (although during the first connection it blocks before returning the future because of metadata fetching, but that's another strange story :) ); the message is then queued in one queue per partition; then, in random order but sequentially, the brokers are contacted and all the queues for the partitions assigned to that broker are sent, waiting for the end of the transmission before moving on to the next broker. So if the transmission to one broker hangs or slows down, it delays everything and the other queues grow. What I don't understand is where the buffer memory is used in this whole process, because it seems to me that something else is using memory when this happens.
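> For context, this is roughly how I create and use the producer (simplified, with only a subset of the settings above and the standard config names instead of my kafka.producer.* prefixed ones; the broker list and topic name are placeholders):
>
> import java.util.Properties;
> import java.util.concurrent.Future;
>
> import org.apache.kafka.clients.producer.Callback;
> import org.apache.kafka.clients.producer.KafkaProducer;
> import org.apache.kafka.clients.producer.ProducerRecord;
> import org.apache.kafka.clients.producer.RecordMetadata;
>
> public class ProducerUsageSketch {
>     public static void main(String[] args) {
>         Properties props = new Properties();
>         props.put("bootstrap.servers", "broker1:9092,broker2:9092"); // placeholder
>         props.put("acks", "1");
>         props.put("buffer.memory", "40000000");
>         props.put("compression.type", "gzip");
>         props.put("retries", "1");
>         props.put("batch.size", "10000");
>         props.put("linger.ms", "1000");
>         props.put("block.on.buffer.full", "false");
>         props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
>         props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
>
>         KafkaProducer<byte[], byte[]> producer = new KafkaProducer<byte[], byte[]>(props);
>
>         // send() returns immediately with a Future; the record is appended to the
>         // accumulator queue of its partition and later batched to that partition's leader
>         Future<RecordMetadata> ack = producer.send(
>                 new ProducerRecord<byte[], byte[]>("mytopic", new byte[500]),
>                 new Callback() {
>                     public void onCompletion(RecordMetadata metadata, Exception exception) {
>                         if (exception != null) {
>                             System.err.println("send failed: " + exception);
>                         }
>                     }
>                 });
>
>         producer.close(); // blocks until pending sends complete
>     }
> }
>
> All the per-partition queuing and batching happens inside this single producer instance, which is why I'd expect buffer.memory to bound its footprint.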
> *To get finer-grained control over the memory usage of the Kafka producer:* a simple file-backed solution that monitors the callbacks to detect when the producer hangs, and stops handing the producer more data once I know there are more than N messages in flight, could be a partial solution. That would fix the memory usage, but it's strange that it's needed, because the buffer should already be enough for that... Is the buffer internally multiplied by partitions or by brokers? Or is there something else I'm not considering that could use so much RAM (10x the buffer size)?
>
> This partial solution would also slow down the producer's transmissions to all partition leaders, though, not just the problematic one.
>
> So, with N being the number of partitions, I'm going to build an N-threaded solution where each thread handles one queue for one partition (one leader per thread would be optimal, but one queue per partition is easier because it avoids tracking leader assignment/re-election). Each thread has its own producer, which obviously loses some batching performance because batches for multiple partitions are no longer aggregated per leader, but it will still batch better than the sync producer. If the callback-tracked count of in-flight messages reaches some threshold, I start using local disk storage to persist messages so the producer is not handed too many, and I use the disk FIFO as the source for the producer until its size reaches 0 again. In the end each thread falls back to its file-backed queue when it slows down and works entirely from its RAM queue when everything runs OK, so no producer is ever overloaded with messages.
> Finer-grained control over producer memory usage could make the callback in-flight-message counter unnecessary, but the disk queue is required anyway to avoid discarding messages on buffer full, and multiple producers are required so that a problem between this producer and one partition leader doesn't slow down everything else.
>
> Does all this make sense?
>
> Thank you :),
> Francesco Vigotti
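To make the callback-counting approach quoted above a bit more concrete, this is roughly the per-partition worker I have in mind (just a sketch: the threshold, the topic/partition wiring and the FileQueue disk FIFO are placeholders for whatever I end up using):

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

/** One worker thread per partition, each with its own producer instance. */
public class PartitionWorker implements Runnable {

    private static final int MAX_IN_FLIGHT = 20000; // threshold, to be tuned

    private final KafkaProducer<byte[], byte[]> producer; // dedicated to this thread
    private final String topic;
    private final int partition;
    private final BlockingQueue<byte[]> ramQueue = new LinkedBlockingQueue<byte[]>();
    private final AtomicInteger inFlight = new AtomicInteger(0);
    // private final FileQueue diskQueue = ...;  // placeholder local disk FIFO

    public PartitionWorker(KafkaProducer<byte[], byte[]> producer, String topic, int partition) {
        this.producer = producer;
        this.topic = topic;
        this.partition = partition;
    }

    /** Called by the application threads that generate messages. */
    public void enqueue(byte[] message) {
        if (inFlight.get() > MAX_IN_FLIGHT) {
            // too many sends still waiting for their callback: this partition's
            // leader is slow, so persist to the disk FIFO instead of the RAM queue
            // diskQueue.append(message);
        } else {
            ramQueue.offer(message);
        }
    }

    public void run() {
        while (!Thread.currentThread().isInterrupted()) {
            byte[] msg;
            try {
                // the real version drains the disk FIFO first, then the RAM queue
                msg = ramQueue.take();
            } catch (InterruptedException e) {
                return;
            }
            inFlight.incrementAndGet();
            producer.send(new ProducerRecord<byte[], byte[]>(topic, partition, null, msg),
                    new Callback() {
                        public void onCompletion(RecordMetadata metadata, Exception exception) {
                            inFlight.decrementAndGet(); // one fewer message in flight
                            // on exception: re-append to the disk queue or count/log the loss
                        }
                    });
        }
    }
}

The only producer API this relies on is send() with a Callback; the RAM queue, the disk FIFO and the threshold are all on my side, which is what gives me a memory bound even when one leader is unreachable.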