This is an automated email from the ASF dual-hosted git repository.
mmodzelewski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/master by this push:
new 1ae123f43 feat(java): add actor and data batch generator for java bench (#3218)
1ae123f43 is described below
commit 1ae123f4324fc787245adf8fc5089123d45aad1a
Author: Rimuksh Kansal <[email protected]>
AuthorDate: Sat May 16 23:33:59 2026 +0900
feat(java): add actor and data batch generator for java bench (#3218)
---
.../tcp/async/TcpAsyncPinnedProducerActor.java | 121 +++++++++++++++++++++
.../runners/tcp/async/TcpAsyncPinnedProducer.java | 94 ++++++++++++++++
.../tcp/async/TcpAsyncPinnedProducer.java | 51 ---------
.../iggy/bench/cli/PinnedProducerCommand.java | 2 +-
.../enums/ActorKind.java} | 22 +++-
.../enums/BenchmarkKind.java} | 27 ++++-
.../enums/GroupKind.java} | 23 +++-
.../enums/TransportKind.java} | 21 +++-
.../{ => common}/exception/BenchmarkException.java | 2 +-
.../common/generator/BenchmarkBatchGenerator.java | 75 +++++++++++++
.../provision/ResourceProvisioner.java | 6 +-
.../iggy/bench/models/cli/GlobalCliArgs.java | 2 +-
.../bench/models/cli/PinnedProducerCliArgs.java | 2 +-
.../generator/DataBatch.java} | 6 +-
.../provision/ProvisionedResources.java | 2 +-
15 files changed, 371 insertions(+), 85 deletions(-)
diff --git a/foreign/java/bench/src/main/java/org/apache/iggy/bench/benchmarks/actors/tcp/async/TcpAsyncPinnedProducerActor.java b/foreign/java/bench/src/main/java/org/apache/iggy/bench/benchmarks/actors/tcp/async/TcpAsyncPinnedProducerActor.java
new file mode 100644
index 000000000..29adfb983
--- /dev/null
+++ b/foreign/java/bench/src/main/java/org/apache/iggy/bench/benchmarks/actors/tcp/async/TcpAsyncPinnedProducerActor.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iggy.bench.benchmarks.actors.tcp.async;
+
+import org.apache.iggy.bench.models.cli.GlobalCliArgs;
+import org.apache.iggy.bench.models.common.generator.DataBatch;
+import org.apache.iggy.client.async.tcp.AsyncIggyTcpClient;
+import org.apache.iggy.identifier.StreamId;
+import org.apache.iggy.identifier.TopicId;
+import org.apache.iggy.message.Partitioning;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.TimeUnit;
+
+public final class TcpAsyncPinnedProducerActor {
+
+ private static final long PARTITION_ID = 0L;
+
+ private final GlobalCliArgs globalCliArgs;
+ private final int actorId;
+ private final StreamId streamId;
+ private final TopicId topicId;
+ private final Partitioning partitioning;
+ private final DataBatch fullBatch;
+ private final long targetMessageBatches;
+ private final long targetDataBytes;
+ private AsyncIggyTcpClient client;
+
+ public TcpAsyncPinnedProducerActor(
+ GlobalCliArgs globalCliArgs,
+ int actorId,
+ String streamName,
+ String topicName,
+ DataBatch fullBatch,
+ long targetMessageBatches,
+ long targetDataBytes) {
+ this.globalCliArgs = globalCliArgs;
+ this.actorId = actorId;
+ this.streamId = StreamId.of(streamName);
+ this.topicId = TopicId.of(topicName);
+ this.partitioning = Partitioning.partitionId(PARTITION_ID);
+ this.fullBatch = fullBatch;
+ this.targetMessageBatches = targetMessageBatches;
+ this.targetDataBytes = targetDataBytes;
+ }
+
+ public CompletableFuture<Void> run() {
+ try {
+ return AsyncIggyTcpClient.builder()
+ .credentials(globalCliArgs.username(), globalCliArgs.password())
+ .buildAndLogin()
+ .thenCompose(client -> {
+ this.client = client;
+ long warmupDeadline = globalCliArgs.warmupTimeMs() <= 0L
+ ? 0L
+ : System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(globalCliArgs.warmupTimeMs());
+
+ return sendMessages(warmupDeadline, 0L, 0L)
+ .handle((ignored, sendFailure) -> {
+ CompletableFuture<Void> closeFuture = client.close();
+
+ if (sendFailure != null) {
+ CompletableFuture<Void> failedAfterClose =
+ closeFuture.handle((closeIgnored, closeFailure) -> {
+ throw new CompletionException(sendFailure);
+ });
+ return failedAfterClose;
+ }
+
+ return closeFuture;
+ })
+ .thenCompose(future -> future);
+ });
+ } catch (RuntimeException exception) {
+ return CompletableFuture.failedFuture(exception);
+ }
+ }
+
+ private CompletableFuture<Void> sendMessages(long warmupDeadline, long sentBatches, long sentBytes) {
+ if (warmupDeadline > 0L) {
+ if (System.nanoTime() >= warmupDeadline) {
+ return sendMessages(0L, 0L, 0L);
+ }
+
+ return sendBatch(fullBatch, false)
+ .thenCompose(ignored -> sendMessages(warmupDeadline, sentBatches, sentBytes));
+ }
+
+ if (targetDataBytes > 0L ? sentBytes >= targetDataBytes : sentBatches >= targetMessageBatches) {
+ return CompletableFuture.completedFuture(null);
+ }
+
+ return sendBatch(fullBatch, true).thenCompose(ignored -> {
+ long updatedSentBatches = sentBatches + 1L;
+ long updatedSentBytes = sentBytes + fullBatch.userDataBytes();
+ return sendMessages(0L, updatedSentBatches, updatedSentBytes);
+ });
+ }
+
+ private CompletableFuture<Void> sendBatch(DataBatch currentBatch, boolean recordMetrics) {
+ return client.messages().sendMessages(streamId, topicId, partitioning, currentBatch.messages());
+ }
+}
diff --git a/foreign/java/bench/src/main/java/org/apache/iggy/bench/benchmarks/runners/tcp/async/TcpAsyncPinnedProducer.java b/foreign/java/bench/src/main/java/org/apache/iggy/bench/benchmarks/runners/tcp/async/TcpAsyncPinnedProducer.java
new file mode 100644
index 000000000..bfe9ee882
--- /dev/null
+++ b/foreign/java/bench/src/main/java/org/apache/iggy/bench/benchmarks/runners/tcp/async/TcpAsyncPinnedProducer.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iggy.bench.benchmarks.runners.tcp.async;
+
+import org.apache.iggy.bench.benchmarks.actors.tcp.async.TcpAsyncPinnedProducerActor;
+import org.apache.iggy.bench.common.exception.BenchmarkException;
+import org.apache.iggy.bench.common.generator.BenchmarkBatchGenerator;
+import org.apache.iggy.bench.common.provision.ResourceProvisioner;
+import org.apache.iggy.bench.models.cli.GlobalCliArgs;
+import org.apache.iggy.bench.models.cli.PinnedProducerCliArgs;
+import org.apache.iggy.bench.models.common.generator.DataBatch;
+import org.apache.iggy.bench.models.common.provision.ProvisionedResources;
+
+import java.util.concurrent.CompletableFuture;
+
+public final class TcpAsyncPinnedProducer {
+
+ private final GlobalCliArgs globalCliArgs;
+ private final PinnedProducerCliArgs pinnedProducerCliArgs;
+ private final ResourceProvisioner resourceProvisioner;
+ private ProvisionedResources provisionedResources;
+
+ TcpAsyncPinnedProducer(
+ GlobalCliArgs globalCliArgs,
+ PinnedProducerCliArgs pinnedProducerCliArgs,
+ ResourceProvisioner resourceProvisioner) {
+ this.globalCliArgs = globalCliArgs;
+ this.pinnedProducerCliArgs = pinnedProducerCliArgs;
+ this.resourceProvisioner = resourceProvisioner;
+ }
+
+ public TcpAsyncPinnedProducer(GlobalCliArgs globalCliArgs, PinnedProducerCliArgs pinnedProducerCliArgs) {
+ this(globalCliArgs, pinnedProducerCliArgs, new ResourceProvisioner());
+ }
+
+ public void provisionResources() {
+ this.provisionedResources = resourceProvisioner.provisionResources(globalCliArgs, pinnedProducerCliArgs);
+ }
+
+ public void run() {
+ runBenchmark().join();
+ }
+
+ private CompletableFuture<Void> runBenchmark() {
+ try {
+ if (provisionedResources == null) {
+ throw new BenchmarkException("Benchmark resources must be provisioned before running.");
+ }
+
+ String topicName = provisionedResources.topicNames().get(0);
+ var batchGenerator =
+ new BenchmarkBatchGenerator(globalCliArgs.messageSize(), globalCliArgs.messagesPerBatch());
+ DataBatch fullBatch = batchGenerator.generateBatch();
+ long targetMessageBatches = globalCliArgs.totalData() > 0L ? 0L : globalCliArgs.messageBatches();
+ long targetDataBytes =
+ globalCliArgs.totalData() > 0L ? globalCliArgs.totalData() / pinnedProducerCliArgs.producers() : 0L;
+ var actorRuns = new CompletableFuture<?>[pinnedProducerCliArgs.producers()];
+
+ for (int index = 0; index < pinnedProducerCliArgs.producers(); index++) {
+ String streamName = provisionedResources.streamNames().get(index);
+ var actor = new TcpAsyncPinnedProducerActor(
+ globalCliArgs,
+ index + 1,
+ streamName,
+ topicName,
+ fullBatch,
+ targetMessageBatches,
+ targetDataBytes);
+ actorRuns[index] = actor.run();
+ }
+
+ return CompletableFuture.allOf(actorRuns);
+ } catch (RuntimeException exception) {
+ return CompletableFuture.failedFuture(exception);
+ }
+ }
+}
diff --git a/foreign/java/bench/src/main/java/org/apache/iggy/bench/benchmarks/tcp/async/TcpAsyncPinnedProducer.java b/foreign/java/bench/src/main/java/org/apache/iggy/bench/benchmarks/tcp/async/TcpAsyncPinnedProducer.java
deleted file mode 100644
index a36674438..000000000
--- a/foreign/java/bench/src/main/java/org/apache/iggy/bench/benchmarks/tcp/async/TcpAsyncPinnedProducer.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iggy.bench.benchmarks.tcp.async;
-
-import org.apache.iggy.bench.models.cli.GlobalCliArgs;
-import org.apache.iggy.bench.models.cli.PinnedProducerCliArgs;
-import org.apache.iggy.bench.models.provision.ProvisionedResources;
-import org.apache.iggy.bench.provision.ResourceProvisioner;
-
-public final class TcpAsyncPinnedProducer {
-
- private final GlobalCliArgs globalCliArgs;
- private final PinnedProducerCliArgs pinnedProducerCliArgs;
- private final ResourceProvisioner resourceProvisioner;
-
- TcpAsyncPinnedProducer(
- GlobalCliArgs globalCliArgs,
- PinnedProducerCliArgs pinnedProducerCliArgs,
- ResourceProvisioner resourceProvisioner) {
- this.globalCliArgs = globalCliArgs;
- this.pinnedProducerCliArgs = pinnedProducerCliArgs;
- this.resourceProvisioner = resourceProvisioner;
- }
-
- public TcpAsyncPinnedProducer(GlobalCliArgs globalCliArgs, PinnedProducerCliArgs pinnedProducerCliArgs) {
- this(globalCliArgs, pinnedProducerCliArgs, new ResourceProvisioner());
- }
-
- public ProvisionedResources provisionResources() {
- return resourceProvisioner.provisionResources(globalCliArgs, pinnedProducerCliArgs);
- }
-
- public void run() {}
-}
diff --git a/foreign/java/bench/src/main/java/org/apache/iggy/bench/cli/PinnedProducerCommand.java b/foreign/java/bench/src/main/java/org/apache/iggy/bench/cli/PinnedProducerCommand.java
index 9d58a6fcf..8518a87ea 100644
--- a/foreign/java/bench/src/main/java/org/apache/iggy/bench/cli/PinnedProducerCommand.java
+++ b/foreign/java/bench/src/main/java/org/apache/iggy/bench/cli/PinnedProducerCommand.java
@@ -19,7 +19,7 @@
package org.apache.iggy.bench.cli;
-import org.apache.iggy.bench.benchmarks.tcp.async.TcpAsyncPinnedProducer;
+import org.apache.iggy.bench.benchmarks.runners.tcp.async.TcpAsyncPinnedProducer;
import org.apache.iggy.bench.models.cli.GlobalCliArgs;
import org.apache.iggy.bench.models.cli.PinnedProducerCliArgs;
import org.slf4j.Logger;
diff --git a/foreign/java/bench/src/main/java/org/apache/iggy/bench/exception/BenchmarkException.java b/foreign/java/bench/src/main/java/org/apache/iggy/bench/common/enums/ActorKind.java
similarity index 68%
copy from foreign/java/bench/src/main/java/org/apache/iggy/bench/exception/BenchmarkException.java
copy to foreign/java/bench/src/main/java/org/apache/iggy/bench/common/enums/ActorKind.java
index 2a9b2def4..53bd54ea0 100644
--- a/foreign/java/bench/src/main/java/org/apache/iggy/bench/exception/BenchmarkException.java
+++ b/foreign/java/bench/src/main/java/org/apache/iggy/bench/common/enums/ActorKind.java
@@ -17,15 +17,25 @@
* under the License.
*/
-package org.apache.iggy.bench.exception;
+package org.apache.iggy.bench.common.enums;
-public class BenchmarkException extends RuntimeException {
+public enum ActorKind {
+ PRODUCER("producer"),
+ CONSUMER("consumer"),
+ PRODUCING_CONSUMER("producing_consumer");
- public BenchmarkException(String message) {
- super(message);
+ private final String value;
+
+ ActorKind(String value) {
+ this.value = value;
+ }
+
+ public String value() {
+ return value;
}
- public BenchmarkException(String message, Throwable cause) {
- super(message, cause);
+ @Override
+ public String toString() {
+ return value;
}
}
diff --git a/foreign/java/bench/src/main/java/org/apache/iggy/bench/exception/BenchmarkException.java b/foreign/java/bench/src/main/java/org/apache/iggy/bench/common/enums/BenchmarkKind.java
similarity index 51%
copy from foreign/java/bench/src/main/java/org/apache/iggy/bench/exception/BenchmarkException.java
copy to foreign/java/bench/src/main/java/org/apache/iggy/bench/common/enums/BenchmarkKind.java
index 2a9b2def4..d186d4820 100644
--- a/foreign/java/bench/src/main/java/org/apache/iggy/bench/exception/BenchmarkException.java
+++ b/foreign/java/bench/src/main/java/org/apache/iggy/bench/common/enums/BenchmarkKind.java
@@ -17,15 +17,30 @@
* under the License.
*/
-package org.apache.iggy.bench.exception;
+package org.apache.iggy.bench.common.enums;
-public class BenchmarkException extends RuntimeException {
+public enum BenchmarkKind {
+ PINNED_PRODUCER("pinned_producer"),
+ PINNED_CONSUMER("pinned_consumer"),
+ PINNED_PRODUCER_AND_CONSUMER("pinned_producer_and_consumer"),
+ BALANCED_PRODUCER("balanced_producer"),
+ BALANCED_CONSUMER_GROUP("balanced_consumer_group"),
+ BALANCED_PRODUCER_AND_CONSUMER_GROUP("balanced_producer_and_consumer_group"),
+ END_TO_END_PRODUCING_CONSUMER("end_to_end_producing_consumer"),
+ END_TO_END_PRODUCING_CONSUMER_GROUP("end_to_end_producing_consumer_group");
- public BenchmarkException(String message) {
- super(message);
+ private final String value;
+
+ BenchmarkKind(String value) {
+ this.value = value;
+ }
+
+ public String value() {
+ return value;
}
- public BenchmarkException(String message, Throwable cause) {
- super(message, cause);
+ @Override
+ public String toString() {
+ return value;
}
}
diff --git a/foreign/java/bench/src/main/java/org/apache/iggy/bench/exception/BenchmarkException.java b/foreign/java/bench/src/main/java/org/apache/iggy/bench/common/enums/GroupKind.java
similarity index 65%
copy from foreign/java/bench/src/main/java/org/apache/iggy/bench/exception/BenchmarkException.java
copy to foreign/java/bench/src/main/java/org/apache/iggy/bench/common/enums/GroupKind.java
index 2a9b2def4..fa22d198c 100644
--- a/foreign/java/bench/src/main/java/org/apache/iggy/bench/exception/BenchmarkException.java
+++ b/foreign/java/bench/src/main/java/org/apache/iggy/bench/common/enums/GroupKind.java
@@ -17,15 +17,26 @@
* under the License.
*/
-package org.apache.iggy.bench.exception;
+package org.apache.iggy.bench.common.enums;
-public class BenchmarkException extends RuntimeException {
+public enum GroupKind {
+ PRODUCERS("producers"),
+ CONSUMERS("consumers"),
+ PRODUCERS_AND_CONSUMERS("producers_and_consumers"),
+ PRODUCING_CONSUMERS("producing_consumers");
- public BenchmarkException(String message) {
- super(message);
+ private final String value;
+
+ GroupKind(String value) {
+ this.value = value;
+ }
+
+ public String value() {
+ return value;
}
- public BenchmarkException(String message, Throwable cause) {
- super(message, cause);
+ @Override
+ public String toString() {
+ return value;
}
}
diff --git a/foreign/java/bench/src/main/java/org/apache/iggy/bench/exception/BenchmarkException.java b/foreign/java/bench/src/main/java/org/apache/iggy/bench/common/enums/TransportKind.java
similarity index 71%
copy from foreign/java/bench/src/main/java/org/apache/iggy/bench/exception/BenchmarkException.java
copy to foreign/java/bench/src/main/java/org/apache/iggy/bench/common/enums/TransportKind.java
index 2a9b2def4..f71c500f1 100644
--- a/foreign/java/bench/src/main/java/org/apache/iggy/bench/exception/BenchmarkException.java
+++ b/foreign/java/bench/src/main/java/org/apache/iggy/bench/common/enums/TransportKind.java
@@ -17,15 +17,24 @@
* under the License.
*/
-package org.apache.iggy.bench.exception;
+package org.apache.iggy.bench.common.enums;
-public class BenchmarkException extends RuntimeException {
+public enum TransportKind {
+ TCP("tcp"),
+ HTTP("http");
- public BenchmarkException(String message) {
- super(message);
+ private final String value;
+
+ TransportKind(String value) {
+ this.value = value;
+ }
+
+ public String value() {
+ return value;
}
- public BenchmarkException(String message, Throwable cause) {
- super(message, cause);
+ @Override
+ public String toString() {
+ return value;
}
}
diff --git a/foreign/java/bench/src/main/java/org/apache/iggy/bench/exception/BenchmarkException.java b/foreign/java/bench/src/main/java/org/apache/iggy/bench/common/exception/BenchmarkException.java
similarity index 95%
rename from foreign/java/bench/src/main/java/org/apache/iggy/bench/exception/BenchmarkException.java
rename to foreign/java/bench/src/main/java/org/apache/iggy/bench/common/exception/BenchmarkException.java
index 2a9b2def4..26b3c8342 100644
--- a/foreign/java/bench/src/main/java/org/apache/iggy/bench/exception/BenchmarkException.java
+++ b/foreign/java/bench/src/main/java/org/apache/iggy/bench/common/exception/BenchmarkException.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.iggy.bench.exception;
+package org.apache.iggy.bench.common.exception;
public class BenchmarkException extends RuntimeException {
diff --git a/foreign/java/bench/src/main/java/org/apache/iggy/bench/common/generator/BenchmarkBatchGenerator.java b/foreign/java/bench/src/main/java/org/apache/iggy/bench/common/generator/BenchmarkBatchGenerator.java
new file mode 100644
index 000000000..91ed032bb
--- /dev/null
+++ b/foreign/java/bench/src/main/java/org/apache/iggy/bench/common/generator/BenchmarkBatchGenerator.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iggy.bench.common.generator;
+
+import org.apache.iggy.bench.models.common.generator.DataBatch;
+import org.apache.iggy.message.Message;
+
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.concurrent.ThreadLocalRandom;
+
+public final class BenchmarkBatchGenerator {
+
+ private static final byte[] ALPHANUMERIC =
+ "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz".getBytes(StandardCharsets.US_ASCII);
+
+ private final int messagesPerBatch;
+ private final String payloadTemplate;
+
+ public BenchmarkBatchGenerator(int messageSize, int messagesPerBatch) {
+ this.messagesPerBatch = messagesPerBatch;
+ this.payloadTemplate = randomPayload(messageSize);
+ }
+
+ public DataBatch generateBatch() {
+ return buildBatch(messagesPerBatch);
+ }
+
+ public DataBatch generateBatch(int messagesInBatch) {
+ return buildBatch(messagesInBatch);
+ }
+
+ private DataBatch buildBatch(int messagesInBatch) {
+ var messages = new ArrayList<Message>(messagesInBatch);
+ long userDataBytes = 0L;
+ long totalBytes = 0L;
+
+ for (int messageIndex = 0; messageIndex < messagesInBatch; messageIndex++) {
+ var message = Message.of(payloadTemplate);
+ messages.add(message);
+ userDataBytes += message.payload().length;
+ totalBytes += message.getSize();
+ }
+
+ return new DataBatch(messages, userDataBytes, totalBytes);
+ }
+
+ private static String randomPayload(int messageSize) {
+ var payload = new byte[messageSize];
+ var random = ThreadLocalRandom.current();
+
+ for (int index = 0; index < messageSize; index++) {
+ payload[index] = ALPHANUMERIC[random.nextInt(ALPHANUMERIC.length)];
+ }
+
+ return new String(payload, StandardCharsets.US_ASCII);
+ }
+}
diff --git a/foreign/java/bench/src/main/java/org/apache/iggy/bench/provision/ResourceProvisioner.java b/foreign/java/bench/src/main/java/org/apache/iggy/bench/common/provision/ResourceProvisioner.java
similarity index 95%
rename from foreign/java/bench/src/main/java/org/apache/iggy/bench/provision/ResourceProvisioner.java
rename to foreign/java/bench/src/main/java/org/apache/iggy/bench/common/provision/ResourceProvisioner.java
index dc5c71625..717d694bc 100644
--- a/foreign/java/bench/src/main/java/org/apache/iggy/bench/provision/ResourceProvisioner.java
+++ b/foreign/java/bench/src/main/java/org/apache/iggy/bench/common/provision/ResourceProvisioner.java
@@ -17,12 +17,12 @@
* under the License.
*/
-package org.apache.iggy.bench.provision;
+package org.apache.iggy.bench.common.provision;
-import org.apache.iggy.bench.exception.BenchmarkException;
+import org.apache.iggy.bench.common.exception.BenchmarkException;
import org.apache.iggy.bench.models.cli.GlobalCliArgs;
import org.apache.iggy.bench.models.cli.PinnedProducerCliArgs;
-import org.apache.iggy.bench.models.provision.ProvisionedResources;
+import org.apache.iggy.bench.models.common.provision.ProvisionedResources;
import org.apache.iggy.client.blocking.tcp.IggyTcpClient;
import org.apache.iggy.identifier.StreamId;
import org.apache.iggy.topic.CompressionAlgorithm;
diff --git a/foreign/java/bench/src/main/java/org/apache/iggy/bench/models/cli/GlobalCliArgs.java b/foreign/java/bench/src/main/java/org/apache/iggy/bench/models/cli/GlobalCliArgs.java
index 51e92b41a..a603e1425 100644
--- a/foreign/java/bench/src/main/java/org/apache/iggy/bench/models/cli/GlobalCliArgs.java
+++ b/foreign/java/bench/src/main/java/org/apache/iggy/bench/models/cli/GlobalCliArgs.java
@@ -19,7 +19,7 @@
package org.apache.iggy.bench.models.cli;
-import org.apache.iggy.bench.exception.BenchmarkException;
+import org.apache.iggy.bench.common.exception.BenchmarkException;
public record GlobalCliArgs(
int messageSize,
diff --git a/foreign/java/bench/src/main/java/org/apache/iggy/bench/models/cli/PinnedProducerCliArgs.java b/foreign/java/bench/src/main/java/org/apache/iggy/bench/models/cli/PinnedProducerCliArgs.java
index dd355a929..56ee43937 100644
--- a/foreign/java/bench/src/main/java/org/apache/iggy/bench/models/cli/PinnedProducerCliArgs.java
+++ b/foreign/java/bench/src/main/java/org/apache/iggy/bench/models/cli/PinnedProducerCliArgs.java
@@ -19,7 +19,7 @@
package org.apache.iggy.bench.models.cli;
-import org.apache.iggy.bench.exception.BenchmarkException;
+import org.apache.iggy.bench.common.exception.BenchmarkException;
public record PinnedProducerCliArgs(int streams, int producers, long maxTopicSize, long messageExpiry) {
diff --git a/foreign/java/bench/src/main/java/org/apache/iggy/bench/models/provision/ProvisionedResources.java b/foreign/java/bench/src/main/java/org/apache/iggy/bench/models/common/generator/DataBatch.java
similarity index 81%
copy from foreign/java/bench/src/main/java/org/apache/iggy/bench/models/provision/ProvisionedResources.java
copy to foreign/java/bench/src/main/java/org/apache/iggy/bench/models/common/generator/DataBatch.java
index e5411e24e..d491c8152 100644
--- a/foreign/java/bench/src/main/java/org/apache/iggy/bench/models/provision/ProvisionedResources.java
+++ b/foreign/java/bench/src/main/java/org/apache/iggy/bench/models/common/generator/DataBatch.java
@@ -17,8 +17,10 @@
* under the License.
*/
-package org.apache.iggy.bench.models.provision;
+package org.apache.iggy.bench.models.common.generator;
+
+import org.apache.iggy.message.Message;
import java.util.List;
-public record ProvisionedResources(List<String> streamNames, List<String> topicNames) {}
+public record DataBatch(List<Message> messages, long userDataBytes, long totalBytes) {}
diff --git a/foreign/java/bench/src/main/java/org/apache/iggy/bench/models/provision/ProvisionedResources.java b/foreign/java/bench/src/main/java/org/apache/iggy/bench/models/common/provision/ProvisionedResources.java
similarity index 94%
rename from foreign/java/bench/src/main/java/org/apache/iggy/bench/models/provision/ProvisionedResources.java
rename to foreign/java/bench/src/main/java/org/apache/iggy/bench/models/common/provision/ProvisionedResources.java
index e5411e24e..d560c1c4a 100644
--- a/foreign/java/bench/src/main/java/org/apache/iggy/bench/models/provision/ProvisionedResources.java
+++ b/foreign/java/bench/src/main/java/org/apache/iggy/bench/models/common/provision/ProvisionedResources.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.iggy.bench.models.provision;
+package org.apache.iggy.bench.models.common.provision;
import java.util.List;