This is an automated email from the ASF dual-hosted git repository.
mmodzelewski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/master by this push:
new eb20ac5ae feat(java): Implement CLI and resource provisioner for
pinned producer benchmark (#3159)
eb20ac5ae is described below
commit eb20ac5ae723a2e11b4989342cccfb69706141a3
Author: Rimuksh Kansal <[email protected]>
AuthorDate: Wed Apr 29 02:36:00 2026 +0900
feat(java): Implement CLI and resource provisioner for pinned producer
benchmark (#3159)
---
.../build.gradle.kts} | 27 ++--
.../java/org/apache/iggy/bench/IggyBench.java} | 20 +--
.../tcp/async/TcpAsyncPinnedProducer.java | 51 +++++++
.../apache/iggy/bench/cli/IggyBenchCommand.java | 162 +++++++++++++++++++++
.../iggy/bench/cli/PinnedProducerCommand.java | 111 ++++++++++++++
.../iggy/bench/exception/BenchmarkException.java} | 20 ++-
.../iggy/bench/models/cli/GlobalCliArgs.java | 85 +++++++++++
.../bench/models/cli/PinnedProducerCliArgs.java | 43 ++++++
.../models/provision/ProvisionedResources.java} | 15 +-
.../iggy/bench/provision/ResourceProvisioner.java | 100 +++++++++++++
foreign/java/gradle/libs.versions.toml | 2 +
foreign/java/settings.gradle.kts | 3 +
12 files changed, 596 insertions(+), 43 deletions(-)
diff --git a/foreign/java/settings.gradle.kts b/foreign/java/bench/build.gradle.kts
similarity index 57%
copy from foreign/java/settings.gradle.kts
copy to foreign/java/bench/build.gradle.kts
index 477beb85d..afa8362ee 100644
--- a/foreign/java/settings.gradle.kts
+++ b/foreign/java/bench/build.gradle.kts
@@ -17,17 +17,24 @@
* under the License.
*/
-rootProject.name = "iggy-java-sdk"
+plugins {
+ id("iggy.java-application-conventions")
+}
-include("iggy")
-project(":iggy").projectDir = file("java-sdk")
+application {
+ mainClass = "org.apache.iggy.bench.IggyBench"
-// External processors - Stream processing integrations
-include("iggy-connector-library")
-project(":iggy-connector-library").projectDir = file("external-processors/iggy-connector-flink/iggy-connector-library")
+ // -Xms2g starts the JVM heap at 2 GB.
+ // -Xmx2g caps the JVM heap at 2 GB.
+ // -XX:+UseG1GC pins the garbage collector across runs.
+ // -XX:+AlwaysPreTouch commits heap pages up front to reduce benchmark jitter.
+ applicationDefaultJvmArgs = listOf("-Xms2g", "-Xmx2g", "-XX:+UseG1GC", "-XX:+AlwaysPreTouch")
+}
-include("iggy-flink-examples")
-project(":iggy-flink-examples").projectDir = file("external-processors/iggy-connector-flink/iggy-flink-examples")
+dependencies {
+ implementation(project(":iggy"))
+ implementation(libs.picocli)
+ implementation(libs.slf4j.api)
-include("iggy-connector-pinot")
-project(":iggy-connector-pinot").projectDir = file("external-processors/iggy-connector-pinot")
+ runtimeOnly(libs.logback.classic)
+}
diff --git a/foreign/java/settings.gradle.kts b/foreign/java/bench/src/main/java/org/apache/iggy/bench/IggyBench.java
similarity index 58%
copy from foreign/java/settings.gradle.kts
copy to foreign/java/bench/src/main/java/org/apache/iggy/bench/IggyBench.java
index 477beb85d..6ea2fe2a9 100644
--- a/foreign/java/settings.gradle.kts
+++ b/foreign/java/bench/src/main/java/org/apache/iggy/bench/IggyBench.java
@@ -17,17 +17,17 @@
* under the License.
*/
-rootProject.name = "iggy-java-sdk"
+package org.apache.iggy.bench;
-include("iggy")
-project(":iggy").projectDir = file("java-sdk")
+import org.apache.iggy.bench.cli.IggyBenchCommand;
+import picocli.CommandLine;
-// External processors - Stream processing integrations
-include("iggy-connector-library")
-project(":iggy-connector-library").projectDir = file("external-processors/iggy-connector-flink/iggy-connector-library")
+public final class IggyBench {
-include("iggy-flink-examples")
-project(":iggy-flink-examples").projectDir = file("external-processors/iggy-connector-flink/iggy-flink-examples")
+ private IggyBench() {}
-include("iggy-connector-pinot")
-project(":iggy-connector-pinot").projectDir = file("external-processors/iggy-connector-pinot")
+ public static void main(String[] args) {
+ var exitCode = new CommandLine(new IggyBenchCommand()).execute(args);
+ System.exit(exitCode);
+ }
+}
diff --git a/foreign/java/bench/src/main/java/org/apache/iggy/bench/benchmarks/tcp/async/TcpAsyncPinnedProducer.java b/foreign/java/bench/src/main/java/org/apache/iggy/bench/benchmarks/tcp/async/TcpAsyncPinnedProducer.java
new file mode 100644
index 000000000..a36674438
--- /dev/null
+++ b/foreign/java/bench/src/main/java/org/apache/iggy/bench/benchmarks/tcp/async/TcpAsyncPinnedProducer.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iggy.bench.benchmarks.tcp.async;
+
+import org.apache.iggy.bench.models.cli.GlobalCliArgs;
+import org.apache.iggy.bench.models.cli.PinnedProducerCliArgs;
+import org.apache.iggy.bench.models.provision.ProvisionedResources;
+import org.apache.iggy.bench.provision.ResourceProvisioner;
+
+public final class TcpAsyncPinnedProducer {
+
+ private final GlobalCliArgs globalCliArgs;
+ private final PinnedProducerCliArgs pinnedProducerCliArgs;
+ private final ResourceProvisioner resourceProvisioner;
+
+ TcpAsyncPinnedProducer(
+ GlobalCliArgs globalCliArgs,
+ PinnedProducerCliArgs pinnedProducerCliArgs,
+ ResourceProvisioner resourceProvisioner) {
+ this.globalCliArgs = globalCliArgs;
+ this.pinnedProducerCliArgs = pinnedProducerCliArgs;
+ this.resourceProvisioner = resourceProvisioner;
+ }
+
+ public TcpAsyncPinnedProducer(GlobalCliArgs globalCliArgs, PinnedProducerCliArgs pinnedProducerCliArgs) {
+ this(globalCliArgs, pinnedProducerCliArgs, new ResourceProvisioner());
+ }
+
+ public ProvisionedResources provisionResources() {
+ return resourceProvisioner.provisionResources(globalCliArgs, pinnedProducerCliArgs);
+ }
+
+ public void run() {}
+}
diff --git a/foreign/java/bench/src/main/java/org/apache/iggy/bench/cli/IggyBenchCommand.java b/foreign/java/bench/src/main/java/org/apache/iggy/bench/cli/IggyBenchCommand.java
new file mode 100644
index 000000000..99a937d53
--- /dev/null
+++ b/foreign/java/bench/src/main/java/org/apache/iggy/bench/cli/IggyBenchCommand.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iggy.bench.cli;
+
+import picocli.CommandLine.ArgGroup;
+import picocli.CommandLine.Command;
+import picocli.CommandLine.ExitCode;
+import picocli.CommandLine.Model.CommandSpec;
+import picocli.CommandLine.Option;
+import picocli.CommandLine.Spec;
+
+import java.util.concurrent.Callable;
+
+@Command(
+ name = "iggy-bench",
+ mixinStandardHelpOptions = true,
+ subcommands = {PinnedProducerCommand.class},
+ description = "Benchmark CLI for the Apache Iggy Java SDK.")
+public final class IggyBenchCommand implements Callable<Integer> {
+ @Option(
+ names = {"--message-size", "-m"},
+ defaultValue = "1000",
+ description = "Message size in bytes.")
+ private int messageSize;
+
+ @Option(
+ names = {"--messages-per-batch", "-P"},
+ defaultValue = "1000",
+ description = "Messages per batch.")
+ private int messagesPerBatch;
+
+ @ArgGroup(exclusive = true, multiplicity = "0..1")
+ private DataLimitOptions dataLimitOptions;
+
+ @Option(
+ names = {"--rate-limit", "-r"},
+ defaultValue = "0",
+ description = "(NOT USED CURRENTLY) Optional total rate limit in bytes per second.")
+ private long rateLimit;
+
+ @Option(
+ names = {"--warmup-time", "-w"},
+ defaultValue = "20000",
+ description = "Warmup time in milliseconds.")
+ private long warmupTimeMs;
+
+ @Option(
+ names = {"--sampling-time", "-t"},
+ defaultValue = "10",
+ description = "Sampling time in milliseconds.")
+ private long samplingTimeMs;
+
+ @Option(
+ names = {"--moving-average-window", "-W"},
+ defaultValue = "20",
+ description = "Moving average window size.")
+ private int movingAverageWindow;
+
+ @Option(
+ names = {"--username", "-u"},
+ defaultValue = "iggy",
+ description = "Server username.")
+ private String username;
+
+ @Option(
+ names = {"--password", "-p"},
+ defaultValue = "iggy",
+ description = "Server password.")
+ private String password;
+
+ @Option(names = "--reuse-streams", defaultValue = "false", description = "Reuse existing benchmark streams.")
+ private boolean reuseStreams;
+
+ @Spec
+ private CommandSpec spec;
+
+ @Override
+ public Integer call() {
+ spec.commandLine().usage(spec.commandLine().getOut());
+ return ExitCode.USAGE;
+ }
+
+ public int messageSize() {
+ return messageSize;
+ }
+
+ public int messagesPerBatch() {
+ return messagesPerBatch;
+ }
+
+ public int messageBatches() {
+ if (dataLimitOptions == null || dataLimitOptions.messageBatches == null) {
+ return 1000;
+ }
+ return dataLimitOptions.messageBatches;
+ }
+
+ public long totalData() {
+ if (dataLimitOptions == null || dataLimitOptions.totalData == null) {
+ return 0L;
+ }
+ return dataLimitOptions.totalData;
+ }
+
+ public long rateLimit() {
+ return rateLimit;
+ }
+
+ public long warmupTimeMs() {
+ return warmupTimeMs;
+ }
+
+ public long samplingTimeMs() {
+ return samplingTimeMs;
+ }
+
+ public int movingAverageWindow() {
+ return movingAverageWindow;
+ }
+
+ public String username() {
+ return username;
+ }
+
+ public String password() {
+ return password;
+ }
+
+ public boolean reuseStreams() {
+ return reuseStreams;
+ }
+
+ private static final class DataLimitOptions {
+
+ @Option(
+ names = {"--message-batches", "-b"},
+ description = "Number of message batches. Defaults to 1000 when --total-data is not specified.")
+ private Integer messageBatches;
+
+ @Option(
+ names = {"--total-data", "-T"},
+ description = "Total data volume in bytes.")
+ private Long totalData;
+ }
+}
diff --git a/foreign/java/bench/src/main/java/org/apache/iggy/bench/cli/PinnedProducerCommand.java b/foreign/java/bench/src/main/java/org/apache/iggy/bench/cli/PinnedProducerCommand.java
new file mode 100644
index 000000000..9d58a6fcf
--- /dev/null
+++ b/foreign/java/bench/src/main/java/org/apache/iggy/bench/cli/PinnedProducerCommand.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iggy.bench.cli;
+
+import org.apache.iggy.bench.benchmarks.tcp.async.TcpAsyncPinnedProducer;
+import org.apache.iggy.bench.models.cli.GlobalCliArgs;
+import org.apache.iggy.bench.models.cli.PinnedProducerCliArgs;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import picocli.CommandLine.Command;
+import picocli.CommandLine.ExitCode;
+import picocli.CommandLine.Model.CommandSpec;
+import picocli.CommandLine.Option;
+import picocli.CommandLine.ParentCommand;
+import picocli.CommandLine.Spec;
+
+import java.util.concurrent.Callable;
+
+@Command(
+ name = "pinned-producer",
+ aliases = {"pp"},
+ mixinStandardHelpOptions = true,
+ description = "Pinned producer benchmark.")
+public final class PinnedProducerCommand implements Callable<Integer> {
+
+ private static final Logger log = LoggerFactory.getLogger(PinnedProducerCommand.class);
+ private static final long DEFAULT_MAX_TOPIC_SIZE = 0L;
+ private static final long DEFAULT_MESSAGE_EXPIRY = 0L;
+
+ @ParentCommand
+ private IggyBenchCommand rootCommand;
+
+ @Spec
+ private CommandSpec spec;
+
+ @Option(
+ names = {"--streams", "-s"},
+ description = "Number of streams. Defaults to the number of producers.")
+ private Integer streams;
+
+ @Option(
+ names = {"--producers", "-p"},
+ defaultValue = "8",
+ description = "Number of producers.")
+ private int producers = 8;
+
+ @Option(
+ names = {"--max-topic-size", "-T"},
+ defaultValue = "0",
+ description = "Max topic size in bytes. Use 0 for the server default.")
+ private long maxTopicSize = DEFAULT_MAX_TOPIC_SIZE;
+
+ @Option(
+ names = {"--message-expiry", "-e"},
+ defaultValue = "0",
+ description = "Topic message expiry in microseconds. Use 0 to never expire.")
+ private long messageExpiry = DEFAULT_MESSAGE_EXPIRY;
+
+ @Override
+ public Integer call() {
+ try {
+ var messageBatches = rootCommand.totalData() > 0 ? 0 : rootCommand.messageBatches();
+ var globalCliArgs = new GlobalCliArgs(
+ rootCommand.messageSize(),
+ rootCommand.messagesPerBatch(),
+ messageBatches,
+ rootCommand.totalData(),
+ rootCommand.rateLimit(),
+ rootCommand.warmupTimeMs(),
+ rootCommand.samplingTimeMs(),
+ rootCommand.movingAverageWindow(),
+ rootCommand.username(),
+ rootCommand.password(),
+ rootCommand.reuseStreams());
+
+ var pinnedProducerCliArgs = new PinnedProducerCliArgs(
+ streams != null ? streams : producers, producers, maxTopicSize, messageExpiry);
+
+ globalCliArgs.validate();
+ pinnedProducerCliArgs.validate();
+
+ log.info("Starting the Pinned Producer benchmark...");
+ var benchmark = new TcpAsyncPinnedProducer(globalCliArgs, pinnedProducerCliArgs);
+ benchmark.provisionResources();
+ benchmark.run();
+
+ return ExitCode.OK;
+ } catch (RuntimeException exception) {
+ var message = exception.getMessage() != null ? exception.getMessage() : exception.toString();
+ spec.commandLine().getErr().println(message);
+ return ExitCode.SOFTWARE;
+ }
+ }
+}
diff --git a/foreign/java/settings.gradle.kts b/foreign/java/bench/src/main/java/org/apache/iggy/bench/exception/BenchmarkException.java
similarity index 58%
copy from foreign/java/settings.gradle.kts
copy to foreign/java/bench/src/main/java/org/apache/iggy/bench/exception/BenchmarkException.java
index 477beb85d..2a9b2def4 100644
--- a/foreign/java/settings.gradle.kts
+++ b/foreign/java/bench/src/main/java/org/apache/iggy/bench/exception/BenchmarkException.java
@@ -17,17 +17,15 @@
* under the License.
*/
-rootProject.name = "iggy-java-sdk"
+package org.apache.iggy.bench.exception;
-include("iggy")
-project(":iggy").projectDir = file("java-sdk")
+public class BenchmarkException extends RuntimeException {
-// External processors - Stream processing integrations
-include("iggy-connector-library")
-project(":iggy-connector-library").projectDir = file("external-processors/iggy-connector-flink/iggy-connector-library")
+ public BenchmarkException(String message) {
+ super(message);
+ }
-include("iggy-flink-examples")
-project(":iggy-flink-examples").projectDir = file("external-processors/iggy-connector-flink/iggy-flink-examples")
-
-include("iggy-connector-pinot")
-project(":iggy-connector-pinot").projectDir = file("external-processors/iggy-connector-pinot")
+ public BenchmarkException(String message, Throwable cause) {
+ super(message, cause);
+ }
+}
diff --git a/foreign/java/bench/src/main/java/org/apache/iggy/bench/models/cli/GlobalCliArgs.java b/foreign/java/bench/src/main/java/org/apache/iggy/bench/models/cli/GlobalCliArgs.java
new file mode 100644
index 000000000..51e92b41a
--- /dev/null
+++ b/foreign/java/bench/src/main/java/org/apache/iggy/bench/models/cli/GlobalCliArgs.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iggy.bench.models.cli;
+
+import org.apache.iggy.bench.exception.BenchmarkException;
+
+public record GlobalCliArgs(
+ int messageSize,
+ int messagesPerBatch,
+ int messageBatches,
+ long totalData,
+ long rateLimit,
+ long warmupTimeMs,
+ long samplingTimeMs,
+ int movingAverageWindow,
+ String username,
+ String password,
+ boolean reuseStreams) {
+
+ public void validate() {
+ validateCredentials();
+ validateMessageSettings();
+ validateTimingSettings();
+ validateDataSettings();
+ }
+
+ private void validateCredentials() {
+ if (username == null || username.isBlank()) {
+ throw new BenchmarkException("--username cannot be blank.");
+ }
+ if (password == null || password.isBlank()) {
+ throw new BenchmarkException("--password cannot be blank.");
+ }
+ }
+
+ private void validateMessageSettings() {
+ if (messageSize <= 0) {
+ throw new BenchmarkException("--message-size must be greater than 0.");
+ }
+ if (messagesPerBatch <= 0) {
+ throw new BenchmarkException("--messages-per-batch must be greater than 0.");
+ }
+ if (rateLimit < 0) {
+ throw new BenchmarkException("--rate-limit must be greater than or equal to 0.");
+ }
+ }
+
+ private void validateTimingSettings() {
+ if (warmupTimeMs < 0) {
+ throw new BenchmarkException("--warmup-time must be greater than or equal to 0.");
+ }
+ if (samplingTimeMs <= 0) {
+ throw new BenchmarkException("--sampling-time must be greater than 0.");
+ }
+ if (movingAverageWindow <= 0) {
+ throw new BenchmarkException("--moving-average-window must be greater than 0.");
+ }
+ }
+
+ private void validateDataSettings() {
+ if (totalData < 0) {
+ throw new BenchmarkException("--total-data must be greater than or equal to 0.");
+ }
+ if (totalData == 0 && messageBatches <= 0) {
+ throw new BenchmarkException("--message-batches must be greater than 0.");
+ }
+ }
+}
diff --git a/foreign/java/bench/src/main/java/org/apache/iggy/bench/models/cli/PinnedProducerCliArgs.java b/foreign/java/bench/src/main/java/org/apache/iggy/bench/models/cli/PinnedProducerCliArgs.java
new file mode 100644
index 000000000..dd355a929
--- /dev/null
+++ b/foreign/java/bench/src/main/java/org/apache/iggy/bench/models/cli/PinnedProducerCliArgs.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iggy.bench.models.cli;
+
+import org.apache.iggy.bench.exception.BenchmarkException;
+
+public record PinnedProducerCliArgs(int streams, int producers, long maxTopicSize, long messageExpiry) {
+
+ public void validate() {
+ if (producers <= 0) {
+ throw new BenchmarkException("--producers must be greater than 0.");
+ }
+ if (streams <= 0) {
+ throw new BenchmarkException("--streams must be greater than 0.");
+ }
+ if (streams != producers) {
+ throw new BenchmarkException("For pinned producer, --streams must match --producers.");
+ }
+ if (maxTopicSize < 0) {
+ throw new BenchmarkException("--max-topic-size must be greater than or equal to 0.");
+ }
+ if (messageExpiry < 0L) {
+ throw new BenchmarkException("--message-expiry must be greater than or equal to 0.");
+ }
+ }
+}
diff --git a/foreign/java/settings.gradle.kts b/foreign/java/bench/src/main/java/org/apache/iggy/bench/models/provision/ProvisionedResources.java
similarity index 58%
copy from foreign/java/settings.gradle.kts
copy to foreign/java/bench/src/main/java/org/apache/iggy/bench/models/provision/ProvisionedResources.java
index 477beb85d..e5411e24e 100644
--- a/foreign/java/settings.gradle.kts
+++ b/foreign/java/bench/src/main/java/org/apache/iggy/bench/models/provision/ProvisionedResources.java
@@ -17,17 +17,8 @@
* under the License.
*/
-rootProject.name = "iggy-java-sdk"
+package org.apache.iggy.bench.models.provision;
-include("iggy")
-project(":iggy").projectDir = file("java-sdk")
+import java.util.List;
-// External processors - Stream processing integrations
-include("iggy-connector-library")
-project(":iggy-connector-library").projectDir = file("external-processors/iggy-connector-flink/iggy-connector-library")
-
-include("iggy-flink-examples")
-project(":iggy-flink-examples").projectDir = file("external-processors/iggy-connector-flink/iggy-flink-examples")
-
-include("iggy-connector-pinot")
-project(":iggy-connector-pinot").projectDir = file("external-processors/iggy-connector-pinot")
+public record ProvisionedResources(List<String> streamNames, List<String> topicNames) {}
diff --git a/foreign/java/bench/src/main/java/org/apache/iggy/bench/provision/ResourceProvisioner.java b/foreign/java/bench/src/main/java/org/apache/iggy/bench/provision/ResourceProvisioner.java
new file mode 100644
index 000000000..dc5c71625
--- /dev/null
+++ b/foreign/java/bench/src/main/java/org/apache/iggy/bench/provision/ResourceProvisioner.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iggy.bench.provision;
+
+import org.apache.iggy.bench.exception.BenchmarkException;
+import org.apache.iggy.bench.models.cli.GlobalCliArgs;
+import org.apache.iggy.bench.models.cli.PinnedProducerCliArgs;
+import org.apache.iggy.bench.models.provision.ProvisionedResources;
+import org.apache.iggy.client.blocking.tcp.IggyTcpClient;
+import org.apache.iggy.identifier.StreamId;
+import org.apache.iggy.topic.CompressionAlgorithm;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+
+public final class ResourceProvisioner {
+
+ private static final Logger log = LoggerFactory.getLogger(ResourceProvisioner.class);
+
+ public ProvisionedResources provisionResources(
+ GlobalCliArgs globalCliArgs, PinnedProducerCliArgs pinnedProducerCliArgs) {
+ var streamNames = new ArrayList<String>();
+ var topicNames = List.of("topic-1");
+
+ try (var client = IggyTcpClient.builder()
+ .credentials(globalCliArgs.username(), globalCliArgs.password())
+ .buildAndLogin()) {
+ var existingStreamNames = client.streams().getStreams().stream()
+ .map(stream -> stream.name())
+ .toList();
+
+ for (var i = 1; i <= pinnedProducerCliArgs.producers(); i++) {
+ var streamName = "bench-stream-" + i;
+ streamNames.add(streamName);
+
+ if (existingStreamNames.contains(streamName) && !globalCliArgs.reuseStreams()) {
+ log.info("Deleting pre-existing stream '{}'", streamName);
+ client.streams().deleteStream(StreamId.of(streamName));
+ }
+
+ if (existingStreamNames.contains(streamName) && globalCliArgs.reuseStreams()) {
+ log.info("Appending to existing stream '{}'", streamName);
+ } else {
+ log.info("Creating the test stream '{}'", streamName);
+
+ client.streams().createStream(streamName);
+
+ var maxTopicSize = pinnedProducerCliArgs.maxTopicSize() == 0L
+ ? "server default"
+ : Long.toString(pinnedProducerCliArgs.maxTopicSize());
+ var messageExpiry = pinnedProducerCliArgs.messageExpiry() == 0L
+ ? "never"
+ : Long.toString(pinnedProducerCliArgs.messageExpiry());
+
+ log.info(
+ "Creating the test topic '{}' for stream '{}' with max topic size: {}, message expiry: {}",
+ topicNames.get(0),
+ streamName,
+ maxTopicSize,
+ messageExpiry);
+
+ client.topics()
+ .createTopic(
+ StreamId.of(streamName),
+ 1L,
+ CompressionAlgorithm.None,
+ BigInteger.valueOf(pinnedProducerCliArgs.messageExpiry()),
+ BigInteger.valueOf(pinnedProducerCliArgs.maxTopicSize()),
+ Optional.empty(),
+ topicNames.get(0));
+ }
+ }
+ } catch (RuntimeException exception) {
+ throw new BenchmarkException("Failed to provision benchmark resources.", exception);
+ }
+
+ return new ProvisionedResources(streamNames, topicNames);
+ }
+}
diff --git a/foreign/java/gradle/libs.versions.toml b/foreign/java/gradle/libs.versions.toml
index 9222be7a9..24f4acba6 100644
--- a/foreign/java/gradle/libs.versions.toml
+++ b/foreign/java/gradle/libs.versions.toml
@@ -49,6 +49,7 @@ spotbugs = "4.9.8"
# Config
typesafe-config = "1.4.5"
+picocli = "4.7.7"
# Build plugins
spotless = "8.1.0"
@@ -102,6 +103,7 @@ spotbugs-annotations = { module = "com.github.spotbugs:spotbugs-annotations", ve
# Config
typesafe-config = { module = "com.typesafe:config", version.ref = "typesafe-config" }
+picocli = { module = "info.picocli:picocli", version.ref = "picocli" }
[bundles]
testing = ["junit-jupiter", "junit-platform-launcher", "assertj-core"]
diff --git a/foreign/java/settings.gradle.kts b/foreign/java/settings.gradle.kts
index 477beb85d..3f256e805 100644
--- a/foreign/java/settings.gradle.kts
+++ b/foreign/java/settings.gradle.kts
@@ -22,6 +22,9 @@ rootProject.name = "iggy-java-sdk"
include("iggy")
project(":iggy").projectDir = file("java-sdk")
+include("iggy-bench")
+project(":iggy-bench").projectDir = file("bench")
+
// External processors - Stream processing integrations
include("iggy-connector-library")
project(":iggy-connector-library").projectDir = file("external-processors/iggy-connector-flink/iggy-connector-library")