Stanislav Kozlovski created KAFKA-7514: ------------------------------------------
Summary: Trogdor - Support Multiple Threads in ConsumeBenchWorker
Key: KAFKA-7514
URL: https://issues.apache.org/jira/browse/KAFKA-7514
Project: Kafka
Issue Type: Improvement
Reporter: Stanislav Kozlovski
Assignee: Stanislav Kozlovski

Trogdor's ConsumeBenchWorker currently uses only two threads - one for the StatusUpdater:
{code:java}
this.statusUpdaterFuture = executor.scheduleAtFixedRate(
    new StatusUpdater(latencyHistogram, messageSizeHistogram), 1, 1, TimeUnit.MINUTES);
{code}
and one for the consumer task itself:
{code:java}
executor.submit(new ConsumeMessages(partitions));
{code}
A sample ConsumeBenchSpec specification in JSON looks like this:
{code:java}
{
  "class": "org.apache.kafka.trogdor.workload.ConsumeBenchSpec",
  "durationMs": 10000000,
  "consumerNode": "node0",
  "bootstrapServers": "localhost:9092",
  "maxMessages": 100,
  "activeTopics": {
    "foo[1-3]": {
      "numPartitions": 3,
      "replicationFactor": 1
    }
  }
}
{code}
h2. Motivation
This does not make the best use of machines with multiple cores. It would be useful if there were a way to configure the ConsumeBenchSpec to use multiple threads and spawn multiple consumers. This would also allow the ConsumeBenchWorker to achieve higher throughput through parallelism.

h2. Proposal:
Add a new `consumerThreads` property to the ConsumeBenchSpec, allowing you to run multiple consumers in parallel.

h3. Changes
By default, `consumerThreads` will have a value of 1. `activeTopics` will still be defined in the same way. They will be evenly assigned to the consumers in a round-robin fashion.
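The round-robin assignment described above can be sketched roughly as follows. This is only an illustration and not the actual Trogdor implementation: the `RoundRobinSketch` class and its `assign` method are hypothetical names, and the sketch assumes the `activeTopics` pattern has already been expanded into a flat list of names.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Trogdor: illustrates round-robin assignment only.
class RoundRobinSketch {

    // Item i is handed to consumer thread (i % consumerThreads).
    static List<List<String>> assign(List<String> topics, int consumerThreads) {
        if (consumerThreads < 1) {
            throw new IllegalArgumentException("consumerThreads must be at least 1");
        }
        // One (initially empty) assignment list per consumer thread.
        List<List<String>> assignments = new ArrayList<>();
        for (int i = 0; i < consumerThreads; i++) {
            assignments.add(new ArrayList<>());
        }
        // Deal the names out in order, wrapping around after the last thread.
        for (int i = 0; i < topics.size(); i++) {
            assignments.get(i % consumerThreads).add(topics.get(i));
        }
        return assignments;
    }

    public static void main(String[] args) {
        List<List<String>> result = assign(List.of("foo1", "foo2", "foo3", "foo4"), 2);
        for (int i = 0; i < result.size(); i++) {
            System.out.println("consumer thread " + (i + 1) + " -> " + result.get(i));
        }
    }
}
```

With `consumerThreads` left at 1, the same sketch hands every name to the single consumer, which is the default behaviour proposed here.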
For example, if we have this configuration:
{code:java}
{
  "class": "org.apache.kafka.trogdor.workload.ConsumeBenchSpec",
  "durationMs": 10000000,
  "consumerNode": "node0",
  "bootstrapServers": "localhost:9092",
  "maxMessages": 100,
  "consumerThreads": 2,
  "activeTopics": {
    "foo[1-4]": {
      "numPartitions": 4,
      "replicationFactor": 1
    }
  }
}
{code}
consumer thread 1 will be assigned partitions [foo1, foo3]
consumer thread 2 will be assigned partitions [foo2, foo4]
and the ConsumeBenchWorker will spawn 4 threads in total in the executor (2 for every consumer).

h3. Status
The way the worker's status is reported will change as well. A ConsumeBenchWorker shows the following status when queried with `./bin/trogdor.sh client --show-tasks localhost:8889`:
{code:java}
"tasks" : {
  "consume_bench_19938" : {
    "state" : "DONE",
    "spec" : {
      "class" : "org.apache.kafka.trogdor.workload.ConsumeBenchSpec",
      ...
    "status" : {
      "totalMessagesReceived" : 190,
      "totalBytesReceived" : 98040,
      "averageMessageSizeBytes" : 516,
      "averageLatencyMs" : 449.0,
      "p50LatencyMs" : 449,
      "p95LatencyMs" : 449,
      "p99LatencyMs" : 449
    }
  },
{code}
We will change it to show the status of every separate consumer and the topic partitions it was assigned to:
{code:java}
"tasks" : {
  "consume_bench_19938" : {
    "state" : "DONE",
    "spec" : {
      "class" : "org.apache.kafka.trogdor.workload.ConsumeBenchSpec",
      ...
    "status" : {
      "consumer-1": {
        "assignedPartitions": ["foo1", "foo3"],
        "totalMessagesReceived" : 190,
        "totalBytesReceived" : 98040,
        "averageMessageSizeBytes" : 516,
        "averageLatencyMs" : 449.0,
        "p50LatencyMs" : 449,
        "p95LatencyMs" : 449,
        "p99LatencyMs" : 449
      },
      "consumer-2": {
        "assignedPartitions": ["foo2", "foo4"],
        "totalMessagesReceived" : 190,
        "totalBytesReceived" : 98040,
        "averageMessageSizeBytes" : 516,
        "averageLatencyMs" : 449.0,
        "p50LatencyMs" : 449,
        "p95LatencyMs" : 449,
        "p99LatencyMs" : 449
      }
    }
  },
{code}
h2. Backwards Compatibility:
This change should be mostly backwards-compatible.
If `consumerThreads` is not passed, only one consumer will be created and the round-robin assignor will assign every partition to it. The only change will be in the format of the reported status. Even with one consumer, we will still show a status similar to:
{code:java}
"status" : {
  "consumer-1": {
    "assignedPartitions": ["foo1", "foo3"],
    "totalMessagesReceived" : 190,
    "totalBytesReceived" : 98040,
    "averageMessageSizeBytes" : 516,
    "averageLatencyMs" : 449.0,
    "p50LatencyMs" : 449,
    "p95LatencyMs" : 449,
    "p99LatencyMs" : 449
  }
}
{code}