Hi,
I'm playing with the new Kafka producer to see if it could fit my use case (Kafka version 0.8.2.1).
I'll probably end up with a Kafka cluster of 5 nodes spread across multiple datacenters, with one topic, a replication factor of 2, and at least 10 partitions required for consumer performance (I'll explain why only 10 later..).
My producers (10 or more) are also distributed across multiple datacenters around the globe, each producing about 2k messages/sec, with a message size of about 500 bytes per message.

During my tests I have seen that when there is a network issue between one of the producers and one of the partition leaders, the producer starts to accumulate data far beyond the configured buffer.memory (set to about 40 MB), which crashes the JVM because the garbage collector cannot free enough space for the application to keep working (the JVM heap is about 2 GB, normally at about 1 GB usage, except when this problem occurs).
What I see is about 500 MB used by char[].

During these events the buffer usage does increase, from about 5 MB to about 30 MB, and batches reach their maximum size, which helps speed up transmission, but that doesn't seem to be enough for my memory usage / performance / resilience requirements.
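
For context, here is a rough back-of-envelope of what I would expect one producer to keep in memory, using my numbers above (before gzip compression, and ignoring batching overhead and retries):

// Rough estimate for a single producer, using the numbers above.
public class ProducerMemoryEstimate {
    public static void main(String[] args) {
        long msgsPerSec = 2_000;          // messages per second per producer
        long msgSizeBytes = 500;          // approximate message size
        long lingerMs = 1_000;            // linger.ms
        long bufferMemory = 40_000_000L;  // buffer.memory

        long bytesPerSec = msgsPerSec * msgSizeBytes;           // ~1 MB/s
        long normalBuffered = bytesPerSec * lingerMs / 1_000;   // ~1 MB buffered while healthy
        long secondsOfBacklog = bufferMemory / bytesPerSec;     // ~40 s of backlog fits in buffer.memory

        System.out.printf("throughput: %d B/s, normally buffered: ~%d B, buffer.memory covers ~%d s of backlog%n",
                bytesPerSec, normalBuffered, secondsOfBacklog);
    }
}

So at roughly 1 MB/s the 40 MB buffer should cap the accumulation at about 40 seconds of data, which is why the ~500 MB of char[] surprises me.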



kafka.producer.acks=1
kafka.producer.buffer.memory=40000000
kafka.producer.compression.type=gzip
kafka.producer.retries=1
kafka.producer.batch.size=10000
kafka.producer.max.request.size=10000000
kafka.producer.send.buffer.bytes=1000000
kafka.producer.timeout.ms=10000
kafka.producer.reconnect.backoff.ms=500
kafka.producer.retry.backoff.ms=500
kafka.producer.block.on.buffer.full=false
kafka.producer.linger.ms=1000
kafka.producer.maxinflight=2
kafka.producer.serializer.class=kafka.serializer.DefaultEncoder
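
For reference, this is roughly how I wire those properties into the new producer in my code. The broker list and the byte-array serializers are placeholders added just for this example, and I'm assuming maxinflight corresponds to max.in.flight.requests.per.connection; the serializer.class line above is a leftover from the old producer config, since the new producer takes key.serializer / value.serializer instead.

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;

public class NewProducerConfig {
    // Sketch: same settings as the list above, with the "kafka.producer." prefix stripped.
    static KafkaProducer<byte[], byte[]> buildProducer() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092,broker2:9092"); // placeholder
        props.put("acks", "1");
        props.put("buffer.memory", "40000000");
        props.put("compression.type", "gzip");
        props.put("retries", "1");
        props.put("batch.size", "10000");
        props.put("max.request.size", "10000000");
        props.put("send.buffer.bytes", "1000000");
        props.put("timeout.ms", "10000");
        props.put("reconnect.backoff.ms", "500");
        props.put("retry.backoff.ms", "500");
        props.put("block.on.buffer.full", "false");
        props.put("linger.ms", "1000");
        props.put("max.in.flight.requests.per.connection", "2");
        props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        return new KafkaProducer<byte[], byte[]>(props);
    }
}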

The main issue anyway is that when one producer has connection-path issues to one broker (packet loss / connection slowdown), it crashes the whole application due to JVM RAM usage. This doesn't mean the broker is down: other producers can still reach it, it's just a networking problem between those two, and on some routing paths it can last as long as an hour during peak time.

I have found no solution playing with the available params.

Now, reading around, this is my understanding of how the producer works internally:
the producer takes messages and returns an async future (though during the first connection it blocks before returning the future because of metadata fetching, but that's another strange story :) ), then the message is queued in one queue per partition; the brokers are then contacted sequentially, in a random order, and all the queues for the partitions assigned to that broker are sent, waiting for the end of that transmission before proceeding to the next broker.
But if the transmission to one broker hangs or slows down, it stalls everything (the other queues grow), and I don't understand where the buffer memory is used in this whole process, because it seems to me that something else is using memory when this happens.
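
Just to make sure we're describing the same thing, this is how I use the async API (the topic name and the error handling here are simplified placeholders for the example):

import java.util.concurrent.Future;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class AsyncSendExample {
    // send() appends the record to the per-partition queue and returns a Future immediately,
    // except on the very first send, where it blocks while fetching metadata.
    static Future<RecordMetadata> sendOne(KafkaProducer<byte[], byte[]> producer, byte[] payload) {
        ProducerRecord<byte[], byte[]> record =
                new ProducerRecord<byte[], byte[]>("my-topic", payload); // placeholder topic
        return producer.send(record, new Callback() {
            @Override
            public void onCompletion(RecordMetadata metadata, Exception exception) {
                if (exception != null) {
                    // this is where I see errors when the path to one leader degrades
                    exception.printStackTrace();
                }
            }
        });
    }
}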

*To get finer-grained control over the memory usage of the Kafka producer:*
A simple file-backed solution that uses the callbacks to detect when the producer hangs, and stops feeding the producer more data once I know there are more than N messages in flight, could be a partial solution, i.e. it could solve the memory usage. But it's strange, because for that the buffer alone should be enough... Is the buffer internally multiplied by the number of partitions or brokers? Or is there something else I'm not considering that could use so much RAM (10x the buffer size)?

But this solution would also slow down the whole producer's transmissions to all the partition leaders.

(where N is the number of partitions / per-partition queues)
I'm going to build something like an N-threaded solution where each thread handles one queue for one partition (handling one leader per thread would be optimal, but one queue per partition is easier because it avoids dealing with leader assignment/re-election). Each thread has its own producer, which obviously loses some batching performance because batches for multiple partitions don't get aggregated towards their assigned leader, but it will at least batch better than the sync producer. If the callback-tracked count of in-flight messages reaches some threshold, I start using local disk storage as persistence for the messages, so I don't hand the producer too many messages, and I use the disk FIFO as the source for the producer until its size reaches 0 again. In the end each thread will have its own file-backed queue when it slows down, and will process everything from the in-memory queues when things run OK, but each producer will never be overloaded with messages. Finer-grained control over the producer's memory usage would let me avoid the callback-based in-flight message counter, but the disk queue is required anyway to avoid message discards on buffer full, and multiple producers are required so that one producer doesn't slow down when the problem is only between that producer and one partition leader...
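
The in-flight counting part of this idea would look roughly like the sketch below (the DiskQueue is a placeholder for whatever file-backed FIFO I end up using, and the threshold is arbitrary):

import java.util.concurrent.Semaphore;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class BoundedPartitionSender {
    private static final int MAX_IN_FLIGHT = 5_000;      // arbitrary threshold

    private final Semaphore inFlight = new Semaphore(MAX_IN_FLIGHT);
    private final KafkaProducer<byte[], byte[]> producer;
    private final DiskQueue diskQueue;                    // placeholder file-backed FIFO
    private final String topic;
    private final int partition;

    BoundedPartitionSender(KafkaProducer<byte[], byte[]> producer, DiskQueue diskQueue,
                           String topic, int partition) {
        this.producer = producer;
        this.diskQueue = diskQueue;
        this.topic = topic;
        this.partition = partition;
    }

    // Called by the thread that owns this partition's queue.
    void submit(byte[] payload) {
        // If too many messages are already in flight (callbacks haven't fired yet),
        // spill to disk instead of handing the producer more data.
        if (!inFlight.tryAcquire()) {
            diskQueue.append(payload);
            return;
        }
        producer.send(new ProducerRecord<byte[], byte[]>(topic, partition, null, payload),
                new Callback() {
                    @Override
                    public void onCompletion(RecordMetadata metadata, Exception exception) {
                        inFlight.release(); // one message has left the producer
                        // the owning thread drains the disk FIFO back through submit()
                        // whenever permits are available again
                    }
                });
    }

    // Placeholder interface for the file-backed FIFO; not a real library.
    interface DiskQueue {
        void append(byte[] payload);
    }
}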

Does all this make sense?

Thank you :),
Francesco Vigotti
