Hi,

Issue: a MESSAGE_TOO_LARGE warning occurs when I run kafka-producer-perf-test.sh and the broker runs on an Arm server. Below are the commands I used:

bin/kafka-topics.sh --create --bootstrap-server arm-server:9092 --replication-factor 1 --partitions 6 --topic test

bin/kafka-producer-perf-test.sh --topic test --num-records 50000000 --throughput -1 --record-size 1000 --producer-props bootstrap.servers=arm-server:9092 acks=1 buffer.memory=67108864 batch.size=65536 compression.type=zstd
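In case it is useful, a minimal stand-alone producer with the same settings would look roughly like the sketch below. The class name and the zero-filled payload are placeholders (the perf tool itself generates randomized payloads), so this is only an approximation of the perf test:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;

public class ZstdPerfRepro {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "arm-server:9092");
        props.put(ProducerConfig.ACKS_CONFIG, "1");
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 67108864);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 65536);
        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "zstd");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);

        byte[] payload = new byte[1000];                 // matches --record-size 1000
        try (KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props)) {
            for (long i = 0; i < 50_000_000L; i++) {     // matches --num-records 50000000
                producer.send(new ProducerRecord<>("test", payload));
            }
        }
    }
}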
[2020-03-03 06:11:02,485] WARN [Producer clientId=producer-1] Got error produce response in correlation id 236 on topic-partition test-2, splitting and retrying (2147483647 attempts left). Error: MESSAGE_TOO_LARGE (org.apache.kafka.clients.producer.internals.Sender)

514288 records sent, 7931.4 records/sec (7.56 MB/sec), 32267.0 ms avg latency, 60002.0 ms max latency.

[2020-03-03 06:16:15,294] ERROR Uncaught exception in thread 'kafka-producer-network-thread | producer-1': (org.apache.kafka.common.utils.KafkaThread)
java.lang.OutOfMemoryError: Java heap space: failed reallocation of scalar replaced objects

After MESSAGE_TOO_LARGE occurred, the memory used on the client increased dramatically and performance got worse. Below is the dstat output on the client.

dstat -cdmn
--total-cpu-usage-- -dsk/total- ------memory-usage----- -net/total-
usr sys idl wai stl| read  writ| used  free  buff  cach| recv  send
  0   0  99   0   0|1762k   11k|1180M 29.9G 13.3M  162M|   0     0
  5   3  92   0   0|   0     0 |1180M 29.9G 13.3M  162M|1966B 3924B
 11   2  87   0   0|   0    16k|1238M 29.9G 13.3M  162M|5066B  379k
  9   3  87   0   0|   0     0 |1277M 29.8G 13.3M  162M|3580B  281k
  5   2  93   0   0|   0     0 |1278M 29.8G 13.3M  162M|3712B  253k
 16   2  82   0   0|   0     0 |1334M 29.8G 13.3M  162M|  12k 1568k
 21   4  76   0   0|   0    20k|2047M 29.1G 13.3M  162M|  11k   51k
  9   4  87   0   0|   0     0 |3199M 27.9G 13.3M  162M|  26k   97k
  7   5  88   0   0|   0     0 |4341M 26.8G 13.3M  162M|  25k   94k
  5   5  90   0   0|   0     0 |5543M 25.7G 13.3M  162M|  25k  100k
  5   5  90   0   0|   0     0 |6903M 24.3G 13.3M  162M|  29k  120k
 12   6  82   0   0|   0     0 |8320M 22.9G 13.3M  162M|  29k  116k
  6   5  89   0   0|   0    28k|9568M 21.7G 13.3M  162M|  30k  124k
  7   4  89   0   0|   0     0 |10.2G 20.8G 13.3M  162M|  35k  389k
 11   5  83   0   0|   0     0 |10.3G 20.7G 13.3M  162M|  50k  768k
  6   2  92   0   0|   0     0 |10.3G 20.7G 13.3M  162M|  20k  530k
  8   3  89   0   0|   0     0 |10.4G 20.7G 13.3M  162M|9183B  471k
  8   2  90   0   0|   0    96k|10.4G 20.7G 13.3M  162M|5552B  156k
  6   2  92   0   0|   0     0 |10.4G 20.7G 13.3M  162M|9672B  486k
 13   3  84   0   0|   0    16k|10.5G 20.6G 13.3M  162M|5771B  310k
 10   3  87   0   0|   0     0 |10.5G 20.6G 13.3M  162M|7868B  295k
 14   2  85   0   0|   0     0 |10.5G 20.5G 13.3M  162M|  16k 1524k
 20   3  76   0   0|   0    32k|11.2G 19.9G 13.3M  162M|  13k   54k
 12   5  84   0   0|   0     0 |12.4G 18.7G 13.3M  162M|  27k  113k
  7   5  88   0   0|   0     0 |13.7G 17.4G 13.3M  162M|  29k  115k
 11   5  84   0   0|   0     0 |14.9G 16.1G 13.3M  162M|  30k  124k
  6   5  89   0   0|   0     0 |16.3G 14.8G 13.3M  162M|  28k  118k
  6   4  89   1   0|   0    60k|17.5G 13.6G 13.3M  162M|  27k  107k
  7   6  87   0   0|   0     0 |18.7G 12.4G 13.3M  162M|  29k  164k
  5   1  93   0   0|   0     0 |18.7G 12.4G 13.3M  162M|  45k  713k
 10   3  87   0   0|   0     0 |18.8G 12.3G 13.3M  162M|  24k  548k
  6   2  92   0   0|   0     0 |18.8G 12.3G 13.3M  162M|  12k  422k
  7   2  90   0   0|   0    44k|18.8G 12.3G 13.3M  162M|  13k  470k
 16   2  82   0   0|   0     0 |18.8G 12.3G 13.3M  162M|9324B  323k
  6   2  91   0   0|   0     0 |18.8G 12.3G 13.3M  162M|  11k  386k
  8   2  90   0   0|   0     0 |18.8G 12.3G 13.3M  162M|  12k  403k
  8   3  89   0   0|   0     0 |18.8G 12.3G 13.3M  162M|  12k  370k
  7   2  90   0   0|   0     0 |18.8G 12.3G 13.3M  162M|  13k  435k
  7   2  91   0   0|   0    20k|18.8G 12.2G 13.3M  162M|  17k  397k
  8   2  90   0   0|   0     0 |18.8G 12.2G 13.3M  162M|  16k  546k
 10   2  88   0   0|   0     0 |18.9G 12.2G 13.3M  162M|  13k  436k
  5   2  93   0   0|   0     0 |18.9G 12.2G 13.3M  162M|  14k  492k
 11   3  87   0   0|   0     0 |18.9G 12.2G 13.3M  162M|  13k  465k
  8   2  90   0   0|   0     0 |18.9G 12.2G 13.3M  162M|9836B  384k
 10   1  88   0   0|   0    20k|18.9G 12.2G 13.3M  162M|9884B  392k
 10   1  89   0   0|   0     0 |18.9G 12.1G 13.3M  162M|  69k  771k
  4   2  94   0   0|   0     0 |18.9G 12.1G 13.3M  162M|  71k  668k
  4   1  95   0   0|   0     0 |19.0G 12.1G 13.3M  162M|  68k  634k
  2   1  98   0   0|   0    16k|19.0G 12.1G 13.3M  162M|  72k  725k
  6   2  92   0   0|   0     0 |19.0G 12.1G 13.3M  162M|  73k  690k
  3   1  95   0   0| 232k    0 |19.0G 12.0G 13.5M  162M|  73k  723k
  5   2  94   0   0|   0     0 |19.0G 12.0G 13.5M  162M|  79k  764k
  2   1  96   0   0|4096B   56k|19.1G 12.0G 13.5M  162M|  81k  800k
  5   2  94   0   0|   0    72k|19.1G 12.0G 13.5M  162M|  69k  663k
  1   1  98   0   0|   0     0 |19.1G 12.0G 13.5M  162M|  48k  476k

This issue does not occur when the compression type is none, snappy, lz4, or gzip.

Environment:
Server: one Arm server
Client: x86_64 (E5-2650 v3)
Kafka: git clone https://github.com/apache/kafka.git, built with "./gradlew jar"
OS: Ubuntu 18.04.4 LTS
Java: openjdk version "1.8.0_242"
Scala: the default version, not installed manually

The following environment variables are set on both server and client:

export JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64
export KAFKA_HEAP_OPTS="-Xmx6G -Xms6G"
export KAFKA_JVM_PERFORMANCE_OPTS="-XX:MetaspaceSize=96m -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:G1HeapRegionSize=16M -XX:MinMetaspaceFreeRatio=50 -XX:MaxMetaspaceFreeRatio=80 -XX:+PreserveFramePointer"

server.properties:

socket.send.buffer.bytes=1048576
socket.receive.buffer.bytes=1048576
socket.request.max.bytes=104857600

From the source code side, here is what I have done:

1. In kafka/core/src/main/scala/kafka/log/Log.scala, the broker checks whether the batch size is valid and throws RecordTooLargeException if it is not. I added a log to check the batch size: it is bigger than config.maxMessageSize (1048588).

if (validateAndOffsetAssignResult.messageSizeMaybeChanged) {
  for (batch <- validRecords.batches.asScala) {
    if (batch.sizeInBytes > config.maxMessageSize) {
      // we record the original message set size instead of the trimmed size
      // to be consistent with pre-compression bytesRejectedRate recording
      brokerTopicStats.topicStats(topicPartition.topic).bytesRejectedRate.mark(records.sizeInBytes)
      brokerTopicStats.allTopicsStats.bytesRejectedRate.mark(records.sizeInBytes)
      throw new RecordTooLargeException(s"Message batch size is ${batch.sizeInBytes} bytes in append to " +
        s"partition $topicPartition which exceeds the maximum configured size of ${config.maxMessageSize}.")
    }
  }
}

2. In clients/src/main/java/org/apache/kafka/common/network/NetworkReceive.java, the readFrom method checks whether receiveSize is valid. I added logs to print receiveSize, requestedBufferSize, and read.

public long readFrom(ScatteringByteChannel channel) throws IOException {
    int read = 0;
    if (size.hasRemaining()) {
        int bytesRead = channel.read(size);
        if (bytesRead < 0)
            throw new EOFException();
        read += bytesRead;
        if (!size.hasRemaining()) {
            size.rewind();
            int receiveSize = size.getInt();
            log.warn("####receiveSize is {} , maxSize is {} ", receiveSize, maxSize);
            if (receiveSize < 0)
                throw new InvalidReceiveException("Invalid receive (size = " + receiveSize + ")");
            if (maxSize != UNLIMITED && receiveSize > maxSize)
                throw new InvalidReceiveException("Invalid receive (size = " + receiveSize + " larger than " + maxSize + ")");
            requestedBufferSize = receiveSize; // may be 0 for some payloads (SASL)
            if (receiveSize == 0) {
                buffer = EMPTY_BUFFER;
            }
        }
    }
    if (buffer == null && requestedBufferSize != -1) {
        // we know the size we want but haven't been able to allocate it yet
        buffer = memoryPool.tryAllocate(requestedBufferSize);
        if (buffer == null)
            log.trace("Broker low on memory - could not allocate buffer of size {} for source {}", requestedBufferSize, source);
    }
    if (buffer != null) {
        int bytesRead = channel.read(buffer);
        if (bytesRead < 0)
            throw new EOFException();
        read += bytesRead;
    }
    log.warn("####requestedBufferSize is {}, read is {} ", requestedBufferSize, read);
    return read;
}

Here maxSize is socket.request.max.bytes, whose value is 104857600.
There are receiveSize values that are bigger than config.maxMessageSize. I also compared receiveSize between arm (arm server, x86 client) and x86 (both server and client x86), and the difference is large: arm 2158872 vs. x86 87158. The read value is much lower than requestedBufferSize: the biggest read is 1084552, while the biggest requestedBufferSize is 3927692. Why is requestedBufferSize so much bigger than read?

3. In clients/src/main/java/org/apache/kafka/common/network/NetworkSend.java, I added a log to print the buffer size.

private static ByteBuffer sizeBuffer(int size) {
    ByteBuffer sizeBuffer = ByteBuffer.allocate(4);
    sizeBuffer.putInt(size);
    log.warn("#### size = {}", size);
    sizeBuffer.rewind();
    return sizeBuffer;
}

The biggest size is the same as the biggest requestedBufferSize, also 3927692. This really confuses me: arm and x86 use the same client to produce data to the broker, so why is the size on arm in NetworkSend.java (which I assume is the data sent to the broker) bigger than the size on x86?

4. Part of the producer flow is RecordAccumulator.drain() -> RecordAccumulator.drainBatchesForOneNode() -> ProducerBatch.close() -> recordsBuilder.close() (which obtains the actualCompressionRatio) and CompressionRatioEstimator.updateEstimation(). I added some logs to print estimatedCompressionRatio, ready, and uncompressedRecordsSizeInBytes.

In clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java:

private int estimatedBytesWritten() {
    if (compressionType == CompressionType.NONE) {
        return batchHeaderSizeInBytes + uncompressedRecordsSizeInBytes;
    } else {
        // estimate the written bytes to the underlying byte buffer based on uncompressed written bytes
        log.warn("####uncompressedRecordsSizeInBytes is {} , estimatedCompressionRatio is {} ", uncompressedRecordsSizeInBytes, estimatedCompressionRatio);
        return batchHeaderSizeInBytes + (int) (uncompressedRecordsSizeInBytes * estimatedCompressionRatio * COMPRESSION_RATE_ESTIMATION_FACTOR);
    }
}

In clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java:

public Map<Integer, List<ProducerBatch>> drain(Cluster cluster, Set<Node> nodes, int maxSize, long now) {
    if (nodes.isEmpty())
        return Collections.emptyMap();
    Map<Integer, List<ProducerBatch>> batches = new HashMap<>();
    for (Node node : nodes) {
        List<ProducerBatch> ready = drainBatchesForOneNode(cluster, node, maxSize, now);
        batches.put(node.id(), ready);
        // maxSize is 1048576
        log.warn("#maxSize is {} , ready is {} ", maxSize, ready);
    }
    return batches;
}

The lowest estimatedCompressionRatio is -8.419007E-4, which is negative. I checked on x86 as well, and there were also estimatedCompressionRatio values like this. From ready we can get the record count for a topic-partition; the biggest is 394277. The biggest uncompressedRecordsSizeInBytes is 711893825. What might cause uncompressedRecordsSizeInBytes to be so big? Why is estimatedCompressionRatio a negative number? Is that normal?
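As a sanity check on the arithmetic, the short sketch below just plugs the numbers observed above into the estimatedBytesWritten() formula quoted in item 4. It is not Kafka code, and the 61-byte batch header and the 1.05 estimation factor are my assumptions from reading the client source:

public class EstimateArithmetic {
    public static void main(String[] args) {
        int batchHeaderSizeInBytes = 61;                    // assumed v2 record-batch overhead
        long uncompressedRecordsSizeInBytes = 711893825L;   // biggest value seen in my logs
        double estimatedCompressionRatio = -8.419007E-4;    // lowest (negative) ratio seen in my logs
        double estimationFactor = 1.05;                     // assumed COMPRESSION_RATE_ESTIMATION_FACTOR

        long estimate = batchHeaderSizeInBytes
                + (long) (uncompressedRecordsSizeInBytes * estimatedCompressionRatio * estimationFactor);

        // The estimate comes out negative (about -629000), while the largest size actually handed
        // to NetworkSend above was 3927692 bytes.
        System.out.println("estimatedBytesWritten ~= " + estimate);
        System.out.println("largest observed NetworkSend size = " + 3927692);
    }
}

If batch fullness is judged from this estimate, a negative or near-zero ratio would keep a zstd batch looking almost empty while it keeps growing, but I am not sure whether that is what is actually happening here.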
5. In clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java, I added a log to check whether this if statement is entered:

if (size + first.estimatedSizeInBytes() > maxSize && !ready.isEmpty()) {
    log.warn("#### size is {} , first.estimatedSizeInBytes() is {} ", size, first.estimatedSizeInBytes());
    // there is a rare case that a single batch size is larger than the request size due to
    // compression; in this case we will still eventually send this batch in a single request
    break;
}

There is no such log, which means this if statement was never entered.

Does the way the producer compresses data depend on the broker, e.g. through the compression ratio? Why does the client's memory usage grow dramatically after MESSAGE_TOO_LARGE occurs? Could anyone give me some suggestions on how to locate the root cause of this issue?

Thanks.

Best Wishes,
Jiamei