[ https://issues.apache.org/jira/browse/KAFKA-7514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16670889#comment-16670889 ]
ASF GitHub Bot commented on KAFKA-7514:
---------------------------------------

stanislavkozlovski opened a new pull request #5863: [KAFKA-7514] [WIP] consumer bench multiple threads
URL: https://github.com/apache/kafka/pull/5863

This PR adds a new ConsumeBenchSpec field - "consumerCount". "consumerCount" consumers will be spawned over "consumerCount" threads in the ConsumeBenchWorker.

It's now questionable how existing fields such as "targetMessagesPerSec", "maxMessages", "consumerGroup" and "activeTopics" should work.
With "activeTopics", we need to decide whether they should be split over the consumers or not. I see 4 cases which I believe we should address like this:

Random group, subscribe to topics - N unique groups all subscribed to all topics
Specific group, subscribe to topics - 1 group subscribed to all topics. Consumers share workload.
Random groups, assign partitions - X groups all subscribed to all partitions
Specific group, assign partitions - 1 group, with each consumer assigned part of the partitions (split in a round-robin style)

I believe "targetMessagesPerSec" and "maxMessages" should apply to each consumer individually. This would ease implementation too.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

> Trogdor - Support Multiple Threads in ConsumeBenchWorker
> --------------------------------------------------------
>
> Key: KAFKA-7514
> URL: https://issues.apache.org/jira/browse/KAFKA-7514
> Project: Kafka
> Issue Type: Improvement
> Reporter: Stanislav Kozlovski
> Assignee: Stanislav Kozlovski
> Priority: Minor
>
> Trogdor's ConsumeBenchWorker currently uses only two threads - one for the StatusUpdater:
> {code:java}
> this.statusUpdaterFuture = executor.scheduleAtFixedRate(
>     new StatusUpdater(latencyHistogram, messageSizeHistogram), 1, 1, TimeUnit.MINUTES);
> {code}
> and one for the consumer task itself:
> {code:java}
> executor.submit(new ConsumeMessages(partitions));
> {code}
> A sample ConsumeBenchSpec specification in JSON looks like this:
> {code:java}
> {
>   "class": "org.apache.kafka.trogdor.workload.ConsumeBenchSpec",
>   "durationMs": 10000000,
>   "consumerNode": "node0",
>   "bootstrapServers": "localhost:9092",
>   "maxMessages": 100,
>   "activeTopics": {
>     "foo[1-3]": {
>       "numPartitions": 3,
>       "replicationFactor": 1
>     }
>   }
> }
> {code}
>
>
> h2. Motivation
> This does not make the best use of machines with multiple cores. It would be useful if there was a way to configure the ConsumeBenchSpec to use multiple threads and spawn multiple consumers. This would also allow the ConsumeBenchWorker to reach higher throughput by consuming in parallel.
>
> h2. Proposal
> Add a new `consumerCount` property to the ConsumeBenchSpec, allowing you to run multiple consumers in parallel.
> h2. Changes
> By default, `consumerCount` will have a value of 1.
> `activeTopics` will still be defined in the same way. They will be evenly assigned to the consumers in a round-robin fashion.
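A minimal sketch of the round-robin split described above, assuming the items dealt out are the expanded topic names and that consumers are numbered in order. `RoundRobinSketch` and `assign` are hypothetical names used only for illustration; they are not Trogdor code, and the PR may implement the split differently.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class RoundRobinSketch {
    // Hypothetical helper (not part of Trogdor): deals the items out to
    // consumerCount buckets in turn, so item i goes to bucket i % consumerCount.
    static List<List<String>> assign(List<String> items, int consumerCount) {
        if (consumerCount < 1) {
            throw new IllegalArgumentException("consumerCount must be at least 1");
        }
        List<List<String>> buckets = new ArrayList<>();
        for (int i = 0; i < consumerCount; i++) {
            buckets.add(new ArrayList<>());
        }
        for (int i = 0; i < items.size(); i++) {
            buckets.get(i % consumerCount).add(items.get(i));
        }
        return buckets;
    }

    public static void main(String[] args) {
        List<String> topics = Arrays.asList("foo1", "foo2", "foo3", "foo4");
        // With two consumers this prints [[foo1, foo3], [foo2, foo4]]
        System.out.println(assign(topics, 2));
    }
}
```

With `consumerCount` set to 1, the same helper puts every item into a single bucket, which matches the default behaviour described for a spec that omits the field.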
> For example, if we have this configuration:
> {code:java}
> {
>   "class": "org.apache.kafka.trogdor.workload.ConsumeBenchSpec",
>   "durationMs": 10000000,
>   "consumerNode": "node0",
>   "bootstrapServers": "localhost:9092",
>   "maxMessages": 100,
>   "consumerCount": 2,
>   "activeTopics": {
>     "foo[1-4]": {
>       "numPartitions": 4,
>       "replicationFactor": 1
>     }
>   }
> }{code}
> consumer 1 will be assigned partitions [foo1, foo3]
> consumer 2 will be assigned partitions [foo2, foo4]
> and the ConsumeBenchWorker will spawn 4 threads in total in the executor (2 for every consumer).
>
> The `maxMessages` and `targetMessagesPerSec` will be counted independently for every consumer.
> h3. Status
> The way the worker's status is reported will change as well.
> A ConsumeBenchWorker shows the following status when queried with `./bin/trogdor.sh client --show-tasks localhost:8889`:
>
> {code:java}
> "tasks" : { "consume_bench_19938" : { "state" : "DONE", "spec" : { "class" : "org.apache.kafka.trogdor.workload.ConsumeBenchSpec",
> ...
> "status" : { "totalMessagesReceived" : 190, "totalBytesReceived" : 98040, "averageMessageSizeBytes" : 516, "averageLatencyMs" : 449.0, "p50LatencyMs" : 449, "p95LatencyMs" : 449, "p99LatencyMs" : 449 } },{code}
> We will change it to show the status of every separate consumer and the topic partitions it was assigned to:
> {code:java}
> "tasks" : {
>   "consume_bench_19938" :
>   {
>     "state" : "DONE",
>     "spec" : { "class" : "org.apache.kafka.trogdor.workload.ConsumeBenchSpec",
>     ... }
>     ...
>     "status":{
>       "consumer-1":{
>         "assignedPartitions":[
>           "foo1",
>           "foo3"
>         ],
>         "totalMessagesReceived":190,
>         "totalBytesReceived":98040,
>         "averageMessageSizeBytes":516,
>         "averageLatencyMs":449.0,
>         "p50LatencyMs":449,
>         "p95LatencyMs":449,
>         "p99LatencyMs":449
>       },
>       "consumer-2":{
>         "assignedPartitions":[
>           "foo2",
>           "foo4"
>         ],
>         "totalMessagesReceived":190,
>         "totalBytesReceived":98040,
>         "averageMessageSizeBytes":516,
>         "averageLatencyMs":449.0,
>         "p50LatencyMs":449,
>         "p95LatencyMs":449,
>         "p99LatencyMs":449
>       }
>     }
>   },{code}
>
>
> h2. Backwards Compatibility:
> This change should be mostly backwards-compatible. If `consumerCount` is not passed, only one consumer will be created and the round-robin assignor will assign every partition to it.
> The only change will be in the format of the reported status. Even with one consumer, we will still show a status similar to
> {code:java}
> "status":{
>   "consumer-1":{
>     "assignedPartitions":[
>       "foo1",
>       "foo3"
>     ],
>     "totalMessagesReceived":190,
>     "totalBytesReceived":98040,
>     "averageMessageSizeBytes":516,
>     "averageLatencyMs":449.0,
>     "p50LatencyMs":449,
>     "p95LatencyMs":449,
>     "p99LatencyMs":449
>   }
> }
> {code}

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)