[ https://issues.apache.org/jira/browse/KAFKA-7514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16685457#comment-16685457 ]

ASF GitHub Bot commented on KAFKA-7514:
---------------------------------------

cmccabe closed pull request #5864: KAFKA-7514: Add threads to ConsumeBenchWorker
URL: https://github.com/apache/kafka/pull/5864
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/TROGDOR.md b/TROGDOR.md
index d71455a6e49..168acfb78c1 100644
--- a/TROGDOR.md
+++ b/TROGDOR.md
@@ -38,16 +38,14 @@ Let's confirm that all of the daemons are running:
 Now, we can submit a test job to Trogdor.  Here's an example of a short bash script which makes it easier.
 
     > ./tests/bin/trogdor-run-produce-bench.sh
-    [2018-04-12 10:32:04,055] DEBUG Sending POST with input {"id":"produce_bench_22137","spec":{"class":"org.apache.kafka.trogdor.workload.ProduceBenchSpec","startMs":0,"durationMs":10000000,"producerNode":"node0","bootstrapServers":"localhost:9092","targetMessagesPerSec":10,"maxMessages":100,"keyGenerator":{"type":"sequential","size":4,"startOffset":0},"valueGenerator":{"type":"constant","size":512,"value":"AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA="},"totalTopics":10,"activeTopics":5,"topicPrefix":"foo","replicationFactor":1,"classLoader":{},"numPartitions":1}} to http://localhost:8889/coordinator/task/create (org.apache.kafka.trogdor.coordinator.CoordinatorClient)
-    Created task.
-    $TASK_ID = produce_bench_20462
+    Sent CreateTaskRequest for task produce_bench_21634.
+    $TASK_ID = produce_bench_21634
 
 To get the test results, we run --show-tasks:
 
     ./bin/trogdor.sh client --show-tasks localhost:8889
     Got coordinator tasks: {
       "tasks" : {
-        "produce_bench_20462" : {
+        "produce_bench_21634" : {
           "state" : "DONE",
           "spec" : {
             "class" : "org.apache.kafka.trogdor.workload.ProduceBenchSpec",
@@ -55,8 +53,8 @@ To get the test results, we run --show-tasks:
             "durationMs" : 10000000,
             "producerNode" : "node0",
             "bootstrapServers" : "localhost:9092",
-            "targetMessagesPerSec" : 10,
-            "maxMessages" : 100,
+            "targetMessagesPerSec" : 10000,
+            "maxMessages" : 50000,
             "keyGenerator" : {
               "type" : "sequential",
               "size" : 4,
@@ -67,22 +65,28 @@ To get the test results, we run --show-tasks:
               "size" : 512,
               "value" : 
"AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA="
             },
-            "totalTopics" : 10,
-            "activeTopics" : 5,
-            "topicPrefix" : "foo",
-            "replicationFactor" : 1,
-            "classLoader" : { },
-            "numPartitions" : 1
+            "activeTopics" : {
+              "foo[1-3]" : {
+                "numPartitions" : 10,
+                "replicationFactor" : 1
+              }
+            },
+            "inactiveTopics" : {
+              "foo[4-5]" : {
+                "numPartitions" : 10,
+                "replicationFactor" : 1
+              }
+            }
           },
-          "startedMs" : 1523552769850,
-          "doneMs" : 1523552780878,
+          "startedMs" : 1541435949784,
+          "doneMs" : 1541435955803,
           "cancelled" : false,
           "status" : {
-            "totalSent" : 500,
-            "averageLatencyMs" : 4.972,
-            "p50LatencyMs" : 4,
-            "p95LatencyMs" : 6,
-            "p99LatencyMs" : 12
+            "totalSent" : 50000,
+            "averageLatencyMs" : 11.0293,
+            "p50LatencyMs" : 9,
+            "p95LatencyMs" : 27,
+            "p99LatencyMs" : 39
           }
         }
       }
@@ -141,7 +145,7 @@ ProduceBench starts a Kafka producer on a single agent node, producing to severa
 RoundTripWorkload tests both production and consumption.  The workload starts a Kafka producer and consumer on a single node.  The consumer will read back the messages that were produced by the producer.
 
 ### ConsumeBench
-ConsumeBench starts a Kafka consumer on a single agent node. Depending on the passed in configuration (see ConsumeBenchSpec), the consumer either subscribes to a set of topics (leveraging consumer group functionality) or manually assigns partitions to itself.
+ConsumeBench starts one or more Kafka consumers on a single agent node. Depending on the passed-in configuration (see ConsumeBenchSpec), the consumers either subscribe to a set of topics (leveraging consumer group functionality and dynamic partition assignment) or manually assign partitions to themselves.
 The workload measures the average produce latency, as well as the median, 95th percentile, and 99th percentile latency.
 
 Faults
diff --git a/tests/bin/trogdor-run-consume-bench.sh b/tests/bin/trogdor-run-consume-bench.sh
index be9a2f1a941..0df396989c2 100755
--- a/tests/bin/trogdor-run-consume-bench.sh
+++ b/tests/bin/trogdor-run-consume-bench.sh
@@ -25,7 +25,10 @@ cat <<EOF
         "durationMs": 10000000,
         "consumerNode": "node0",
         "bootstrapServers": "localhost:9092",
-        "maxMessages": 100,
+        "targetMessagesPerSec": 1000,
+        "threadsPerWorker": 5,
+        "consumerGroup": "cg",
+        "maxMessages": 10000,
         "activeTopics": ["foo[1-3]"]
     }
 }
diff --git a/tests/bin/trogdor-run-produce-bench.sh b/tests/bin/trogdor-run-produce-bench.sh
index e96b4a1d8dd..26f67b5a4cb 100755
--- a/tests/bin/trogdor-run-produce-bench.sh
+++ b/tests/bin/trogdor-run-produce-bench.sh
@@ -25,17 +25,17 @@ cat <<EOF
         "durationMs": 10000000,
         "producerNode": "node0",
         "bootstrapServers": "localhost:9092",
-        "targetMessagesPerSec": 10,
-        "maxMessages": 100,
+        "targetMessagesPerSec": 10000,
+        "maxMessages": 50000,
         "activeTopics": {
             "foo[1-3]": {
-                "numPartitions": 3,
+                "numPartitions": 10,
                 "replicationFactor": 1
             }
         },
         "inactiveTopics": {
             "foo[4-5]": {
-                "numPartitions": 3,
+                "numPartitions": 10,
                 "replicationFactor": 1
             }
         }
diff --git a/tests/kafkatest/services/trogdor/consume_bench_workload.py b/tests/kafkatest/services/trogdor/consume_bench_workload.py
index 9e61b11928d..79ba863f33a 100644
--- a/tests/kafkatest/services/trogdor/consume_bench_workload.py
+++ b/tests/kafkatest/services/trogdor/consume_bench_workload.py
@@ -21,7 +21,7 @@
 class ConsumeBenchWorkloadSpec(TaskSpec):
     def __init__(self, start_ms, duration_ms, consumer_node, bootstrap_servers,
                  target_messages_per_sec, max_messages, active_topics,
-                 consumer_conf, common_client_conf, admin_client_conf, consumer_group=None):
+                 consumer_conf, common_client_conf, admin_client_conf, consumer_group=None, threads_per_worker=1):
         super(ConsumeBenchWorkloadSpec, self).__init__(start_ms, duration_ms)
         self.message["class"] = 
"org.apache.kafka.trogdor.workload.ConsumeBenchSpec"
         self.message["consumerNode"] = consumer_node
@@ -32,6 +32,7 @@ def __init__(self, start_ms, duration_ms, consumer_node, bootstrap_servers,
         self.message["adminClientConf"] = admin_client_conf
         self.message["commonClientConf"] = common_client_conf
         self.message["activeTopics"] = active_topics
+        self.message["threadsPerWorker"] = threads_per_worker
         if consumer_group is not None:
             self.message["consumerGroup"] = consumer_group
 
diff --git a/tests/kafkatest/tests/core/consume_bench_test.py b/tests/kafkatest/tests/core/consume_bench_test.py
index bec7416a40d..641ec7c918e 100644
--- a/tests/kafkatest/tests/core/consume_bench_test.py
+++ b/tests/kafkatest/tests/core/consume_bench_test.py
@@ -86,7 +86,7 @@ def test_consume_bench(self, topics):
         tasks = self.trogdor.tasks()
         self.logger.info("TASKS: %s\n" % json.dumps(tasks, sort_keys=True, indent=2))
 
-    def test_consume_bench_single_partition(self):
+    def test_single_partition(self):
         """
         Run a ConsumeBench against a single partition
         """
@@ -107,9 +107,32 @@ def test_consume_bench_single_partition(self):
         tasks = self.trogdor.tasks()
         self.logger.info("TASKS: %s\n" % json.dumps(tasks, sort_keys=True, indent=2))
 
-    def test_consume_group_bench(self):
+    def test_multiple_consumers_random_group_topics(self):
         """
-        Runs two ConsumeBench workloads in the same consumer group to read messages from topics
+        Runs multiple consumers to read messages from topics.
+        Since a consumerGroup isn't specified, each consumer should read from all topics independently.
+        """
+        self.produce_messages(self.active_topics, max_messages=5000)
+        consume_spec = ConsumeBenchWorkloadSpec(0, TaskSpec.MAX_DURATION_MS,
+                                                self.consumer_workload_service.consumer_node,
+                                                self.consumer_workload_service.bootstrap_servers,
+                                                target_messages_per_sec=1000,
+                                                max_messages=5000, # all should read exactly 5k messages
+                                                consumer_conf={},
+                                                admin_client_conf={},
+                                                common_client_conf={},
+                                                threads_per_worker=5,
+                                                active_topics=["consume_bench_topic[0-5]"])
+        consume_workload = self.trogdor.create_task("consume_workload", consume_spec)
+        consume_workload.wait_for_done(timeout_sec=360)
+        self.logger.debug("Consume workload finished")
+        tasks = self.trogdor.tasks()
+        self.logger.info("TASKS: %s\n" % json.dumps(tasks, sort_keys=True, indent=2))
+
+    def test_two_consumers_specified_group_topics(self):
+        """
+        Runs two consumers in the same consumer group to read messages from topics.
+        Since a consumerGroup is specified, each consumer should dynamically get assigned partitions from the group.
         """
         self.produce_messages(self.active_topics)
         consume_spec = ConsumeBenchWorkloadSpec(0, TaskSpec.MAX_DURATION_MS,
@@ -120,13 +143,62 @@ def test_consume_group_bench(self):
                                                 consumer_conf={},
                                                 admin_client_conf={},
                                                 common_client_conf={},
+                                                threads_per_worker=2,
                                                 consumer_group="testGroup",
                                                 
active_topics=["consume_bench_topic[0-5]"])
-        consume_workload_1 = self.trogdor.create_task("consume_workload_1", consume_spec)
-        consume_workload_2 = self.trogdor.create_task("consume_workload_2", consume_spec)
-        consume_workload_1.wait_for_done(timeout_sec=360)
-        self.logger.debug("Consume workload 1 finished")
-        consume_workload_2.wait_for_done(timeout_sec=360)
-        self.logger.debug("Consume workload 2 finished")
+        consume_workload = self.trogdor.create_task("consume_workload", consume_spec)
+        consume_workload.wait_for_done(timeout_sec=360)
+        self.logger.debug("Consume workload finished")
+        tasks = self.trogdor.tasks()
+        self.logger.info("TASKS: %s\n" % json.dumps(tasks, sort_keys=True, indent=2))
+
+    def test_multiple_consumers_random_group_partitions(self):
+        """
+        Runs multiple consumers to read messages from specific partitions.
+        Since a consumerGroup isn't specified, each consumer will get assigned a random group
+        and consume from all partitions.
+        """
+        self.produce_messages(self.active_topics, max_messages=20000)
+        consume_spec = ConsumeBenchWorkloadSpec(0, TaskSpec.MAX_DURATION_MS,
+                                                self.consumer_workload_service.consumer_node,
+                                                self.consumer_workload_service.bootstrap_servers,
+                                                target_messages_per_sec=1000,
+                                                max_messages=2000,
+                                                consumer_conf={},
+                                                admin_client_conf={},
+                                                common_client_conf={},
+                                                threads_per_worker=4,
+                                                active_topics=["consume_bench_topic1:[0-4]"])
+        consume_workload = self.trogdor.create_task("consume_workload", consume_spec)
+        consume_workload.wait_for_done(timeout_sec=360)
+        self.logger.debug("Consume workload finished")
         tasks = self.trogdor.tasks()
         self.logger.info("TASKS: %s\n" % json.dumps(tasks, sort_keys=True, indent=2))
+
+    def test_multiple_consumers_specified_group_partitions_should_raise(self):
+        """
+        Runs multiple consumers in the same consumer group to read messages from specific partitions.
+        Since a consumerGroup is specified alongside an explicit partition assignment and multiple
+        threads per worker, the task should fail with an invalid configuration error.
+        """
+        self.produce_messages(self.active_topics, max_messages=20000)
+        consume_spec = ConsumeBenchWorkloadSpec(0, TaskSpec.MAX_DURATION_MS,
+                                                self.consumer_workload_service.consumer_node,
+                                                self.consumer_workload_service.bootstrap_servers,
+                                                target_messages_per_sec=1000,
+                                                max_messages=2000,
+                                                consumer_conf={},
+                                                admin_client_conf={},
+                                                common_client_conf={},
+                                                threads_per_worker=4,
+                                                consumer_group="fail_group",
+                                                active_topics=["consume_bench_topic1:[0-4]"])
+        consume_workload = self.trogdor.create_task("consume_workload", consume_spec)
+        try:
+            consume_workload.wait_for_done(timeout_sec=360)
+            raise Exception("Should have raised an exception due to an invalid configuration")
+        except RuntimeError as e:
+            if 'Will not split partitions' not in str(e):
+                raise RuntimeError("Unexpected Exception - " + str(e))
+            self.logger.info(e)
+
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/workload/ConsumeBenchSpec.java b/tools/src/main/java/org/apache/kafka/trogdor/workload/ConsumeBenchSpec.java
index 6d4c67cde40..0e239b09885 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/workload/ConsumeBenchSpec.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/workload/ConsumeBenchSpec.java
@@ -21,6 +21,7 @@
 import com.fasterxml.jackson.annotation.JsonProperty;
 
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.trogdor.common.StringExpander;
 import org.apache.kafka.trogdor.task.TaskController;
 import org.apache.kafka.trogdor.task.TaskSpec;
@@ -61,6 +62,15 @@
 * #{@link org.apache.kafka.clients.consumer.KafkaConsumer#subscribe(Collection)}.
  * It will be assigned partitions dynamically from the consumer group.
  *
+ * This specification supports the spawning of multiple consumers in a single Trogdor worker agent.
+ * The "threadsPerWorker" field denotes how many consumers should be spawned for this spec.
+ * It is worth noting that the "targetMessagesPerSec", "maxMessages" and "activeTopics" fields apply to every consumer individually.
+ *
+ * If a consumer group is not specified, every consumer is assigned a different, random group. When specified, all consumers use the same group.
+ * Since no two consumers in the same group can be assigned the same partition,
+ * explicitly specifying partitions in "activeTopics" when there are multiple "threadsPerWorker"
+ * and a particular "consumerGroup" will result in a #{@link ConfigException}, aborting the task.
+ *
 * An example JSON representation which will result in a consumer that is part of the consumer group "cg" and
 * subscribed to topics foo1, foo2, foo3 and bar.
  * #{@code
@@ -77,7 +87,6 @@
  */
 public class ConsumeBenchSpec extends TaskSpec {
 
-    static final String EMPTY_CONSUMER_GROUP = "";
    private static final String VALID_EXPANDED_TOPIC_NAME_PATTERN = "^[^:]+(:[\\d]+|[^:]*)$";
     private final String consumerNode;
     private final String bootstrapServers;
@@ -88,6 +97,7 @@
     private final Map<String, String> commonClientConf;
     private final List<String> activeTopics;
     private final String consumerGroup;
+    private final int threadsPerWorker;
 
     @JsonCreator
     public ConsumeBenchSpec(@JsonProperty("startMs") long startMs,
@@ -100,6 +110,7 @@ public ConsumeBenchSpec(@JsonProperty("startMs") long startMs,
                             @JsonProperty("consumerConf") Map<String, String> 
consumerConf,
                             @JsonProperty("commonClientConf") Map<String, 
String> commonClientConf,
                             @JsonProperty("adminClientConf") Map<String, 
String> adminClientConf,
+                            @JsonProperty("threadsPerWorker") Integer 
threadsPerWorker,
                             @JsonProperty("activeTopics") List<String> 
activeTopics) {
         super(startMs, durationMs);
         this.consumerNode = (consumerNode == null) ? "" : consumerNode;
@@ -110,7 +121,8 @@ public ConsumeBenchSpec(@JsonProperty("startMs") long startMs,
         this.commonClientConf = configOrEmptyMap(commonClientConf);
         this.adminClientConf = configOrEmptyMap(adminClientConf);
         this.activeTopics = activeTopics == null ? new ArrayList<>() : activeTopics;
-        this.consumerGroup = consumerGroup == null ? EMPTY_CONSUMER_GROUP : consumerGroup;
+        this.consumerGroup = consumerGroup == null ? "" : consumerGroup;
+        this.threadsPerWorker = threadsPerWorker == null ? 1 : threadsPerWorker;
     }
 
     @JsonProperty
@@ -138,6 +150,11 @@ public int maxMessages() {
         return maxMessages;
     }
 
+    @JsonProperty
+    public int threadsPerWorker() {
+        return threadsPerWorker;
+    }
+
     @JsonProperty
     public Map<String, String> consumerConf() {
         return consumerConf;
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/workload/ConsumeBenchWorker.java b/tools/src/main/java/org/apache/kafka/trogdor/workload/ConsumeBenchWorker.java
index b0998f0bdcf..a44d5218f60 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/workload/ConsumeBenchWorker.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/workload/ConsumeBenchWorker.java
@@ -19,11 +19,13 @@
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.databind.JsonNode;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.common.internals.KafkaFutureImpl;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
 import org.apache.kafka.common.utils.Time;
@@ -39,19 +41,24 @@
 import org.apache.kafka.trogdor.task.TaskWorker;
 
 import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
 import java.util.Properties;
+import java.util.HashMap;
 import java.util.concurrent.Callable;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.ReentrantLock;
 import java.util.stream.Collectors;
 
+
 public class ConsumeBenchWorker implements TaskWorker {
    private static final Logger log = LoggerFactory.getLogger(ConsumeBenchWorker.class);
 
@@ -61,10 +68,11 @@
     private final ConsumeBenchSpec spec;
     private final AtomicBoolean running = new AtomicBoolean(false);
     private ScheduledExecutorService executor;
-    private WorkerStatusTracker status;
+    private WorkerStatusTracker workerStatus;
+    private StatusUpdater statusUpdater;
+    private Future<?> statusUpdaterFuture;
     private KafkaFutureImpl<String> doneFuture;
-    private KafkaConsumer<byte[], byte[]> consumer;
-
+    private ThreadSafeConsumer consumer;
     public ConsumeBenchWorker(String id, ConsumeBenchSpec spec) {
         this.id = id;
         this.spec = spec;
@@ -77,9 +85,12 @@ public void start(Platform platform, WorkerStatusTracker status,
             throw new IllegalStateException("ConsumeBenchWorker is already running.");
         }
         log.info("{}: Activating ConsumeBenchWorker with {}", id, spec);
+        this.statusUpdater = new StatusUpdater();
        this.executor = Executors.newScheduledThreadPool(
-            2, ThreadUtils.createThreadFactory("ConsumeBenchWorkerThread%d", false));
-        this.status = status;
+            spec.threadsPerWorker() + 2, // 1 thread for all the ConsumeStatusUpdaters and 1 for the StatusUpdater
+            ThreadUtils.createThreadFactory("ConsumeBenchWorkerThread%d", false));
+        this.statusUpdaterFuture = executor.scheduleAtFixedRate(this.statusUpdater, 1, 1, TimeUnit.MINUTES);
+        this.workerStatus = status;
         this.doneFuture = doneFuture;
         executor.submit(new Prepare());
     }
@@ -88,41 +99,75 @@ public void start(Platform platform, WorkerStatusTracker status,
         @Override
         public void run() {
             try {
-                executor.submit(consumeTask());
+                List<Future<Void>> consumeTasks = new ArrayList<>();
+                for (ConsumeMessages task : consumeTasks()) {
+                    consumeTasks.add(executor.submit(task));
+                }
+                executor.submit(new CloseStatusUpdater(consumeTasks));
             } catch (Throwable e) {
                 WorkerUtils.abort(log, "Prepare", e, doneFuture);
             }
         }
 
-        private ConsumeMessages consumeTask() {
-            String consumerGroup = spec.consumerGroup();
+        private List<ConsumeMessages> consumeTasks() {
+            List<ConsumeMessages> tasks = new ArrayList<>();
+            String consumerGroup = consumerGroup();
+            int consumerCount = spec.threadsPerWorker();
            Map<String, List<TopicPartition>> partitionsByTopic = spec.materializeTopics();
-            boolean toUseGroupPartitionAssignment = partitionsByTopic.values().isEmpty();
+            boolean toUseGroupPartitionAssignment = partitionsByTopic.values().stream().allMatch(List::isEmpty);
+
+            if (!toUseGroupPartitionAssignment && !toUseRandomConsumeGroup() && consumerCount > 1)
+                throw new ConfigException("You may not specify an explicit partition assignment when using multiple consumers in the same group. "
+                    + "Please leave the consumer group unset, specify topics instead of partitions or use a single consumer.");
+
+            consumer = consumer(consumerGroup, clientId(0));
+            if (toUseGroupPartitionAssignment) {
+                Set<String> topics = partitionsByTopic.keySet();
+                tasks.add(new ConsumeMessages(consumer, topics));
+
+                for (int i = 0; i < consumerCount - 1; i++) {
+                    tasks.add(new ConsumeMessages(consumer(consumerGroup(), clientId(i + 1)), topics));
+                }
+            } else {
+                List<TopicPartition> partitions = populatePartitionsByTopic(consumer.consumer(), partitionsByTopic)
+                    .values().stream().flatMap(List::stream).collect(Collectors.toList());
+                tasks.add(new ConsumeMessages(consumer, partitions));
 
-            if (consumerGroup.equals(ConsumeBenchSpec.EMPTY_CONSUMER_GROUP)) // consumer group is undefined, the consumer should use a random group
-                consumerGroup = generateConsumerGroup();
+                for (int i = 0; i < consumerCount - 1; i++) {
+                    tasks.add(new ConsumeMessages(consumer(consumerGroup(), clientId(i + 1)), partitions));
+                }
+            }
 
-            consumer = consumer(consumerGroup);
-            if (!toUseGroupPartitionAssignment)
-                partitionsByTopic = populatePartitionsByTopic(consumer, partitionsByTopic);
+            return tasks;
+        }
 
-            return new ConsumeMessages(consumer, partitionsByTopic, toUseGroupPartitionAssignment);
+        private String clientId(int idx) {
+            return String.format("consumer.%s-%d", id, idx);
         }
 
-        private KafkaConsumer<byte[], byte[]> consumer(String consumerGroup) {
+        /**
+         * Creates a new KafkaConsumer instance
+         */
+        private ThreadSafeConsumer consumer(String consumerGroup, String clientId) {
             Properties props = new Properties();
             props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, spec.bootstrapServers());
-            props.put(ConsumerConfig.CLIENT_ID_CONFIG, "consumer." + id);
+            props.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId);
             props.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroup);
             props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
             props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 100000);
            // these defaults maybe over-written by the user-specified commonClientConf or consumerConf
            WorkerUtils.addConfigsToProperties(props, spec.commonClientConf(), spec.consumerConf());
-            return new KafkaConsumer<>(props, new ByteArrayDeserializer(), new ByteArrayDeserializer());
+            return new ThreadSafeConsumer(new KafkaConsumer<>(props, new ByteArrayDeserializer(), new ByteArrayDeserializer()), clientId);
+        }
+
+        private String consumerGroup() {
+            return toUseRandomConsumeGroup()
+                ? "consume-bench-" + UUID.randomUUID().toString()
+                : spec.consumerGroup();
         }
 
-        private String generateConsumerGroup() {
-            return "consume-bench-" + UUID.randomUUID().toString();
+        private boolean toUseRandomConsumeGroup() {
+            return spec.consumerGroup().isEmpty();
         }
 
        private Map<String, List<TopicPartition>> populatePartitionsByTopic(KafkaConsumer<byte[], byte[]> consumer,
@@ -151,29 +196,35 @@ private String generateConsumerGroup() {
         private final Histogram messageSizeHistogram;
         private final Future<?> statusUpdaterFuture;
         private final Throttle throttle;
-        private final KafkaConsumer<byte[], byte[]> consumer;
+        private final String clientId;
+        private final ThreadSafeConsumer consumer;
 
-        ConsumeMessages(KafkaConsumer<byte[], byte[]> consumer, Map<String, List<TopicPartition>> topicPartitionsByTopic,
-                        boolean toUseGroupAssignment) {
+        private ConsumeMessages(ThreadSafeConsumer consumer) {
             this.latencyHistogram = new Histogram(5000);
             this.messageSizeHistogram = new Histogram(2 * 1024 * 1024);
+            this.clientId = consumer.clientId();
             this.statusUpdaterFuture = executor.scheduleAtFixedRate(
-                new StatusUpdater(latencyHistogram, messageSizeHistogram), 1, 1, TimeUnit.MINUTES);
-            this.consumer = consumer;
-            if (toUseGroupAssignment) {
-                Set<String> topics = topicPartitionsByTopic.keySet();
-                log.info("Will consume from topics {} via dynamic group 
assignment.", topics);
-                this.consumer.subscribe(topics);
-            } else {
-                List<TopicPartition> partitions = 
topicPartitionsByTopic.values().stream()
-                    .flatMap(List::stream).collect(Collectors.toList());
-                log.info("Will consume from topic partitions {} via manual 
assignment.", partitions);
-                this.consumer.assign(partitions);
-            }
+                new ConsumeStatusUpdater(latencyHistogram, messageSizeHistogram, consumer), 1, 1, TimeUnit.MINUTES);
+            int perPeriod;
+            if (spec.targetMessagesPerSec() <= 0)
+                perPeriod = Integer.MAX_VALUE;
+            else
+                perPeriod = WorkerUtils.perSecToPerPeriod(spec.targetMessagesPerSec(), THROTTLE_PERIOD_MS);
 
-            int perPeriod = WorkerUtils.perSecToPerPeriod(
-                spec.targetMessagesPerSec(), THROTTLE_PERIOD_MS);
             this.throttle = new Throttle(perPeriod, THROTTLE_PERIOD_MS);
+            this.consumer = consumer;
+        }
+
+        ConsumeMessages(ThreadSafeConsumer consumer, Set<String> topics) {
+            this(consumer);
+            log.info("Will consume from topics {} via dynamic group assignment.", topics);
+            this.consumer.subscribe(topics);
+        }
+
+        ConsumeMessages(ThreadSafeConsumer consumer, List<TopicPartition> partitions) {
+            this(consumer);
+            log.info("Will consume from topic partitions {} via manual assignment.", partitions);
+            this.consumer.assign(partitions);
         }
 
         @Override
@@ -182,9 +233,10 @@ public Void call() throws Exception {
             long bytesConsumed = 0;
             long startTimeMs = Time.SYSTEM.milliseconds();
             long startBatchMs = startTimeMs;
+            int maxMessages = spec.maxMessages();
             try {
-                while (messagesConsumed < spec.maxMessages()) {
-                    ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(50));
+                while (messagesConsumed < maxMessages) {
+                    ConsumerRecords<byte[], byte[]> records = consumer.poll();
                     if (records.isEmpty()) {
                         continue;
                     }
@@ -202,6 +254,9 @@ public Void call() throws Exception {
                         latencyHistogram.add(elapsedBatchMs);
                         messageSizeHistogram.add(messageBytes);
                         bytesConsumed += messageBytes;
+                        if (messagesConsumed >= maxMessages)
+                            break;
+
                         throttle.increment();
                     }
                     startBatchMs = Time.SYSTEM.milliseconds();
@@ -211,23 +266,76 @@ public Void call() throws Exception {
             } finally {
                 statusUpdaterFuture.cancel(false);
                 StatusData statusData =
-                    new StatusUpdater(latencyHistogram, messageSizeHistogram).update();
+                    new ConsumeStatusUpdater(latencyHistogram, messageSizeHistogram, consumer).update();
                 long curTimeMs = Time.SYSTEM.milliseconds();
-                log.info("Consumed total number of messages={}, bytes={} in {} ms.  status: {}",
-                         messagesConsumed, bytesConsumed, curTimeMs - startTimeMs, statusData);
+                log.info("{} Consumed total number of messages={}, bytes={} in {} ms.  status: {}",
+                         clientId, messagesConsumed, bytesConsumed, curTimeMs - startTimeMs, statusData);
             }
             doneFuture.complete("");
+            consumer.close();
             return null;
         }
     }
 
-    public class StatusUpdater implements Runnable {
+    public class CloseStatusUpdater implements Runnable {
+        private final List<Future<Void>> consumeTasks;
+
+        CloseStatusUpdater(List<Future<Void>> consumeTasks) {
+            this.consumeTasks = consumeTasks;
+        }
+
+        @Override
+        public void run() {
+            while (!consumeTasks.stream().allMatch(Future::isDone)) {
+                try {
+                    Thread.sleep(60000);
+                } catch (InterruptedException e) {
+                    log.debug("{} was interrupted. Closing...", this.getClass().getName());
+                    break; // close the thread
+                }
+            }
+            statusUpdaterFuture.cancel(false);
+            statusUpdater.update();
+        }
+    }
+
+    class StatusUpdater implements Runnable {
+        final Map<String, JsonNode> statuses;
+
+        StatusUpdater() {
+            statuses = new HashMap<>();
+        }
+
+        @Override
+        public void run() {
+            try {
+                update();
+            } catch (Exception e) {
+                WorkerUtils.abort(log, "ConsumeStatusUpdater", e, doneFuture);
+            }
+        }
+
+        synchronized void update() {
+            workerStatus.update(JsonUtil.JSON_SERDE.valueToTree(statuses));
+        }
+
+        synchronized void updateConsumeStatus(String clientId, StatusData status) {
+            statuses.put(clientId, JsonUtil.JSON_SERDE.valueToTree(status));
+        }
+    }
+
+    /**
+     * Runnable class that updates the status of a single consumer
+     */
+    public class ConsumeStatusUpdater implements Runnable {
         private final Histogram latencyHistogram;
         private final Histogram messageSizeHistogram;
+        private final ThreadSafeConsumer consumer;
 
-        StatusUpdater(Histogram latencyHistogram, Histogram messageSizeHistogram) {
+        ConsumeStatusUpdater(Histogram latencyHistogram, Histogram messageSizeHistogram, ThreadSafeConsumer consumer) {
             this.latencyHistogram = latencyHistogram;
             this.messageSizeHistogram = messageSizeHistogram;
+            this.consumer = consumer;
         }
 
         @Override
@@ -235,7 +343,7 @@ public void run() {
             try {
                 update();
             } catch (Exception e) {
-                WorkerUtils.abort(log, "StatusUpdater", e, doneFuture);
+                WorkerUtils.abort(log, "ConsumeStatusUpdater", e, doneFuture);
             }
         }
 
@@ -243,6 +351,7 @@ StatusData update() {
            Histogram.Summary latSummary = latencyHistogram.summarize(StatusData.PERCENTILES);
            Histogram.Summary msgSummary = messageSizeHistogram.summarize(StatusData.PERCENTILES);
             StatusData statusData = new StatusData(
+                consumer.assignedPartitions(),
                 latSummary.numSamples(),
                 (long) (msgSummary.numSamples() * msgSummary.average()),
                 (long) msgSummary.average(),
@@ -250,7 +359,7 @@ StatusData update() {
                 latSummary.percentiles().get(0).value(),
                 latSummary.percentiles().get(1).value(),
                 latSummary.percentiles().get(2).value());
-            status.update(JsonUtil.JSON_SERDE.valueToTree(statusData));
+            statusUpdater.updateConsumeStatus(consumer.clientId(), statusData);
             log.info("Status={}", JsonUtil.toJsonString(statusData));
             return statusData;
         }
@@ -258,6 +367,7 @@ StatusData update() {
 
     public static class StatusData {
         private final long totalMessagesReceived;
+        private final List<String> assignedPartitions;
         private final long totalBytesReceived;
         private final long averageMessageSizeBytes;
         private final float averageLatencyMs;
@@ -270,15 +380,16 @@ StatusData update() {
          * These should match up with the p50LatencyMs, p95LatencyMs, etc. fields.
          */
         final static float[] PERCENTILES = {0.5f, 0.95f, 0.99f};
-
         @JsonCreator
-        StatusData(@JsonProperty("totalMessagesReceived") long totalMessagesReceived,
+        StatusData(@JsonProperty("assignedPartitions") List<String> assignedPartitions,
+                   @JsonProperty("totalMessagesReceived") long totalMessagesReceived,
                    @JsonProperty("totalBytesReceived") long totalBytesReceived,
                    @JsonProperty("averageMessageSizeBytes") long 
averageMessageSizeBytes,
                    @JsonProperty("averageLatencyMs") float averageLatencyMs,
                    @JsonProperty("p50LatencyMs") int p50latencyMs,
                    @JsonProperty("p95LatencyMs") int p95latencyMs,
                    @JsonProperty("p99LatencyMs") int p99latencyMs) {
+            this.assignedPartitions = assignedPartitions;
             this.totalMessagesReceived = totalMessagesReceived;
             this.totalBytesReceived = totalBytesReceived;
             this.averageMessageSizeBytes = averageMessageSizeBytes;
@@ -288,6 +399,11 @@ StatusData update() {
             this.p99LatencyMs = p99latencyMs;
         }
 
+        @JsonProperty
+        public List<String> assignedPartitions() {
+            return assignedPartitions;
+        }
+
         @JsonProperty
         public long totalMessagesReceived() {
             return totalMessagesReceived;
@@ -333,11 +449,86 @@ public void stop(Platform platform) throws Exception {
         doneFuture.complete("");
         executor.shutdownNow();
         executor.awaitTermination(1, TimeUnit.DAYS);
-        Utils.closeQuietly(consumer, "consumer");
+        consumer.close();
         this.consumer = null;
         this.executor = null;
-        this.status = null;
+        this.statusUpdater = null;
+        this.statusUpdaterFuture = null;
+        this.workerStatus = null;
         this.doneFuture = null;
     }
 
+    /**
+     * A thread-safe KafkaConsumer wrapper
+     */
+    private static class ThreadSafeConsumer {
+        private final KafkaConsumer<byte[], byte[]> consumer;
+        private final String clientId;
+        private final ReentrantLock consumerLock;
+        private boolean closed = false;
+
+        ThreadSafeConsumer(KafkaConsumer<byte[], byte[]> consumer, String clientId) {
+            this.consumer = consumer;
+            this.clientId = clientId;
+            this.consumerLock = new ReentrantLock();
+        }
+
+        ConsumerRecords<byte[], byte[]> poll() {
+            this.consumerLock.lock();
+            try {
+                return consumer.poll(Duration.ofMillis(50));
+            } finally {
+                this.consumerLock.unlock();
+            }
+        }
+
+        void close() {
+            if (closed)
+                return;
+            this.consumerLock.lock();
+            try {
+                consumer.unsubscribe();
+                Utils.closeQuietly(consumer, "consumer");
+                closed = true;
+            } finally {
+                this.consumerLock.unlock();
+            }
+        }
+
+        void subscribe(Set<String> topics) {
+            this.consumerLock.lock();
+            try {
+                consumer.subscribe(topics);
+            } finally {
+                this.consumerLock.unlock();
+            }
+        }
+
+        void assign(Collection<TopicPartition> partitions) {
+            this.consumerLock.lock();
+            try {
+                consumer.assign(partitions);
+            } finally {
+                this.consumerLock.unlock();
+            }
+        }
+
+        List<String> assignedPartitions() {
+            this.consumerLock.lock();
+            try {
+                return consumer.assignment().stream()
+                    .map(TopicPartition::toString).collect(Collectors.toList());
+            } finally {
+                this.consumerLock.unlock();
+            }
+        }
+
+        String clientId() {
+            return clientId;
+        }
+
+        KafkaConsumer<byte[], byte[]> consumer() {
+            return consumer;
+        }
+    }
 }
diff --git a/tools/src/test/java/org/apache/kafka/trogdor/workload/ConsumeBenchSpecTest.java b/tools/src/test/java/org/apache/kafka/trogdor/workload/ConsumeBenchSpecTest.java
index 117954b7caa..a3b10862a6b 100644
--- a/tools/src/test/java/org/apache/kafka/trogdor/workload/ConsumeBenchSpecTest.java
+++ b/tools/src/test/java/org/apache/kafka/trogdor/workload/ConsumeBenchSpecTest.java
@@ -73,6 +73,6 @@ public void testInvalidTopicNameRaisesExceptionInMaterialize() {
     private ConsumeBenchSpec consumeBenchSpec(List<String> activeTopics) {
         return new ConsumeBenchSpec(0, 0, "node", "localhost",
             123, 1234, "cg-1",
-            Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap(), activeTopics);
+            Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap(), 1, activeTopics);
     }
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Trogdor - Support Multiple Threads in ConsumeBenchWorker
> --------------------------------------------------------
>
>                 Key: KAFKA-7514
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7514
>             Project: Kafka
>          Issue Type: Improvement
>            Reporter: Stanislav Kozlovski
>            Assignee: Stanislav Kozlovski
>            Priority: Minor
>
> Trogdor's ConsumeBenchWorker currently uses only two threads - one for the 
> StatusUpdater:
> {code:java}
> this.statusUpdaterFuture = executor.scheduleAtFixedRate(
>         new StatusUpdater(latencyHistogram, messageSizeHistogram), 1, 1, TimeUnit.MINUTES);
> {code}
> and one for the consumer task itself
> {code:java}
> executor.submit(new ConsumeMessages(partitions));
> {code}
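>  
> For reference, the merged patch above replaces this single submission with one
> ConsumeMessages task per consumer. A simplified sketch of that loop (taken from
> the Prepare.run() hunk in the diff; the surrounding class is elided):
> {code:java}
> // Submit one ConsumeMessages callable per configured consumer,
> // then a CloseStatusUpdater that waits for all of them to finish.
> List<Future<Void>> consumeTasks = new ArrayList<>();
> for (ConsumeMessages task : consumeTasks()) {
>     consumeTasks.add(executor.submit(task));
> }
> executor.submit(new CloseStatusUpdater(consumeTasks));
> {code}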
> A sample ConsumeBenchSpec specification in JSON looks like this:
> {code:java}
> {
>     "class": "org.apache.kafka.trogdor.workload.ConsumeBenchSpec",
>     "durationMs": 10000000,
>     "consumerNode": "node0",
>     "bootstrapServers": "localhost:9092",
>     "maxMessages": 100,
>     "activeTopics": {
>         "foo[1-3]": {
>             "numPartitions": 3,
>             "replicationFactor": 1
>         }
>     }
> }
> {code}
>  
>  
> h2. Motivation
> This does not make the best use of machines with multiple cores. It would be 
> useful if there were a way to configure the ConsumeBenchSpec to use multiple 
> threads and spawn multiple consumers. This would also let the 
> ConsumeBenchWorker achieve higher throughput through parallelism.
>  
> h2. Proposal
> Add a new `consumerCount` property to the ConsumeBenchSpec, allowing you to 
> run multiple consumers in parallel.
> h2. Changes
> By default, it will have a value of 1.
> `activeTopics` will still be defined in the same way. They will be evenly 
> assigned to the consumers in a round-robin fashion.
> For example, if we have this configuration
> {code:java}
> {
>     "class": "org.apache.kafka.trogdor.workload.ConsumeBenchSpec",
>     "durationMs": 10000000,
>     "consumerNode": "node0",
>     "bootstrapServers": "localhost:9092",
>     "maxMessages": 100,
>     "consumerCount": 2,
>     "activeTopics": {
>         "foo[1-4]": {
>             "numPartitions": 4,
>             "replicationFactor": 1
>         }
>     }
> }{code}
> consumer 1 will be assigned partitions [foo1, foo3]
> consumer 2 will be assigned partitions [foo2, foo4]
> and the ConsumeBenchWorker will spawn 4 threads in total in the executor (2 
> for every consumer).
>  
> The `maxMessages` and `targetMessagesPerSec` limits will be counted 
> independently for every consumer.
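>  
> As a concrete illustration (hypothetical numbers, not from a real run), the 
> worker-wide totals scale with the consumer count:
> {code:java}
> // Each consumer tracks its own limits, so the worker as a whole may
> // consume up to consumerCount times the per-consumer maximum.
> int consumerCount = 2;            // from the spec
> int maxMessages = 100;            // applies to each consumer individually
> int targetMessagesPerSec = 10;    // also per consumer
> int workerMaxMessages = consumerCount * maxMessages;          // up to 200 messages overall
> int workerTargetRate = consumerCount * targetMessagesPerSec;  // up to 20 messages/sec overall
> {code}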
> h3. Status
> The way the worker's status is reported will change as well.
> A ConsumeBenchWorker shows the following status when queried with 
> `./bin/trogdor.sh client --show-tasks localhost:8889`
>  
> {code:java}
> "tasks" : { "consume_bench_19938" : { "state" : "DONE", "spec" : { "class" : 
> "org.apache.kafka.trogdor.workload.ConsumeBenchSpec", 
> ...
> "status" : { "totalMessagesReceived" : 190, "totalBytesReceived" : 98040, 
> "averageMessageSizeBytes" : 516, "averageLatencyMs" : 449.0, "p50LatencyMs" : 
> 449, "p95LatencyMs" : 449, "p99LatencyMs" : 449 } },{code}
> We will change it to show the status of every separate consumer and the topic 
> partitions it was assigned to:
> {code:java}
> "tasks" : { 
> "consume_bench_19938" : 
> {
> "state" : "DONE",
> "spec" : { "class" : "org.apache.kafka.trogdor.workload.ConsumeBenchSpec", 
> ... }
> ...
> "status":{  
>    "consumer-1":{  
>       "assignedPartitions":[  
>          "foo1",
>          "foo3"
>       ],
>       "totalMessagesReceived":190,
>       "totalBytesReceived":98040,
>       "averageMessageSizeBytes":516,
>       "averageLatencyMs":449.0,
>       "p50LatencyMs":449,
>       "p95LatencyMs":449,
>       "p99LatencyMs":449
>    },
> "consumer-2":{  
>       "assignedPartitions":[  
>          "foo2",
>          "foo4"
>       ],
>       "totalMessagesReceived":190,
>       "totalBytesReceived":98040,
>       "averageMessageSizeBytes":516,
>       "averageLatencyMs":449.0,
>       "p50LatencyMs":449,
>       "p95LatencyMs":449,
>       "p99LatencyMs":449
>    }
> }
> },{code}
>  
>  
> h2. Backwards Compatibility
> This change should be mostly backwards-compatible. If `consumerCount` is not 
> passed, only one consumer will be created and the round-robin assignor will 
> assign every partition to it.
> The only change will be in the format of the reported status. Even with one 
> consumer, we will still show a status similar to
> {code:java}
> "status":{  
>    "consumer-1":{  
>       "assignedPartitions":[  
>          "foo1",
>          "foo3"
>       ],
>       "totalMessagesReceived":190,
>       "totalBytesReceived":98040,
>       "averageMessageSizeBytes":516,
>       "averageLatencyMs":449.0,
>       "p50LatencyMs":449,
>       "p95LatencyMs":449,
>       "p99LatencyMs":449
>    }
> }
> {code}
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
