Hi,
Issue:
The MESSAGE_TOO_LARGE warning occurs when I run kafka-producer-perf-test.sh and the broker is an arm server. Below are the commands I used:
bin/kafka-topics.sh --create --bootstrap-server arm-server:9092 \
  --replication-factor 1 --partitions 6 --topic test

bin/kafka-producer-perf-test.sh --topic test --num-records 50000000 \
  --throughput -1 --record-size 1000 \
  --producer-props bootstrap.servers=arm-server:9092 acks=1 \
  buffer.memory=67108864 batch.size=65536 compression.type=zstd
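In case a standalone reproduction is useful, below is a minimal producer sketch with the same settings as the perf-test command. This is only a sketch using the public producer API; the class name is made up, and the record count is reduced from the 50000000 the perf test sends.

import java.util.Properties;
import java.util.concurrent.ThreadLocalRandom;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;

public class ZstdRepro {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "arm-server:9092");
        props.put(ProducerConfig.ACKS_CONFIG, "1");
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, "67108864");
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, "65536");
        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "zstd");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());

        try (KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props)) {
            byte[] value = new byte[1000];                     // same record size as --record-size 1000
            ThreadLocalRandom.current().nextBytes(value);      // random payload so zstd does not compress trivially
            for (int i = 0; i < 5_000_000; i++) {              // fewer records than the perf test; may still reproduce
                producer.send(new ProducerRecord<>("test", value),
                        (metadata, exception) -> {
                            if (exception != null)
                                System.err.println(exception);
                        });
            }
            producer.flush();
        }
    }
}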

[2020-03-03 06:11:02,485] WARN [Producer clientId=producer-1] Got error produce 
response in correlation id 236 on topic-partition test-2, splitting and 
retrying (2147483647 attempts left). Error: MESSAGE_TOO_LARGE 
(org.apache.kafka.clients.producer.internals.Sender)
514288 records sent, 7931.4 records/sec (7.56 MB/sec), 32267.0 ms avg latency, 
60002.0 ms max latency.
[2020-03-03 06:16:15,294] ERROR Uncaught exception in thread 
'kafka-producer-network-thread | producer-1': 
(org.apache.kafka.common.utils.KafkaThread)
java.lang.OutOfMemoryError: Java heap space: failed reallocation of scalar 
replaced objects
After MESSAGE_TOO_LARGE occurred, the memory used on the client increased dramatically and the performance got worse. Below is the dstat output on the client.
dstat -cdmn
--total-cpu-usage-- -dsk/total- ------memory-usage----- -net/total-
usr sys idl wai stl| read  writ| used  free  buff  cach| recv  send
  0   0  99   0   0|1762k   11k|1180M 29.9G 13.3M  162M|   0     0
  5   3  92   0   0|   0     0 |1180M 29.9G 13.3M  162M|1966B 3924B
11   2  87   0   0|   0    16k|1238M 29.9G 13.3M  162M|5066B  379k
  9   3  87   0   0|   0     0 |1277M 29.8G 13.3M  162M|3580B  281k
  5   2  93   0   0|   0     0 |1278M 29.8G 13.3M  162M|3712B  253k
16   2  82   0   0|   0     0 |1334M 29.8G 13.3M  162M|  12k 1568k
21   4  76   0   0|   0    20k|2047M 29.1G 13.3M  162M|  11k   51k
  9   4  87   0   0|   0     0 |3199M 27.9G 13.3M  162M|  26k   97k
  7   5  88   0   0|   0     0 |4341M 26.8G 13.3M  162M|  25k   94k
  5   5  90   0   0|   0     0 |5543M 25.7G 13.3M  162M|  25k  100k
  5   5  90   0   0|   0     0 |6903M 24.3G 13.3M  162M|  29k  120k
12   6  82   0   0|   0     0 |8320M 22.9G 13.3M  162M|  29k  116k
  6   5  89   0   0|   0    28k|9568M 21.7G 13.3M  162M|  30k  124k
  7   4  89   0   0|   0     0 |10.2G 20.8G 13.3M  162M|  35k  389k
11   5  83   0   0|   0     0 |10.3G 20.7G 13.3M  162M|  50k  768k
  6   2  92   0   0|   0     0 |10.3G 20.7G 13.3M  162M|  20k  530k
  8   3  89   0   0|   0     0 |10.4G 20.7G 13.3M  162M|9183B  471k
  8   2  90   0   0|   0    96k|10.4G 20.7G 13.3M  162M|5552B  156k
  6   2  92   0   0|   0     0 |10.4G 20.7G 13.3M  162M|9672B  486k
13   3  84   0   0|   0    16k|10.5G 20.6G 13.3M  162M|5771B  310k
10   3  87   0   0|   0     0 |10.5G 20.6G 13.3M  162M|7868B  295k
14   2  85   0   0|   0     0 |10.5G 20.5G 13.3M  162M|  16k 1524k
20   3  76   0   0|   0    32k|11.2G 19.9G 13.3M  162M|  13k   54k
12   5  84   0   0|   0     0 |12.4G 18.7G 13.3M  162M|  27k  113k
  7   5  88   0   0|   0     0 |13.7G 17.4G 13.3M  162M|  29k  115k
11   5  84   0   0|   0     0 |14.9G 16.1G 13.3M  162M|  30k  124k
  6   5  89   0   0|   0     0 |16.3G 14.8G 13.3M  162M|  28k  118k
  6   4  89   1   0|   0    60k|17.5G 13.6G 13.3M  162M|  27k  107k
  7   6  87   0   0|   0     0 |18.7G 12.4G 13.3M  162M|  29k  164k
  5   1  93   0   0|   0     0 |18.7G 12.4G 13.3M  162M|  45k  713k
10   3  87   0   0|   0     0 |18.8G 12.3G 13.3M  162M|  24k  548k
  6   2  92   0   0|   0     0 |18.8G 12.3G 13.3M  162M|  12k  422k
  7   2  90   0   0|   0    44k|18.8G 12.3G 13.3M  162M|  13k  470k
16   2  82   0   0|   0     0 |18.8G 12.3G 13.3M  162M|9324B  323k
  6   2  91   0   0|   0     0 |18.8G 12.3G 13.3M  162M|  11k  386k
  8   2  90   0   0|   0     0 |18.8G 12.3G 13.3M  162M|  12k  403k
  8   3  89   0   0|   0     0 |18.8G 12.3G 13.3M  162M|  12k  370k
  7   2  90   0   0|   0     0 |18.8G 12.3G 13.3M  162M|  13k  435k
  7   2  91   0   0|   0    20k|18.8G 12.2G 13.3M  162M|  17k  397k
  8   2  90   0   0|   0     0 |18.8G 12.2G 13.3M  162M|  16k  546k
10   2  88   0   0|   0     0 |18.9G 12.2G 13.3M  162M|  13k  436k
  5   2  93   0   0|   0     0 |18.9G 12.2G 13.3M  162M|  14k  492k
11   3  87   0   0|   0     0 |18.9G 12.2G 13.3M  162M|  13k  465k
  8   2  90   0   0|   0     0 |18.9G 12.2G 13.3M  162M|9836B  384k
10   1  88   0   0|   0    20k|18.9G 12.2G 13.3M  162M|9884B  392k
10   1  89   0   0|   0     0 |18.9G 12.1G 13.3M  162M|  69k  771k
  4   2  94   0   0|   0     0 |18.9G 12.1G 13.3M  162M|  71k  668k
  4   1  95   0   0|   0     0 |19.0G 12.1G 13.3M  162M|  68k  634k
  2   1  98   0   0|   0    16k|19.0G 12.1G 13.3M  162M|  72k  725k
  6   2  92   0   0|   0     0 |19.0G 12.1G 13.3M  162M|  73k  690k
  3   1  95   0   0| 232k    0 |19.0G 12.0G 13.5M  162M|  73k  723k
  5   2  94   0   0|   0     0 |19.0G 12.0G 13.5M  162M|  79k  764k
  2   1  96   0   0|4096B   56k|19.1G 12.0G 13.5M  162M|  81k  800k
  5   2  94   0   0|   0    72k|19.1G 12.0G 13.5M  162M|  69k  663k
  1   1  98   0   0|   0     0 |19.1G 12.0G 13.5M  162M|  48k  476k


This issue does not occur when the compression type is none, snappy, lz4, or gzip.

Environment:
Server: one arm server
Client: x86_64(E5-2650 v3)
Kafka: cloned from https://github.com/apache/kafka.git and built with "./gradlew jar"
OS: Ubuntu 18.04.4 LTS
Java: openjdk version "1.8.0_242"
Scala: The default version, not installed manually.

I set the environment variables below on both server and client:
export JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64
export KAFKA_HEAP_OPTS="-Xmx6G -Xms6G"
export KAFKA_JVM_PERFORMANCE_OPTS="-XX:MetaspaceSize=96m -XX:+UseG1GC 
-XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 
-XX:G1HeapRegionSize=16M -XX:MinMetaspaceFreeRatio=50 
-XX:MaxMetaspaceFreeRatio=80 -XX:+PreserveFramePointer"

server.properties
socket.send.buffer.bytes=1048576
socket.receive.buffer.bytes=1048576
socket.request.max.bytes=104857600


From the source-code side, here is what I have done:
1. In kafka/core/src/main/scala/kafka/log/Log.scala, the broker checks whether the batch size is valid and throws RecordTooLargeException if it is not. I added a log to check the batch size, and it is bigger than config.maxMessageSize (1048588).


          if (validateAndOffsetAssignResult.messageSizeMaybeChanged) {
            for (batch <- validRecords.batches.asScala) {
              if (batch.sizeInBytes > config.maxMessageSize) {
                // we record the original message set size instead of the trimmed size
                // to be consistent with pre-compression bytesRejectedRate recording
                brokerTopicStats.topicStats(topicPartition.topic).bytesRejectedRate.mark(records.sizeInBytes)
                brokerTopicStats.allTopicsStats.bytesRejectedRate.mark(records.sizeInBytes)
                throw new RecordTooLargeException(s"Message batch size is ${batch.sizeInBytes} bytes in append to" +
                  s"partition $topicPartition which exceeds the maximum configured size of ${config.maxMessageSize}.")
              }
            }
          }
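To double-check what limit the arm broker actually applies here (config.maxMessageSize should come from the topic's max.message.bytes, which falls back to the broker's message.max.bytes, 1048588 by default), here is a minimal AdminClient sketch. The class name is made up; describeConfigs is the public admin API.

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.common.config.ConfigResource;

public class CheckMaxMessageBytes {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "arm-server:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            // topic-level max.message.bytes (falls back to the broker's message.max.bytes)
            ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC, "test");
            Config config = admin.describeConfigs(Collections.singleton(topic)).all().get().get(topic);
            System.out.println("max.message.bytes = " + config.get("max.message.bytes").value());
        }
    }
}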
2. In clients/src/main/java/org/apache/kafka/common/network/NetworkReceive.java, the readFrom method checks whether receiveSize is valid. I added some logs to print receiveSize, requestedBufferSize, and read.

public long readFrom(ScatteringByteChannel channel) throws IOException {
    int read = 0;
    if (size.hasRemaining()) {
        int bytesRead = channel.read(size);
        if (bytesRead < 0)
            throw new EOFException();
        read += bytesRead;
        if (!size.hasRemaining()) {
            size.rewind();
            int receiveSize = size.getInt();
            log.warn("####receiveSize is {} , maxSize is {} ", receiveSize, maxSize);
            if (receiveSize < 0)
                throw new InvalidReceiveException("Invalid receive (size = " + receiveSize + ")");
            if (maxSize != UNLIMITED && receiveSize > maxSize)
                throw new InvalidReceiveException("Invalid receive (size = " + receiveSize + " larger than " + maxSize + ")");
            requestedBufferSize = receiveSize; // may be 0 for some payloads (SASL)
            if (receiveSize == 0) {
                buffer = EMPTY_BUFFER;
            }
        }
    }
    if (buffer == null && requestedBufferSize != -1) { // we know the size we want but haven't been able to allocate it yet
        buffer = memoryPool.tryAllocate(requestedBufferSize);
        if (buffer == null)
            log.trace("Broker low on memory - could not allocate buffer of size {} for source {}", requestedBufferSize, source);
    }
    if (buffer != null) {
        int bytesRead = channel.read(buffer);
        if (bytesRead < 0)
            throw new EOFException();
        read += bytesRead;
    }
    log.warn("####requestedBufferSize is {}, read is {} ", requestedBufferSize, read);
    return read;
}

Here maxSize is socket.request.max.bytes, whose value is 104857600. Some receiveSize values are bigger than config.maxMessageSize. I also compared receiveSize between arm (arm server, x86 client) and x86 (x86 server and client), and the difference is big: arm: 2158872, x86: 87158.

The read is much lower than requestedBufferSize: the biggest read is 1084552, while the biggest requestedBufferSize is 3927692. Why is requestedBufferSize so much bigger than read?
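For context on how these numbers relate, here is a minimal sketch (plain NIO, not Kafka code) of the 4-byte size-prefix framing that NetworkSend.sizeBuffer() writes and readFrom() parses, using the biggest receiveSize observed on arm. The class name is made up.

import java.nio.ByteBuffer;

public class SizePrefixFraming {
    public static void main(String[] args) {
        byte[] payload = new byte[2158872];                  // biggest receiveSize observed on arm
        ByteBuffer frame = ByteBuffer.allocate(4 + payload.length);
        frame.putInt(payload.length);                        // the 4-byte size prefix sizeBuffer() produces
        frame.put(payload);
        frame.flip();

        int receiveSize = frame.getInt();                    // what readFrom() extracts as receiveSize
        System.out.println("receiveSize / requestedBufferSize = " + receiveSize);
        System.out.println("payload bytes still to be read    = " + frame.remaining());
    }
}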

3. In clients/src/main/java/org/apache/kafka/common/network/NetworkSend.java, I added a log to print the send size.
private static ByteBuffer sizeBuffer(int size) {
    ByteBuffer sizeBuffer = ByteBuffer.allocate(4);
    sizeBuffer.putInt(size);
    log.warn("#### size = {}", size);
    sizeBuffer.rewind();
    return sizeBuffer;
}

The biggest size is the same as the biggest requestedBufferSize, 3927692. This really confuses me. ARM and x86 use the same client to produce data to the broker, so why is the size on arm in NetworkSend.java (I suppose this is the data sent to the broker) bigger than the size on x86?


4. Part of the producer flow is RecordAccumulator.drain() -> RecordAccumulator.drainBatchesForOneNode() -> ProducerBatch.close() -> recordsBuilder.close() (which computes the actualCompressionRatio) -> CompressionRatioEstimator.updateEstimation().

I added some logs to print estimatedCompressionRatio, ready, and uncompressedRecordsSizeInBytes.

In clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java:
    private int estimatedBytesWritten() {
        if (compressionType == CompressionType.NONE) {
            return batchHeaderSizeInBytes + uncompressedRecordsSizeInBytes;
        } else {
            // estimate the written bytes to the underlying byte buffer based on uncompressed written bytes
            log.warn("####uncompressedRecordsSizeInBytes is {} , estimatedCompressionRatio is {} ",
                uncompressedRecordsSizeInBytes, estimatedCompressionRatio);
            return batchHeaderSizeInBytes + (int) (uncompressedRecordsSizeInBytes * estimatedCompressionRatio * COMPRESSION_RATE_ESTIMATION_FACTOR);
        }
    }

In clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java:

    public Map<Integer, List<ProducerBatch>> drain(Cluster cluster, Set<Node> nodes, int maxSize, long now) {
        if (nodes.isEmpty())
            return Collections.emptyMap();
        Map<Integer, List<ProducerBatch>> batches = new HashMap<>();
        for (Node node : nodes) {
            List<ProducerBatch> ready = drainBatchesForOneNode(cluster, node, maxSize, now);
            batches.put(node.id(), ready);
            // maxSize is 1048576
            log.warn("#maxSize is {} , ready is {} ", maxSize, ready);
        }
        return batches;
    }

The lowest estimatedCompressionRatio is -8.419007E-4, which is a negative number. I checked on x86 as well, and there were estimatedCompressionRatio values like this there too.
From ready we can get the record count for a topic partition; the biggest is 394277.
The biggest uncompressedRecordsSizeInBytes is 711893825.

What might cause uncompressedRecordsSizeInBytes to be so big?
Why is estimatedCompressionRatio a negative number? Is that normal?
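For reference, this is what the formula in estimatedBytesWritten() evaluates to with those observed values; a minimal sketch, assuming COMPRESSION_RATE_ESTIMATION_FACTOR is 1.05f (I have not re-checked that constant) and ignoring the small batchHeaderSizeInBytes. The class name is made up.

public class EstimateCheck {
    public static void main(String[] args) {
        int uncompressedRecordsSizeInBytes = 711893825;       // biggest value observed above
        float estimatedCompressionRatio = -8.419007E-4f;      // lowest value observed above
        float compressionRateEstimationFactor = 1.05f;        // assumed value of COMPRESSION_RATE_ESTIMATION_FACTOR

        int estimate = (int) (uncompressedRecordsSizeInBytes
                * estimatedCompressionRatio
                * compressionRateEstimationFactor);
        // prints roughly -629000, i.e. a negative estimated written size
        System.out.println("estimatedBytesWritten (without header) = " + estimate);
    }
}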

5. In clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java, I added a log to check whether this if statement was entered:
                if (size + first.estimatedSizeInBytes() > maxSize && !ready.isEmpty()) {
                    log.warn("#### size is {} , first.estimatedSizeInBytes() is {} ", size, first.estimatedSizeInBytes());
                    // there is a rare case that a single batch size is larger than the request size due to
                    // compression; in this case we will still eventually send this batch in a single request
                    break;
                }
There is no such log, which means it never entered this if statement.

Does the way the producer compresses data depend on the broker, e.g. the compression ratio? Why does the client's memory usage increase dramatically after MESSAGE_TOO_LARGE occurs? Could anyone give me some suggestions on how to locate the root cause of this issue? Thanks.

Best Wishes,
Jiamei
