[ 
https://issues.apache.org/jira/browse/KAFKA-7514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16670891#comment-16670891
 ] 

ASF GitHub Bot commented on KAFKA-7514:
---------------------------------------

stanislavkozlovski opened a new pull request #5864: KAFKA-7514: Add threads to 
ConsumeBenchWorker
URL: https://github.com/apache/kafka/pull/5864
 
 
   This PR adds a new ConsumeBenchSpec field - "consumerCount". The 
ConsumeBenchWorker will spawn "consumerCount" consumers, each on its own thread.
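
As a rough illustration of one consumer per thread, here is a minimal sketch. It is not the code in this PR: `ConsumerThreadsSketch`, `consumeTask` and `runConsumers` are hypothetical names, and the loop body stands in for a real poll loop. It also assumes "maxMessages" is counted per consumer.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ConsumerThreadsSketch {

    /** Hypothetical stand-in for one consumer's poll loop; not Trogdor's ConsumeMessages. */
    static Callable<Long> consumeTask(long maxMessages) {
        return () -> {
            long received = 0;
            while (received < maxMessages) {
                received++; // a real task would poll a KafkaConsumer here
            }
            return received;
        };
    }

    /** Runs consumerCount tasks, one per thread, and returns each consumer's message count. */
    static List<Long> runConsumers(int consumerCount, long maxMessages) throws Exception {
        ExecutorService executor = Executors.newFixedThreadPool(consumerCount);
        try {
            List<Future<Long>> futures = new ArrayList<>();
            for (int i = 0; i < consumerCount; i++) {
                futures.add(executor.submit(consumeTask(maxMessages)));
            }
            List<Long> counts = new ArrayList<>();
            for (Future<Long> future : futures) {
                counts.add(future.get()); // blocks until that consumer finishes
            }
            return counts;
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(runConsumers(2, 100));
    }
}
```

With a per-consumer limit, `runConsumers(2, 100)` yields a count of 100 for each of the two consumers rather than 100 in total.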
   
   This raises the question of how existing fields such as "targetMessagesPerSec", 
"maxMessages", "consumerGroup" and "activeTopics" should behave.
   
   With "activeTopics", we need to decide whether they should be split over the 
consumers or not.
   I see 4 cases, which I believe we should address like this:
   
   Random group, subscribe to topics - N unique groups, each subscribed to all 
topics
   Specific group, subscribe to topics - 1 group subscribed to all topics. 
Consumers share the workload.
   Random group, assign partitions - N unique groups, each assigned all 
partitions
   Specific group, assign partitions - 1 group, with each consumer assigned a 
subset of the partitions (split in a round-robin style)
   I believe "targetMessagesPerSec" and "maxMessages" should apply to each 
consumer individually. This would also make the implementation much simpler.
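
To make the group and partition handling concrete, here is a small sketch under stated assumptions. `ConsumerGroupingSketch`, `groupIds` and `roundRobin` are hypothetical helpers, not part of Trogdor, and the "consume-bench-" prefix for random group ids is made up for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.UUID;

public class ConsumerGroupingSketch {

    /** One group id per consumer: the configured id if present, otherwise a fresh random id each. */
    static List<String> groupIds(String configuredGroup, int consumerCount) {
        List<String> ids = new ArrayList<>();
        for (int i = 0; i < consumerCount; i++) {
            // Assumed naming scheme for random groups; the real prefix may differ.
            ids.add(configuredGroup != null ? configuredGroup : "consume-bench-" + UUID.randomUUID());
        }
        return ids;
    }

    /** Splits the entries across consumers round-robin (the "specific group, assign partitions" case). */
    static List<List<String>> roundRobin(List<String> partitions, int consumerCount) {
        List<List<String>> assignment = new ArrayList<>();
        for (int i = 0; i < consumerCount; i++) {
            assignment.add(new ArrayList<>());
        }
        for (int i = 0; i < partitions.size(); i++) {
            assignment.get(i % consumerCount).add(partitions.get(i));
        }
        return assignment;
    }

    public static void main(String[] args) {
        System.out.println(roundRobin(Arrays.asList("foo1", "foo2", "foo3", "foo4"), 2));
        System.out.println(groupIds("my-group", 2));
    }
}
```

In the random-group cases every consumer would receive the full topic or partition list, so `roundRobin` only matters when a specific group is combined with manual assignment.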
   
   I haven't written tests yet since I want to flesh out the design first. Any 
feedback is appreciated.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Trogdor - Support Multiple Threads in ConsumeBenchWorker
> --------------------------------------------------------
>
>                 Key: KAFKA-7514
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7514
>             Project: Kafka
>          Issue Type: Improvement
>            Reporter: Stanislav Kozlovski
>            Assignee: Stanislav Kozlovski
>            Priority: Minor
>
> Trogdor's ConsumeBenchWorker currently uses only two threads - one for the 
> StatusUpdater:
> {code:java}
> this.statusUpdaterFuture = executor.scheduleAtFixedRate(
>         new StatusUpdater(latencyHistogram, messageSizeHistogram), 1, 1, 
> TimeUnit.MINUTES);
> {code}
> and one for the consumer task itself:
> {code:java}
> executor.submit(new ConsumeMessages(partitions));
> {code}
> A sample ConsumeBenchSpec specification in JSON looks like this:
> {code:java}
> {
>     "class": "org.apache.kafka.trogdor.workload.ConsumeBenchSpec",
>     "durationMs": 10000000,
>     "consumerNode": "node0",
>     "bootstrapServers": "localhost:9092",
>     "maxMessages": 100,
>     "activeTopics": {
>         "foo[1-3]": {
>             "numPartitions": 3,
>             "replicationFactor": 1
>         }
>     }
> }
> {code}
>  
>  
> h2. Motivation
> This does not make the best use of machines with multiple cores. It would be 
> useful to be able to configure the ConsumeBenchSpec to use multiple threads 
> and spawn multiple consumers. Running consumers in parallel would also let 
> the ConsumeBenchWorker sustain higher throughput.
>  
> h2. Proposal
> Add a new `consumerCount` property to the ConsumeBenchSpec, allowing you to 
> run multiple consumers in parallel.
> h2. Changes
> By default, it will have a value of 1.
> `activeTopics` will still be defined in the same way. They will be evenly 
> assigned to the consumers in a round-robin fashion.
> For example, given this configuration:
> {code:java}
> {
>     "class": "org.apache.kafka.trogdor.workload.ConsumeBenchSpec",
>     "durationMs": 10000000,
>     "consumerNode": "node0",
>     "bootstrapServers": "localhost:9092",
>     "maxMessages": 100,
>     "consumerCount": 2,
>     "activeTopics": {
>         "foo[1-4]": {
>             "numPartitions": 4,
>             "replicationFactor": 1
>         }
>     }
> }{code}
> consumer 1 will be assigned partitions [foo1, foo3]
> consumer 2 will be assigned partitions [foo2, foo4]
> and the ConsumeBenchWorker will spawn 4 threads in total in the executor (2 
> for every consumer).
>  
> `maxMessages` and `targetMessagesPerSec` will be counted independently 
> for every consumer.
> h3. Status
> The way the worker's status is reported will change as well. 
> A ConsumeBenchWorker shows the following status when queried with 
> `./bin/trogdor.sh client --show-tasks localhost:8889`
>  
> {code:java}
> "tasks" : { "consume_bench_19938" : { "state" : "DONE", "spec" : { "class" : 
> "org.apache.kafka.trogdor.workload.ConsumeBenchSpec", 
> ...
> "status" : { "totalMessagesReceived" : 190, "totalBytesReceived" : 98040, 
> "averageMessageSizeBytes" : 516, "averageLatencyMs" : 449.0, "p50LatencyMs" : 
> 449, "p95LatencyMs" : 449, "p99LatencyMs" : 449 } },{code}
> We will change it to show the status of every separate consumer and the topic 
> partitions it was assigned to:
> {code:java}
> "tasks" : { 
> "consume_bench_19938" : 
> {
> "state" : "DONE",
> "spec" : { "class" : "org.apache.kafka.trogdor.workload.ConsumeBenchSpec", 
> ... }
> ...
> "status":{  
>    "consumer-1":{  
>       "assignedPartitions":[  
>          "foo1",
>          "foo3"
>       ],
>       "totalMessagesReceived":190,
>       "totalBytesReceived":98040,
>       "averageMessageSizeBytes":516,
>       "averageLatencyMs":449.0,
>       "p50LatencyMs":449,
>       "p95LatencyMs":449,
>       "p99LatencyMs":449
>    },
>    "consumer-2":{  
>       "assignedPartitions":[  
>          "foo2",
>          "foo4"
>       ],
>       "totalMessagesReceived":190,
>       "totalBytesReceived":98040,
>       "averageMessageSizeBytes":516,
>       "averageLatencyMs":449.0,
>       "p50LatencyMs":449,
>       "p95LatencyMs":449,
>       "p99LatencyMs":449
>    }
> }
> },{code}
>  
>  
> h2. Backwards Compatibility
> This change should be mostly backwards-compatible. If `consumerCount` is not 
> passed, only one consumer will be created and the round-robin assignor will 
> assign every partition to it.
> The only change will be in the format of the reported status. Even with one 
> consumer, we will still show a status similar to:
> {code:java}
> "status":{  
>    "consumer-1":{  
>       "assignedPartitions":[  
>          "foo1",
>          "foo3"
>       ],
>       "totalMessagesReceived":190,
>       "totalBytesReceived":98040,
>       "averageMessageSizeBytes":516,
>       "averageLatencyMs":449.0,
>       "p50LatencyMs":449,
>       "p95LatencyMs":449,
>       "p99LatencyMs":449
>    }
> }
> {code}
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
