[
https://issues.apache.org/jira/browse/KAFKA-7514?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Stanislav Kozlovski resolved KAFKA-7514.
----------------------------------------
Resolution: Fixed
> Trogdor - Support Multiple Threads in ConsumeBenchWorker
> --------------------------------------------------------
>
> Key: KAFKA-7514
> URL: https://issues.apache.org/jira/browse/KAFKA-7514
> Project: Kafka
> Issue Type: Improvement
> Reporter: Stanislav Kozlovski
> Assignee: Stanislav Kozlovski
> Priority: Minor
>
> Trogdor's ConsumeBenchWorker currently uses only two threads - one for the
> StatusUpdater:
> {code:java}
> this.statusUpdaterFuture = executor.scheduleAtFixedRate(
>     new StatusUpdater(latencyHistogram, messageSizeHistogram), 1, 1,
>     TimeUnit.MINUTES);
> {code}
> and one for the consumer task itself:
> {code:java}
> executor.submit(new ConsumeMessages(partitions));
> {code}
> A sample ConsumeBenchSpec specification in JSON looks like this:
> {code:java}
> {
>   "class": "org.apache.kafka.trogdor.workload.ConsumeBenchSpec",
>   "durationMs": 10000000,
>   "consumerNode": "node0",
>   "bootstrapServers": "localhost:9092",
>   "maxMessages": 100,
>   "activeTopics": {
>     "foo[1-3]": {
>       "numPartitions": 3,
>       "replicationFactor": 1
>     }
>   }
> }
> {code}
>
>
> h2. Motivation
> This does not make the best use of machines with multiple cores. It would be
> useful to be able to configure the ConsumeBenchSpec to use multiple
> threads and spawn multiple consumers. The added parallelism would also allow
> the ConsumeBenchWorker to sustain higher throughput.
>
> h2. Proposal
> Add a new `consumerCount` property to the ConsumeBenchSpec allowing you to
> run multiple consumers in parallel.
> h2. Changes
> By default, `consumerCount` will have a value of 1.
> `activeTopics` will still be defined in the same way. The topics will be
> evenly assigned to the consumers in a round-robin fashion.
> For example, with this configuration:
> {code:java}
> {
>   "class": "org.apache.kafka.trogdor.workload.ConsumeBenchSpec",
>   "durationMs": 10000000,
>   "consumerNode": "node0",
>   "bootstrapServers": "localhost:9092",
>   "maxMessages": 100,
>   "consumerCount": 2,
>   "activeTopics": {
>     "foo[1-4]": {
>       "numPartitions": 4,
>       "replicationFactor": 1
>     }
>   }
> }{code}
> consumer 1 will be assigned partitions [foo1, foo3]
> consumer 2 will be assigned partitions [foo2, foo4]
> and the ConsumeBenchWorker will spawn 4 threads in total in the executor (2
> for every consumer).
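
A minimal sketch of the round-robin assignment described above, assuming the topic names have already been expanded from the `foo[1-4]` range. The class and method names (`RoundRobinAssignment`, `assign`) are hypothetical and not part of Trogdor; the actual worker may distribute topics or partitions differently.

```java
import java.util.ArrayList;
import java.util.List;

public class RoundRobinAssignment {
    // Hypothetical helper: hands out topics to consumerCount consumers,
    // one at a time, wrapping around after the last consumer.
    static List<List<String>> assign(List<String> topics, int consumerCount) {
        if (consumerCount < 1) {
            throw new IllegalArgumentException("consumerCount must be at least 1");
        }
        List<List<String>> assignments = new ArrayList<>();
        for (int i = 0; i < consumerCount; i++) {
            assignments.add(new ArrayList<>());
        }
        for (int i = 0; i < topics.size(); i++) {
            assignments.get(i % consumerCount).add(topics.get(i));
        }
        return assignments;
    }

    public static void main(String[] args) {
        List<String> topics = List.of("foo1", "foo2", "foo3", "foo4");
        // prints [[foo1, foo3], [foo2, foo4]]
        System.out.println(assign(topics, 2));
    }
}
```

With `consumerCount` set to 1, the same loop places every topic in the single consumer's list.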
>
> The `maxMessages` and `targetMessagesPerSec` limits will be counted
> independently for every consumer.
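
To illustrate what counting `maxMessages` independently could look like, here is a rough sketch in which each task tracks its own total. It does not use the Kafka client: `runConsumer` is a hypothetical stand-in that only counts, and the sketch uses one thread per consumer rather than the two per consumer mentioned above. All names here are assumptions for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class IndependentLimits {
    // Hypothetical stand-in for one consumer task. A real task would poll
    // Kafka; this one only counts up to its own maxMessages limit.
    static int runConsumer(int maxMessages) {
        int received = 0;
        while (received < maxMessages) {
            received++;
        }
        return received;
    }

    // Runs consumerCount tasks in parallel and returns each task's own total.
    static List<Integer> runAll(int consumerCount, int maxMessages) {
        ExecutorService executor = Executors.newFixedThreadPool(consumerCount);
        try {
            List<Future<Integer>> futures = new ArrayList<>();
            for (int i = 0; i < consumerCount; i++) {
                Callable<Integer> task = () -> runConsumer(maxMessages);
                futures.add(executor.submit(task));
            }
            List<Integer> totals = new ArrayList<>();
            for (Future<Integer> future : futures) {
                totals.add(future.get());
            }
            return totals;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException("consumer task failed", e);
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        // Two consumers, each with its own limit of 100 messages.
        System.out.println(runAll(2, 100)); // prints [100, 100]
    }
}
```

Because no counter is shared between tasks, two consumers with `maxMessages` of 100 would receive up to 200 messages in total.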
> h3. Status
> The way the worker's status is reported will be updated as well.
> A ConsumeBenchWorker shows the following status when queried with
> `./bin/trogdor.sh client --show-tasks localhost:8889`
>
> {code:java}
> "tasks" : { "consume_bench_19938" : { "state" : "DONE",
> "spec" : { "class" : "org.apache.kafka.trogdor.workload.ConsumeBenchSpec",
> ...
> "status" : { "totalMessagesReceived" : 190, "totalBytesReceived" : 98040,
> "averageMessageSizeBytes" : 516, "averageLatencyMs" : 449.0,
> "p50LatencyMs" : 449, "p95LatencyMs" : 449, "p99LatencyMs" : 449 } },{code}
> We will change it to show the status of every separate consumer and the topic
> partitions it was assigned to:
> {code:java}
> "tasks" : {
>   "consume_bench_19938" :
>   {
>     "state" : "DONE",
>     "spec" : { "class" : "org.apache.kafka.trogdor.workload.ConsumeBenchSpec",
>     ... }
>     ...
>     "status":{
>       "consumer-1":{
>         "assignedPartitions":[
>           "foo1",
>           "foo3"
>         ],
>         "totalMessagesReceived":190,
>         "totalBytesReceived":98040,
>         "averageMessageSizeBytes":516,
>         "averageLatencyMs":449.0,
>         "p50LatencyMs":449,
>         "p95LatencyMs":449,
>         "p99LatencyMs":449
>       },
>       "consumer-2":{
>         "assignedPartitions":[
>           "foo2",
>           "foo4"
>         ],
>         "totalMessagesReceived":190,
>         "totalBytesReceived":98040,
>         "averageMessageSizeBytes":516,
>         "averageLatencyMs":449.0,
>         "p50LatencyMs":449,
>         "p95LatencyMs":449,
>         "p99LatencyMs":449
>       }
>     }
>   },{code}
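
One possible way to build the per-consumer status shown above is a map keyed by `consumer-N`. This is only a sketch: `PerConsumerStatus` and `buildStatus` are hypothetical names, only two of the status fields are included, and the real worker may model its status differently.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class PerConsumerStatus {
    // Hypothetical helper: builds one status entry per consumer, keyed
    // "consumer-1", "consumer-2", ... in assignment order.
    static Map<String, Map<String, Object>> buildStatus(
            List<List<String>> assignments, List<Long> messagesReceived) {
        Map<String, Map<String, Object>> status = new LinkedHashMap<>();
        for (int i = 0; i < assignments.size(); i++) {
            Map<String, Object> entry = new LinkedHashMap<>();
            entry.put("assignedPartitions", assignments.get(i));
            entry.put("totalMessagesReceived", messagesReceived.get(i));
            status.put("consumer-" + (i + 1), entry);
        }
        return status;
    }

    public static void main(String[] args) {
        Map<String, Map<String, Object>> status = buildStatus(
                List.of(List.of("foo1", "foo3"), List.of("foo2", "foo4")),
                List.of(190L, 190L));
        System.out.println(status);
    }
}
```

The same structure covers the single-consumer case, where the map holds only a `consumer-1` entry.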
>
>
> h2. Backwards Compatibility
> This change should be mostly backwards-compatible. If `consumerCount` is
> not passed, only one consumer will be created and the round-robin
> assignor will assign every partition to it.
> The only change will be in the format of the reported status. Even with one
> consumer, we will still show a status similar to:
> {code:java}
> "status":{
>   "consumer-1":{
>     "assignedPartitions":[
>       "foo1",
>       "foo3"
>     ],
>     "totalMessagesReceived":190,
>     "totalBytesReceived":98040,
>     "averageMessageSizeBytes":516,
>     "averageLatencyMs":449.0,
>     "p50LatencyMs":449,
>     "p95LatencyMs":449,
>     "p99LatencyMs":449
>   }
> }
> {code}
>