I’m moving this issue from Stack Overflow to the Apache Kafka Users List

http://stackoverflow.com/questions/34213272/kafka-producer-0-9-performance-issue-with-small-messages

There was some discussion on Stack Overflow, but I expect the Apache list will be 
better able to help identify the root cause, since it does not appear to be 
specific to the Bluemix Message Hub service.  Thanks!

-----

We are observing very poor performance with the Kafka 0.9 Java producer client 
when sending small messages.  The messages are not being accumulated into a 
larger request batch, so each small record is being sent in its own request.

What is wrong with our client configuration?  Or is this some other issue?

-----

We are using Kafka client 0.9.0.0.  We did not see any related issues in the 
fixed or unresolved lists for the unreleased 0.9.0.1 or 0.9.1, so we are 
focusing on our client configuration and the server instance.

We understand that linger.ms should cause the client to accumulate records into 
a batch.

We set linger.ms to 10 (and also tried 100 and 1000), but none of these settings 
resulted in records accumulating into a batch.  With a record size of about 100 
bytes and a batch.size of 16K, we would have expected roughly 160 messages to be 
sent in a single request.
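
To spell out the arithmetic behind that expectation (a rough sketch; it assumes 
batch.size caps the bytes accumulated per partition per request):

<pre>
// Back-of-the-envelope batching expectation.
int batchSizeBytes = 16384;    // batch.size (see the expanded config below)
int approxRecordBytes = 100;   // approximate serialized record size (the trace shows lim=110)
System.out.println(batchSizeBytes / approxRecordBytes);  // ~163, i.e. on the order of 160 records per request
</pre>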

The client trace seems to indicate that the partition may be full, even though 
we allocated a fresh Bluemix Message Hub (Kafka 0.9 server) service instance.  
The test client sends messages in a loop with no other I/O.
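
The test loop is essentially the following minimal sketch (simplified; the class 
layout and inlined property loading are illustrative, not our exact ExploreProducer 
code).  We block on each send's Future so the returned offset can be logged, which 
is why the metadata lines in the trace below appear on the main thread:

<pre>
import java.io.FileInputStream;
import java.io.InputStream;
import java.time.OffsetDateTime;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class ExploreProducerSketch {
    public static void main(String[] args) throws Exception {
        // Load the file-based settings, then add linger.ms programmatically.
        Properties props = new Properties();
        try (InputStream in = new FileInputStream("kafka-producer.properties")) {
            props.load(in);
        }
        props.put("linger.ms", "10");  // also tried 100 and 1000

        try (KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props)) {
            for (int i = 1; i <= 100; i++) {
                String value = String.format(
                        "Kafka 0.9 Java Client Record Test Message %05d %s",
                        i, OffsetDateTime.now());
                ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(
                        "mytopic", "records".getBytes(), value.getBytes());
                // Wait for the metadata so the offset can be logged for each record.
                RecordMetadata meta = producer.send(record).get();
                System.out.printf("Send returned metadata: Topic='%s', Partition=%d, Offset=%d%n",
                        meta.topic(), meta.partition(), meta.offset());
            }
        }
    }
}
</pre>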

-----

The log shows a repeating sequence with a suspect line: "Waking up the sender 
since topic mytopic partition 0 is either full or getting a new batch".

The newly allocated partition should be essentially empty in our test case, so 
why would the producer be getting a new batch for every record?

<pre>
2015-12-10 15:14:41,335 3677 [main] TRACE 
com.isllc.client.producer.ExploreProducer  - Sending record: Topic='mytopic', 
Key='records', Value='Kafka 0.9 Java Client Record Test Message 00011 
2015-12-10T15:14:41.335-05:00'  
2015-12-10 15:14:41,336 3678 [main] TRACE 
org.apache.kafka.clients.producer.KafkaProducer  - Sending record 
ProducerRecord(topic=mytopic, partition=null, key=[B@670b40af, 
value=[B@4923ab24 with callback null to topic mytopic partition 0  
2015-12-10 15:14:41,336 3678 [main] TRACE 
org.apache.kafka.clients.producer.internals.RecordAccumulator  - Allocating a 
new 16384 byte message buffer for topic mytopic partition 0  
2015-12-10 15:14:41,336 3678 [main] TRACE 
org.apache.kafka.clients.producer.KafkaProducer  - Waking up the sender since 
topic mytopic partition 0 is either full or getting a new batch  
2015-12-10 15:14:41,348 3690 [kafka-producer-network-thread | ExploreProducer] 
TRACE org.apache.kafka.clients.producer.internals.Sender  - Nodes with data 
ready to send: [Node(0, 
kafka01-prod01.messagehub.services.us-south.bluemix.net, 9094)]  
2015-12-10 15:14:41,348 3690 [kafka-producer-network-thread | ExploreProducer] 
TRACE org.apache.kafka.clients.producer.internals.Sender  - Created 1 produce 
requests: [ClientRequest(expectResponse=true, 
callback=org.apache.kafka.clients.producer.internals.Sender$1@6d62e963, 
request=RequestSend(header={api_key=0,api_version=1,correlation_id=11,client_id=ExploreProducer},
 
body={acks=-1,timeout=30000,topic_data=[{topic=mytopic,data=[{partition=0,record_set=java.nio.HeapByteBuffer[pos=0
 lim=110 cap=16384]}]}]}), createdTimeMs=1449778481348, sendTimeMs=0)]  
2015-12-10 15:14:41,412 3754 [kafka-producer-network-thread | ExploreProducer] 
TRACE org.apache.kafka.clients.producer.internals.Sender  - Received produce 
response from node 0 with correlation id 11  
2015-12-10 15:14:41,412 3754 [kafka-producer-network-thread | ExploreProducer] 
TRACE org.apache.kafka.clients.producer.internals.RecordBatch  - Produced 
messages to topic-partition mytopic-0 with base offset offset 130 and error: 
null.  
2015-12-10 15:14:41,412 3754 [main] TRACE 
com.isllc.client.producer.ExploreProducer  - Send returned metadata: 
Topic='mytopic', Partition=0, Offset=130  
2015-12-10 15:14:41,412 3754 [main] TRACE 
com.isllc.client.producer.ExploreProducer  - Sending record: Topic='mytopic', 
Key='records', Value='Kafka 0.9 Java Client Record Test Message 00012 
2015-12-10T15:14:41.412-05:00'

</pre>

Log entries repeat like the above for each record sent.

------

We provided the following properties file:

<pre>
2015-12-10 15:14:37,843 185  [main] INFO  com.isllc.client.AbstractClient  - 
Properties retrieved from file for Kafka client: kafka-producer.properties
2015-12-10 15:14:37,909 251  [main] INFO  com.isllc.client.AbstractClient  -    
 acks=-1
2015-12-10 15:14:37,909 251  [main] INFO  com.isllc.client.AbstractClient  -    
 ssl.protocol=TLSv1.2
2015-12-10 15:14:37,909 251  [main] INFO  com.isllc.client.AbstractClient  -    
 key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
2015-12-10 15:14:37,910 252  [main] INFO  com.isllc.client.AbstractClient  -    
 client.id=ExploreProducer
2015-12-10 15:14:37,910 252  [main] INFO  com.isllc.client.AbstractClient  -    
 ssl.truststore.identification.algorithm=HTTPS
2015-12-10 15:14:37,910 252  [main] INFO  com.isllc.client.AbstractClient  -    
 value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
2015-12-10 15:14:37,910 252  [main] INFO  com.isllc.client.AbstractClient  -    
 ssl.truststore.password=changeit
2015-12-10 15:14:37,910 252  [main] INFO  com.isllc.client.AbstractClient  -    
 ssl.truststore.type=JKS
2015-12-10 15:14:37,910 252  [main] INFO  com.isllc.client.AbstractClient  -    
 ssl.enabled.protocols=TLSv1.2
2015-12-10 15:14:37,910 252  [main] INFO  com.isllc.client.AbstractClient  -    
 
ssl.truststore.location=/Library/Java/JavaVirtualMachines/jdk1.8.0_51.jdk/Contents/Home/jre/lib/security/cacerts
2015-12-10 15:14:37,910 252  [main] INFO  com.isllc.client.AbstractClient  -    
 
bootstrap.servers=kafka01-prod01.messagehub.services.us-south.bluemix.net:9094,kafka02-prod01.messagehub.services.us-south.bluemix.net:9094,kafka03-prod01.messagehub.services.us-south.bluemix.net:9094,kafka04-prod01.messagehub.services.us-south.bluemix.net:9094,kafka05-prod01.messagehub.services.us-south.bluemix.net:9094
2015-12-10 15:14:37,910 252  [main] INFO  com.isllc.client.AbstractClient  -    
 security.protocol=SASL_SSL

</pre>

In addition, we set linger.ms=10 programmatically in code, as in the sketch above.

-----

The Kafka client logs the expanded/merged configuration (note the linger.ms 
setting near the end):

<pre>
2015-12-10 15:14:37,970 312  [main] INFO  
org.apache.kafka.clients.producer.ProducerConfig  - ProducerConfig values: 
        compression.type = none
        metric.reporters = []
        metadata.max.age.ms = 300000
        metadata.fetch.timeout.ms = 60000
        reconnect.backoff.ms = 50
        sasl.kerberos.ticket.renew.window.factor = 0.8
        bootstrap.servers = 
[kafka01-prod01.messagehub.services.us-south.bluemix.net:9094, 
kafka02-prod01.messagehub.services.us-south.bluemix.net:9094, 
kafka03-prod01.messagehub.services.us-south.bluemix.net:9094, 
kafka04-prod01.messagehub.services.us-south.bluemix.net:9094, 
kafka05-prod01.messagehub.services.us-south.bluemix.net:9094]
        retry.backoff.ms = 100
        sasl.kerberos.kinit.cmd = /usr/bin/kinit
        buffer.memory = 33554432
        timeout.ms = 30000
        key.serializer = class 
org.apache.kafka.common.serialization.ByteArraySerializer
        sasl.kerberos.service.name = null
        sasl.kerberos.ticket.renew.jitter = 0.05
        ssl.keystore.type = JKS
        ssl.trustmanager.algorithm = PKIX
        block.on.buffer.full = false
        ssl.key.password = null
        max.block.ms = 60000
        sasl.kerberos.min.time.before.relogin = 60000
        connections.max.idle.ms = 540000
        ssl.truststore.password = [hidden]
        max.in.flight.requests.per.connection = 5
        metrics.num.samples = 2
        client.id = ExploreProducer
        ssl.endpoint.identification.algorithm = null
        ssl.protocol = TLSv1.2
        request.timeout.ms = 30000
        ssl.provider = null
        ssl.enabled.protocols = [TLSv1.2]
        acks = -1
        batch.size = 16384
        ssl.keystore.location = null
        receive.buffer.bytes = 32768
        ssl.cipher.suites = null
        ssl.truststore.type = JKS
        security.protocol = SASL_SSL
        retries = 0
        max.request.size = 1048576
        value.serializer = class 
org.apache.kafka.common.serialization.ByteArraySerializer
        ssl.truststore.location = 
/Library/Java/JavaVirtualMachines/jdk1.8.0_51.jdk/Contents/Home/jre/lib/security/cacerts
        ssl.keystore.password = null
        ssl.keymanager.algorithm = SunX509
        metrics.sample.window.ms = 30000
        partitioner.class = class 
org.apache.kafka.clients.producer.internals.DefaultPartitioner
        send.buffer.bytes = 131072
        linger.ms = 10
</pre>
-----

The Kafka metrics after sending 100 records:  

<pre>
Duration for 100 sends 8787 ms. Sent 7687 bytes.  
    batch-size-avg = 109.87 [The average number of bytes sent per partition 
per-request.]  
    batch-size-max = 110.0 [The max number of bytes sent per partition 
per-request.]  
    buffer-available-bytes = 3.3554432E7 [The total amount of buffer memory 
that is not being used (either unallocated or in the free list).]  
    buffer-exhausted-rate = 0.0 [The average per-second number of record sends 
that are dropped due to buffer exhaustion]  
    buffer-total-bytes = 3.3554432E7 [The maximum amount of buffer memory the 
client can use (whether or not it is currently used).]  
    bufferpool-wait-ratio = 0.0 [The fraction of time an appender waits for 
space allocation.]  
    byte-rate = 291.8348916277093 []  
    compression-rate = 0.0 []  
    compression-rate-avg = 0.0 [The average compression rate of record 
batches.]  
    connection-close-rate = 0.0 [Connections closed per second in the window.]  
    connection-count = 2.0 [The current number of active connections.]  
    connection-creation-rate = 0.05180541884681138 [New connections established 
per second in the window.]  
    incoming-byte-rate = 10.342564641029007 []  
    io-ratio = 0.0038877559207471236 [The fraction of time the I/O thread spent 
doing I/O]  
    io-time-ns-avg = 353749.2840375587 [The average length of time for I/O per 
select call in nanoseconds.]  
    io-wait-ratio = 0.21531227995769162 [The fraction of time the I/O thread 
spent waiting.]  
    io-wait-time-ns-avg = 1.9591901192488264E7 [The average length of time the 
I/O thread spent waiting for a socket ready for reads or writes in nanoseconds.]
    metadata-age = 8.096 [The age in seconds of the current producer metadata 
being used.]  
    network-io-rate = 5.2937784999213795 [The average number of network 
operations (reads or writes) on all connections per second.]  
    outgoing-byte-rate = 451.2298783403283 []  
    produce-throttle-time-avg = 0.0 [The average throttle time in ms]  
    produce-throttle-time-max = 0.0 [The maximum throttle time in ms]  
    record-error-rate = 0.0 [The average per-second number of record sends that 
resulted in errors]  
    record-queue-time-avg = 15.5 [The average time in ms record batches spent 
in the record accumulator.]  
    record-queue-time-max = 434.0 [The maximum time in ms record batches spent 
in the record accumulator.]  
    record-retry-rate = 0.0 []  
    record-send-rate = 2.65611304417116 [The average number of records sent per 
second.]  
    record-size-avg = 97.87 [The average record size]  
    record-size-max = 98.0 [The maximum record size]  
    records-per-request-avg = 1.0 [The average number of records per request.]  
    request-latency-avg = 0.0 [The average request latency in ms]  
    request-latency-max = 74.0 []  
    request-rate = 2.6468892499606897 [The average number of requests sent per 
second.]  
    request-size-avg = 42.0 [The average size of all requests in the window..]  
    request-size-max = 170.0 [The maximum size of any request sent in the 
window.]  
    requests-in-flight = 0.0 [The current number of in-flight requests awaiting 
a response.]  
    response-rate = 2.651196976060479 [The average number of responses received 
per second.]  
    select-rate = 10.989861465830819 [Number of times the I/O layer checked for 
new I/O to perform per second]  
    waiting-threads = 0.0 [The number of user threads blocked waiting for 
buffer memory to enqueue their records]  
</pre>
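
Note records-per-request-avg = 1.0 and batch-size-avg ≈ 110, i.e. one record per 
request.  For reference, the summary above is collected roughly as follows (a 
sketch; the sorting and formatting are our own helper, while producer.metrics() 
and Metric.value() are the 0.9 client API):

<pre>
import java.util.Map;
import java.util.TreeMap;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;

public final class MetricsDump {
    // Sketch: print the producer metrics, sorted by name, as "name = value [description]".
    public static void dump(Producer<byte[], byte[]> producer) {
        Map<String, String> sorted = new TreeMap<>();
        for (Map.Entry<MetricName, ? extends Metric> e : producer.metrics().entrySet()) {
            sorted.put(e.getKey().name(),
                    e.getValue().value() + " [" + e.getKey().description() + "]");
        }
        for (Map.Entry<String, String> line : sorted.entrySet()) {
            System.out.println("    " + line.getKey() + " = " + line.getValue());
        }
    }
}
</pre>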

Thanks


Gary Gershon
Principal, Intermedia Sciences LLC
(908) 969-1119
g...@intermediasciences.com



