This is an automated email from the ASF dual-hosted git repository.

mmodzelewski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git


The following commit(s) were added to refs/heads/master by this push:
     new 1ae123f43 feat(java): add actor and data batch generator for java bench (#3218)
1ae123f43 is described below

commit 1ae123f4324fc787245adf8fc5089123d45aad1a
Author: Rimuksh Kansal <[email protected]>
AuthorDate: Sat May 16 23:33:59 2026 +0900

    feat(java): add actor and data batch generator for java bench (#3218)
---
 .../tcp/async/TcpAsyncPinnedProducerActor.java     | 121 +++++++++++++++++++++
 .../runners/tcp/async/TcpAsyncPinnedProducer.java  |  94 ++++++++++++++++
 .../tcp/async/TcpAsyncPinnedProducer.java          |  51 ---------
 .../iggy/bench/cli/PinnedProducerCommand.java      |   2 +-
 .../enums/ActorKind.java}                          |  22 +++-
 .../enums/BenchmarkKind.java}                      |  27 ++++-
 .../enums/GroupKind.java}                          |  23 +++-
 .../enums/TransportKind.java}                      |  21 +++-
 .../{ => common}/exception/BenchmarkException.java |   2 +-
 .../common/generator/BenchmarkBatchGenerator.java  |  75 +++++++++++++
 .../provision/ResourceProvisioner.java             |   6 +-
 .../iggy/bench/models/cli/GlobalCliArgs.java       |   2 +-
 .../bench/models/cli/PinnedProducerCliArgs.java    |   2 +-
 .../generator/DataBatch.java}                      |   6 +-
 .../provision/ProvisionedResources.java            |   2 +-
 15 files changed, 371 insertions(+), 85 deletions(-)

diff --git a/foreign/java/bench/src/main/java/org/apache/iggy/bench/benchmarks/actors/tcp/async/TcpAsyncPinnedProducerActor.java b/foreign/java/bench/src/main/java/org/apache/iggy/bench/benchmarks/actors/tcp/async/TcpAsyncPinnedProducerActor.java
new file mode 100644
index 000000000..29adfb983
--- /dev/null
+++ b/foreign/java/bench/src/main/java/org/apache/iggy/bench/benchmarks/actors/tcp/async/TcpAsyncPinnedProducerActor.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iggy.bench.benchmarks.actors.tcp.async;
+
+import org.apache.iggy.bench.models.cli.GlobalCliArgs;
+import org.apache.iggy.bench.models.common.generator.DataBatch;
+import org.apache.iggy.client.async.tcp.AsyncIggyTcpClient;
+import org.apache.iggy.identifier.StreamId;
+import org.apache.iggy.identifier.TopicId;
+import org.apache.iggy.message.Partitioning;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.TimeUnit;
+
+public final class TcpAsyncPinnedProducerActor {
+
+    private static final long PARTITION_ID = 0L;
+
+    private final GlobalCliArgs globalCliArgs;
+    private final int actorId;
+    private final StreamId streamId;
+    private final TopicId topicId;
+    private final Partitioning partitioning;
+    private final DataBatch fullBatch;
+    private final long targetMessageBatches;
+    private final long targetDataBytes;
+    private AsyncIggyTcpClient client;
+
+    public TcpAsyncPinnedProducerActor(
+            GlobalCliArgs globalCliArgs,
+            int actorId,
+            String streamName,
+            String topicName,
+            DataBatch fullBatch,
+            long targetMessageBatches,
+            long targetDataBytes) {
+        this.globalCliArgs = globalCliArgs;
+        this.actorId = actorId;
+        this.streamId = StreamId.of(streamName);
+        this.topicId = TopicId.of(topicName);
+        this.partitioning = Partitioning.partitionId(PARTITION_ID);
+        this.fullBatch = fullBatch;
+        this.targetMessageBatches = targetMessageBatches;
+        this.targetDataBytes = targetDataBytes;
+    }
+
+    public CompletableFuture<Void> run() {
+        try {
+            return AsyncIggyTcpClient.builder()
+                    .credentials(globalCliArgs.username(), globalCliArgs.password())
+                    .buildAndLogin()
+                    .thenCompose(client -> {
+                        this.client = client;
+                        long warmupDeadline = globalCliArgs.warmupTimeMs() <= 0L
+                                ? 0L
+                                : System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(globalCliArgs.warmupTimeMs());
+
+                        return sendMessages(warmupDeadline, 0L, 0L)
+                                .handle((ignored, sendFailure) -> {
+                                    CompletableFuture<Void> closeFuture = client.close();
+
+                                    if (sendFailure != null) {
+                                        CompletableFuture<Void> failedAfterClose =
+                                                closeFuture.handle((closeIgnored, closeFailure) -> {
+                                                    throw new CompletionException(sendFailure);
+                                                });
+                                        return failedAfterClose;
+                                    }
+
+                                    return closeFuture;
+                                })
+                                .thenCompose(future -> future);
+                    });
+        } catch (RuntimeException exception) {
+            return CompletableFuture.failedFuture(exception);
+        }
+    }
+
+    private CompletableFuture<Void> sendMessages(long warmupDeadline, long sentBatches, long sentBytes) {
+        if (warmupDeadline > 0L) {
+            if (System.nanoTime() >= warmupDeadline) {
+                return sendMessages(0L, 0L, 0L);
+            }
+
+            return sendBatch(fullBatch, false)
+                    .thenCompose(ignored -> sendMessages(warmupDeadline, sentBatches, sentBytes));
+        }
+
+        if (targetDataBytes > 0L ? sentBytes >= targetDataBytes : sentBatches >= targetMessageBatches) {
+            return CompletableFuture.completedFuture(null);
+        }
+
+        return sendBatch(fullBatch, true).thenCompose(ignored -> {
+            long updatedSentBatches = sentBatches + 1L;
+            long updatedSentBytes = sentBytes + fullBatch.userDataBytes();
+            return sendMessages(0L, updatedSentBatches, updatedSentBytes);
+        });
+    }
+
+    private CompletableFuture<Void> sendBatch(DataBatch currentBatch, boolean recordMetrics) {
+        return client.messages().sendMessages(streamId, topicId, partitioning, currentBatch.messages());
+    }
+}
diff --git a/foreign/java/bench/src/main/java/org/apache/iggy/bench/benchmarks/runners/tcp/async/TcpAsyncPinnedProducer.java b/foreign/java/bench/src/main/java/org/apache/iggy/bench/benchmarks/runners/tcp/async/TcpAsyncPinnedProducer.java
new file mode 100644
index 000000000..bfe9ee882
--- /dev/null
+++ b/foreign/java/bench/src/main/java/org/apache/iggy/bench/benchmarks/runners/tcp/async/TcpAsyncPinnedProducer.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iggy.bench.benchmarks.runners.tcp.async;
+
+import org.apache.iggy.bench.benchmarks.actors.tcp.async.TcpAsyncPinnedProducerActor;
+import org.apache.iggy.bench.common.exception.BenchmarkException;
+import org.apache.iggy.bench.common.generator.BenchmarkBatchGenerator;
+import org.apache.iggy.bench.common.provision.ResourceProvisioner;
+import org.apache.iggy.bench.models.cli.GlobalCliArgs;
+import org.apache.iggy.bench.models.cli.PinnedProducerCliArgs;
+import org.apache.iggy.bench.models.common.generator.DataBatch;
+import org.apache.iggy.bench.models.common.provision.ProvisionedResources;
+
+import java.util.concurrent.CompletableFuture;
+
+public final class TcpAsyncPinnedProducer {
+
+    private final GlobalCliArgs globalCliArgs;
+    private final PinnedProducerCliArgs pinnedProducerCliArgs;
+    private final ResourceProvisioner resourceProvisioner;
+    private ProvisionedResources provisionedResources;
+
+    TcpAsyncPinnedProducer(
+            GlobalCliArgs globalCliArgs,
+            PinnedProducerCliArgs pinnedProducerCliArgs,
+            ResourceProvisioner resourceProvisioner) {
+        this.globalCliArgs = globalCliArgs;
+        this.pinnedProducerCliArgs = pinnedProducerCliArgs;
+        this.resourceProvisioner = resourceProvisioner;
+    }
+
+    public TcpAsyncPinnedProducer(GlobalCliArgs globalCliArgs, PinnedProducerCliArgs pinnedProducerCliArgs) {
+        this(globalCliArgs, pinnedProducerCliArgs, new ResourceProvisioner());
+    }
+
+    public void provisionResources() {
+        this.provisionedResources = resourceProvisioner.provisionResources(globalCliArgs, pinnedProducerCliArgs);
+    }
+
+    public void run() {
+        runBenchmark().join();
+    }
+
+    private CompletableFuture<Void> runBenchmark() {
+        try {
+            if (provisionedResources == null) {
+                throw new BenchmarkException("Benchmark resources must be provisioned before running.");
+            }
+
+            String topicName = provisionedResources.topicNames().get(0);
+            var batchGenerator =
+                    new BenchmarkBatchGenerator(globalCliArgs.messageSize(), globalCliArgs.messagesPerBatch());
+            DataBatch fullBatch = batchGenerator.generateBatch();
+            long targetMessageBatches = globalCliArgs.totalData() > 0L ? 0L : globalCliArgs.messageBatches();
+            long targetDataBytes =
+                    globalCliArgs.totalData() > 0L ? globalCliArgs.totalData() / pinnedProducerCliArgs.producers() : 0L;
+            var actorRuns = new CompletableFuture<?>[pinnedProducerCliArgs.producers()];
+
+            for (int index = 0; index < pinnedProducerCliArgs.producers(); index++) {
+                String streamName = provisionedResources.streamNames().get(index);
+                var actor = new TcpAsyncPinnedProducerActor(
+                        globalCliArgs,
+                        index + 1,
+                        streamName,
+                        topicName,
+                        fullBatch,
+                        targetMessageBatches,
+                        targetDataBytes);
+                actorRuns[index] = actor.run();
+            }
+
+            return CompletableFuture.allOf(actorRuns);
+        } catch (RuntimeException exception) {
+            return CompletableFuture.failedFuture(exception);
+        }
+    }
+}
diff --git a/foreign/java/bench/src/main/java/org/apache/iggy/bench/benchmarks/tcp/async/TcpAsyncPinnedProducer.java b/foreign/java/bench/src/main/java/org/apache/iggy/bench/benchmarks/tcp/async/TcpAsyncPinnedProducer.java
deleted file mode 100644
index a36674438..000000000
--- a/foreign/java/bench/src/main/java/org/apache/iggy/bench/benchmarks/tcp/async/TcpAsyncPinnedProducer.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iggy.bench.benchmarks.tcp.async;
-
-import org.apache.iggy.bench.models.cli.GlobalCliArgs;
-import org.apache.iggy.bench.models.cli.PinnedProducerCliArgs;
-import org.apache.iggy.bench.models.provision.ProvisionedResources;
-import org.apache.iggy.bench.provision.ResourceProvisioner;
-
-public final class TcpAsyncPinnedProducer {
-
-    private final GlobalCliArgs globalCliArgs;
-    private final PinnedProducerCliArgs pinnedProducerCliArgs;
-    private final ResourceProvisioner resourceProvisioner;
-
-    TcpAsyncPinnedProducer(
-            GlobalCliArgs globalCliArgs,
-            PinnedProducerCliArgs pinnedProducerCliArgs,
-            ResourceProvisioner resourceProvisioner) {
-        this.globalCliArgs = globalCliArgs;
-        this.pinnedProducerCliArgs = pinnedProducerCliArgs;
-        this.resourceProvisioner = resourceProvisioner;
-    }
-
-    public TcpAsyncPinnedProducer(GlobalCliArgs globalCliArgs, PinnedProducerCliArgs pinnedProducerCliArgs) {
-        this(globalCliArgs, pinnedProducerCliArgs, new ResourceProvisioner());
-    }
-
-    public ProvisionedResources provisionResources() {
-        return resourceProvisioner.provisionResources(globalCliArgs, pinnedProducerCliArgs);
-    }
-
-    public void run() {}
-}
diff --git a/foreign/java/bench/src/main/java/org/apache/iggy/bench/cli/PinnedProducerCommand.java b/foreign/java/bench/src/main/java/org/apache/iggy/bench/cli/PinnedProducerCommand.java
index 9d58a6fcf..8518a87ea 100644
--- a/foreign/java/bench/src/main/java/org/apache/iggy/bench/cli/PinnedProducerCommand.java
+++ b/foreign/java/bench/src/main/java/org/apache/iggy/bench/cli/PinnedProducerCommand.java
@@ -19,7 +19,7 @@
 
 package org.apache.iggy.bench.cli;
 
-import org.apache.iggy.bench.benchmarks.tcp.async.TcpAsyncPinnedProducer;
+import org.apache.iggy.bench.benchmarks.runners.tcp.async.TcpAsyncPinnedProducer;
 import org.apache.iggy.bench.models.cli.GlobalCliArgs;
 import org.apache.iggy.bench.models.cli.PinnedProducerCliArgs;
 import org.slf4j.Logger;
diff --git a/foreign/java/bench/src/main/java/org/apache/iggy/bench/exception/BenchmarkException.java b/foreign/java/bench/src/main/java/org/apache/iggy/bench/common/enums/ActorKind.java
similarity index 68%
copy from foreign/java/bench/src/main/java/org/apache/iggy/bench/exception/BenchmarkException.java
copy to foreign/java/bench/src/main/java/org/apache/iggy/bench/common/enums/ActorKind.java
index 2a9b2def4..53bd54ea0 100644
--- a/foreign/java/bench/src/main/java/org/apache/iggy/bench/exception/BenchmarkException.java
+++ b/foreign/java/bench/src/main/java/org/apache/iggy/bench/common/enums/ActorKind.java
@@ -17,15 +17,25 @@
  * under the License.
  */
 
-package org.apache.iggy.bench.exception;
+package org.apache.iggy.bench.common.enums;
 
-public class BenchmarkException extends RuntimeException {
+public enum ActorKind {
+    PRODUCER("producer"),
+    CONSUMER("consumer"),
+    PRODUCING_CONSUMER("producing_consumer");
 
-    public BenchmarkException(String message) {
-        super(message);
+    private final String value;
+
+    ActorKind(String value) {
+        this.value = value;
+    }
+
+    public String value() {
+        return value;
     }
 
-    public BenchmarkException(String message, Throwable cause) {
-        super(message, cause);
+    @Override
+    public String toString() {
+        return value;
     }
 }
diff --git a/foreign/java/bench/src/main/java/org/apache/iggy/bench/exception/BenchmarkException.java b/foreign/java/bench/src/main/java/org/apache/iggy/bench/common/enums/BenchmarkKind.java
similarity index 51%
copy from foreign/java/bench/src/main/java/org/apache/iggy/bench/exception/BenchmarkException.java
copy to foreign/java/bench/src/main/java/org/apache/iggy/bench/common/enums/BenchmarkKind.java
index 2a9b2def4..d186d4820 100644
--- a/foreign/java/bench/src/main/java/org/apache/iggy/bench/exception/BenchmarkException.java
+++ b/foreign/java/bench/src/main/java/org/apache/iggy/bench/common/enums/BenchmarkKind.java
@@ -17,15 +17,30 @@
  * under the License.
  */
 
-package org.apache.iggy.bench.exception;
+package org.apache.iggy.bench.common.enums;
 
-public class BenchmarkException extends RuntimeException {
+public enum BenchmarkKind {
+    PINNED_PRODUCER("pinned_producer"),
+    PINNED_CONSUMER("pinned_consumer"),
+    PINNED_PRODUCER_AND_CONSUMER("pinned_producer_and_consumer"),
+    BALANCED_PRODUCER("balanced_producer"),
+    BALANCED_CONSUMER_GROUP("balanced_consumer_group"),
+    BALANCED_PRODUCER_AND_CONSUMER_GROUP("balanced_producer_and_consumer_group"),
+    END_TO_END_PRODUCING_CONSUMER("end_to_end_producing_consumer"),
+    END_TO_END_PRODUCING_CONSUMER_GROUP("end_to_end_producing_consumer_group");
 
-    public BenchmarkException(String message) {
-        super(message);
+    private final String value;
+
+    BenchmarkKind(String value) {
+        this.value = value;
+    }
+
+    public String value() {
+        return value;
     }
 
-    public BenchmarkException(String message, Throwable cause) {
-        super(message, cause);
+    @Override
+    public String toString() {
+        return value;
     }
 }
diff --git a/foreign/java/bench/src/main/java/org/apache/iggy/bench/exception/BenchmarkException.java b/foreign/java/bench/src/main/java/org/apache/iggy/bench/common/enums/GroupKind.java
similarity index 65%
copy from foreign/java/bench/src/main/java/org/apache/iggy/bench/exception/BenchmarkException.java
copy to foreign/java/bench/src/main/java/org/apache/iggy/bench/common/enums/GroupKind.java
index 2a9b2def4..fa22d198c 100644
--- a/foreign/java/bench/src/main/java/org/apache/iggy/bench/exception/BenchmarkException.java
+++ b/foreign/java/bench/src/main/java/org/apache/iggy/bench/common/enums/GroupKind.java
@@ -17,15 +17,26 @@
  * under the License.
  */
 
-package org.apache.iggy.bench.exception;
+package org.apache.iggy.bench.common.enums;
 
-public class BenchmarkException extends RuntimeException {
+public enum GroupKind {
+    PRODUCERS("producers"),
+    CONSUMERS("consumers"),
+    PRODUCERS_AND_CONSUMERS("producers_and_consumers"),
+    PRODUCING_CONSUMERS("producing_consumers");
 
-    public BenchmarkException(String message) {
-        super(message);
+    private final String value;
+
+    GroupKind(String value) {
+        this.value = value;
+    }
+
+    public String value() {
+        return value;
     }
 
-    public BenchmarkException(String message, Throwable cause) {
-        super(message, cause);
+    @Override
+    public String toString() {
+        return value;
     }
 }
diff --git a/foreign/java/bench/src/main/java/org/apache/iggy/bench/exception/BenchmarkException.java b/foreign/java/bench/src/main/java/org/apache/iggy/bench/common/enums/TransportKind.java
similarity index 71%
copy from foreign/java/bench/src/main/java/org/apache/iggy/bench/exception/BenchmarkException.java
copy to foreign/java/bench/src/main/java/org/apache/iggy/bench/common/enums/TransportKind.java
index 2a9b2def4..f71c500f1 100644
--- a/foreign/java/bench/src/main/java/org/apache/iggy/bench/exception/BenchmarkException.java
+++ b/foreign/java/bench/src/main/java/org/apache/iggy/bench/common/enums/TransportKind.java
@@ -17,15 +17,24 @@
  * under the License.
  */
 
-package org.apache.iggy.bench.exception;
+package org.apache.iggy.bench.common.enums;
 
-public class BenchmarkException extends RuntimeException {
+public enum TransportKind {
+    TCP("tcp"),
+    HTTP("http");
 
-    public BenchmarkException(String message) {
-        super(message);
+    private final String value;
+
+    TransportKind(String value) {
+        this.value = value;
+    }
+
+    public String value() {
+        return value;
     }
 
-    public BenchmarkException(String message, Throwable cause) {
-        super(message, cause);
+    @Override
+    public String toString() {
+        return value;
     }
 }
diff --git a/foreign/java/bench/src/main/java/org/apache/iggy/bench/exception/BenchmarkException.java b/foreign/java/bench/src/main/java/org/apache/iggy/bench/common/exception/BenchmarkException.java
similarity index 95%
rename from foreign/java/bench/src/main/java/org/apache/iggy/bench/exception/BenchmarkException.java
rename to foreign/java/bench/src/main/java/org/apache/iggy/bench/common/exception/BenchmarkException.java
index 2a9b2def4..26b3c8342 100644
--- a/foreign/java/bench/src/main/java/org/apache/iggy/bench/exception/BenchmarkException.java
+++ b/foreign/java/bench/src/main/java/org/apache/iggy/bench/common/exception/BenchmarkException.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iggy.bench.exception;
+package org.apache.iggy.bench.common.exception;
 
 public class BenchmarkException extends RuntimeException {
 
diff --git a/foreign/java/bench/src/main/java/org/apache/iggy/bench/common/generator/BenchmarkBatchGenerator.java b/foreign/java/bench/src/main/java/org/apache/iggy/bench/common/generator/BenchmarkBatchGenerator.java
new file mode 100644
index 000000000..91ed032bb
--- /dev/null
+++ b/foreign/java/bench/src/main/java/org/apache/iggy/bench/common/generator/BenchmarkBatchGenerator.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iggy.bench.common.generator;
+
+import org.apache.iggy.bench.models.common.generator.DataBatch;
+import org.apache.iggy.message.Message;
+
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.concurrent.ThreadLocalRandom;
+
+public final class BenchmarkBatchGenerator {
+
+    private static final byte[] ALPHANUMERIC =
+            "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz".getBytes(StandardCharsets.US_ASCII);
+
+    private final int messagesPerBatch;
+    private final String payloadTemplate;
+
+    public BenchmarkBatchGenerator(int messageSize, int messagesPerBatch) {
+        this.messagesPerBatch = messagesPerBatch;
+        this.payloadTemplate = randomPayload(messageSize);
+    }
+
+    public DataBatch generateBatch() {
+        return buildBatch(messagesPerBatch);
+    }
+
+    public DataBatch generateBatch(int messagesInBatch) {
+        return buildBatch(messagesInBatch);
+    }
+
+    private DataBatch buildBatch(int messagesInBatch) {
+        var messages = new ArrayList<Message>(messagesInBatch);
+        long userDataBytes = 0L;
+        long totalBytes = 0L;
+
+        for (int messageIndex = 0; messageIndex < messagesInBatch; messageIndex++) {
+            var message = Message.of(payloadTemplate);
+            messages.add(message);
+            userDataBytes += message.payload().length;
+            totalBytes += message.getSize();
+        }
+
+        return new DataBatch(messages, userDataBytes, totalBytes);
+    }
+
+    private static String randomPayload(int messageSize) {
+        var payload = new byte[messageSize];
+        var random = ThreadLocalRandom.current();
+
+        for (int index = 0; index < messageSize; index++) {
+            payload[index] = ALPHANUMERIC[random.nextInt(ALPHANUMERIC.length)];
+        }
+
+        return new String(payload, StandardCharsets.US_ASCII);
+    }
+}
diff --git a/foreign/java/bench/src/main/java/org/apache/iggy/bench/provision/ResourceProvisioner.java b/foreign/java/bench/src/main/java/org/apache/iggy/bench/common/provision/ResourceProvisioner.java
similarity index 95%
rename from foreign/java/bench/src/main/java/org/apache/iggy/bench/provision/ResourceProvisioner.java
rename to foreign/java/bench/src/main/java/org/apache/iggy/bench/common/provision/ResourceProvisioner.java
index dc5c71625..717d694bc 100644
--- a/foreign/java/bench/src/main/java/org/apache/iggy/bench/provision/ResourceProvisioner.java
+++ b/foreign/java/bench/src/main/java/org/apache/iggy/bench/common/provision/ResourceProvisioner.java
@@ -17,12 +17,12 @@
  * under the License.
  */
 
-package org.apache.iggy.bench.provision;
+package org.apache.iggy.bench.common.provision;
 
-import org.apache.iggy.bench.exception.BenchmarkException;
+import org.apache.iggy.bench.common.exception.BenchmarkException;
 import org.apache.iggy.bench.models.cli.GlobalCliArgs;
 import org.apache.iggy.bench.models.cli.PinnedProducerCliArgs;
-import org.apache.iggy.bench.models.provision.ProvisionedResources;
+import org.apache.iggy.bench.models.common.provision.ProvisionedResources;
 import org.apache.iggy.client.blocking.tcp.IggyTcpClient;
 import org.apache.iggy.identifier.StreamId;
 import org.apache.iggy.topic.CompressionAlgorithm;
diff --git a/foreign/java/bench/src/main/java/org/apache/iggy/bench/models/cli/GlobalCliArgs.java b/foreign/java/bench/src/main/java/org/apache/iggy/bench/models/cli/GlobalCliArgs.java
index 51e92b41a..a603e1425 100644
--- a/foreign/java/bench/src/main/java/org/apache/iggy/bench/models/cli/GlobalCliArgs.java
+++ b/foreign/java/bench/src/main/java/org/apache/iggy/bench/models/cli/GlobalCliArgs.java
@@ -19,7 +19,7 @@
 
 package org.apache.iggy.bench.models.cli;
 
-import org.apache.iggy.bench.exception.BenchmarkException;
+import org.apache.iggy.bench.common.exception.BenchmarkException;
 
 public record GlobalCliArgs(
         int messageSize,
diff --git a/foreign/java/bench/src/main/java/org/apache/iggy/bench/models/cli/PinnedProducerCliArgs.java b/foreign/java/bench/src/main/java/org/apache/iggy/bench/models/cli/PinnedProducerCliArgs.java
index dd355a929..56ee43937 100644
--- a/foreign/java/bench/src/main/java/org/apache/iggy/bench/models/cli/PinnedProducerCliArgs.java
+++ b/foreign/java/bench/src/main/java/org/apache/iggy/bench/models/cli/PinnedProducerCliArgs.java
@@ -19,7 +19,7 @@
 
 package org.apache.iggy.bench.models.cli;
 
-import org.apache.iggy.bench.exception.BenchmarkException;
+import org.apache.iggy.bench.common.exception.BenchmarkException;
 
 public record PinnedProducerCliArgs(int streams, int producers, long maxTopicSize, long messageExpiry) {
 
diff --git a/foreign/java/bench/src/main/java/org/apache/iggy/bench/models/provision/ProvisionedResources.java b/foreign/java/bench/src/main/java/org/apache/iggy/bench/models/common/generator/DataBatch.java
similarity index 81%
copy from foreign/java/bench/src/main/java/org/apache/iggy/bench/models/provision/ProvisionedResources.java
copy to foreign/java/bench/src/main/java/org/apache/iggy/bench/models/common/generator/DataBatch.java
index e5411e24e..d491c8152 100644
--- a/foreign/java/bench/src/main/java/org/apache/iggy/bench/models/provision/ProvisionedResources.java
+++ b/foreign/java/bench/src/main/java/org/apache/iggy/bench/models/common/generator/DataBatch.java
@@ -17,8 +17,10 @@
  * under the License.
  */
 
-package org.apache.iggy.bench.models.provision;
+package org.apache.iggy.bench.models.common.generator;
+
+import org.apache.iggy.message.Message;
 
 import java.util.List;
 
-public record ProvisionedResources(List<String> streamNames, List<String> topicNames) {}
+public record DataBatch(List<Message> messages, long userDataBytes, long totalBytes) {}
diff --git a/foreign/java/bench/src/main/java/org/apache/iggy/bench/models/provision/ProvisionedResources.java b/foreign/java/bench/src/main/java/org/apache/iggy/bench/models/common/provision/ProvisionedResources.java
similarity index 94%
rename from foreign/java/bench/src/main/java/org/apache/iggy/bench/models/provision/ProvisionedResources.java
rename to foreign/java/bench/src/main/java/org/apache/iggy/bench/models/common/provision/ProvisionedResources.java
index e5411e24e..d560c1c4a 100644
--- a/foreign/java/bench/src/main/java/org/apache/iggy/bench/models/provision/ProvisionedResources.java
+++ b/foreign/java/bench/src/main/java/org/apache/iggy/bench/models/common/provision/ProvisionedResources.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iggy.bench.models.provision;
+package org.apache.iggy.bench.models.common.provision;
 
 import java.util.List;
 

Reply via email to