This is an automated email from the ASF dual-hosted git repository.

mmodzelewski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git


The following commit(s) were added to refs/heads/master by this push:
     new eb20ac5ae feat(java): Implement CLI and resource provisioner for pinned producer benchmark (#3159)
eb20ac5ae is described below

commit eb20ac5ae723a2e11b4989342cccfb69706141a3
Author: Rimuksh Kansal <[email protected]>
AuthorDate: Wed Apr 29 02:36:00 2026 +0900

    feat(java): Implement CLI and resource provisioner for pinned producer benchmark (#3159)
---
 .../build.gradle.kts}                              |  27 ++--
 .../java/org/apache/iggy/bench/IggyBench.java}     |  20 +--
 .../tcp/async/TcpAsyncPinnedProducer.java          |  51 +++++++
 .../apache/iggy/bench/cli/IggyBenchCommand.java    | 162 +++++++++++++++++++++
 .../iggy/bench/cli/PinnedProducerCommand.java      | 111 ++++++++++++++
 .../iggy/bench/exception/BenchmarkException.java}  |  20 ++-
 .../iggy/bench/models/cli/GlobalCliArgs.java       |  85 +++++++++++
 .../bench/models/cli/PinnedProducerCliArgs.java    |  43 ++++++
 .../models/provision/ProvisionedResources.java}    |  15 +-
 .../iggy/bench/provision/ResourceProvisioner.java  | 100 +++++++++++++
 foreign/java/gradle/libs.versions.toml             |   2 +
 foreign/java/settings.gradle.kts                   |   3 +
 12 files changed, 596 insertions(+), 43 deletions(-)

diff --git a/foreign/java/settings.gradle.kts b/foreign/java/bench/build.gradle.kts
similarity index 57%
copy from foreign/java/settings.gradle.kts
copy to foreign/java/bench/build.gradle.kts
index 477beb85d..afa8362ee 100644
--- a/foreign/java/settings.gradle.kts
+++ b/foreign/java/bench/build.gradle.kts
@@ -17,17 +17,24 @@
  * under the License.
  */
 
-rootProject.name = "iggy-java-sdk"
+plugins {
+    id("iggy.java-application-conventions")
+}
 
-include("iggy")
-project(":iggy").projectDir = file("java-sdk")
+application {
+    mainClass = "org.apache.iggy.bench.IggyBench"
 
-// External processors - Stream processing integrations
-include("iggy-connector-library")
-project(":iggy-connector-library").projectDir = 
file("external-processors/iggy-connector-flink/iggy-connector-library")
+    // -Xms2g starts the JVM heap at 2 GB.
+    // -Xmx2g caps the JVM heap at 2 GB.
+    // -XX:+UseG1GC pins the garbage collector across runs.
+    // -XX:+AlwaysPreTouch commits heap pages up front to reduce benchmark jitter.
+    applicationDefaultJvmArgs = listOf("-Xms2g", "-Xmx2g", "-XX:+UseG1GC", "-XX:+AlwaysPreTouch")
+}
 
-include("iggy-flink-examples")
-project(":iggy-flink-examples").projectDir = 
file("external-processors/iggy-connector-flink/iggy-flink-examples")
+dependencies {
+    implementation(project(":iggy"))
+    implementation(libs.picocli)
+    implementation(libs.slf4j.api)
 
-include("iggy-connector-pinot")
-project(":iggy-connector-pinot").projectDir = 
file("external-processors/iggy-connector-pinot")
+    runtimeOnly(libs.logback.classic)
+}
diff --git a/foreign/java/settings.gradle.kts b/foreign/java/bench/src/main/java/org/apache/iggy/bench/IggyBench.java
similarity index 58%
copy from foreign/java/settings.gradle.kts
copy to foreign/java/bench/src/main/java/org/apache/iggy/bench/IggyBench.java
index 477beb85d..6ea2fe2a9 100644
--- a/foreign/java/settings.gradle.kts
+++ b/foreign/java/bench/src/main/java/org/apache/iggy/bench/IggyBench.java
@@ -17,17 +17,17 @@
  * under the License.
  */
 
-rootProject.name = "iggy-java-sdk"
+package org.apache.iggy.bench;
 
-include("iggy")
-project(":iggy").projectDir = file("java-sdk")
+import org.apache.iggy.bench.cli.IggyBenchCommand;
+import picocli.CommandLine;
 
-// External processors - Stream processing integrations
-include("iggy-connector-library")
-project(":iggy-connector-library").projectDir = 
file("external-processors/iggy-connector-flink/iggy-connector-library")
+public final class IggyBench {
 
-include("iggy-flink-examples")
-project(":iggy-flink-examples").projectDir = 
file("external-processors/iggy-connector-flink/iggy-flink-examples")
+    private IggyBench() {}
 
-include("iggy-connector-pinot")
-project(":iggy-connector-pinot").projectDir = 
file("external-processors/iggy-connector-pinot")
+    public static void main(String[] args) {
+        var exitCode = new CommandLine(new IggyBenchCommand()).execute(args);
+        System.exit(exitCode);
+    }
+}
diff --git a/foreign/java/bench/src/main/java/org/apache/iggy/bench/benchmarks/tcp/async/TcpAsyncPinnedProducer.java b/foreign/java/bench/src/main/java/org/apache/iggy/bench/benchmarks/tcp/async/TcpAsyncPinnedProducer.java
new file mode 100644
index 000000000..a36674438
--- /dev/null
+++ b/foreign/java/bench/src/main/java/org/apache/iggy/bench/benchmarks/tcp/async/TcpAsyncPinnedProducer.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iggy.bench.benchmarks.tcp.async;
+
+import org.apache.iggy.bench.models.cli.GlobalCliArgs;
+import org.apache.iggy.bench.models.cli.PinnedProducerCliArgs;
+import org.apache.iggy.bench.models.provision.ProvisionedResources;
+import org.apache.iggy.bench.provision.ResourceProvisioner;
+
+public final class TcpAsyncPinnedProducer {
+
+    private final GlobalCliArgs globalCliArgs;
+    private final PinnedProducerCliArgs pinnedProducerCliArgs;
+    private final ResourceProvisioner resourceProvisioner;
+
+    TcpAsyncPinnedProducer(
+            GlobalCliArgs globalCliArgs,
+            PinnedProducerCliArgs pinnedProducerCliArgs,
+            ResourceProvisioner resourceProvisioner) {
+        this.globalCliArgs = globalCliArgs;
+        this.pinnedProducerCliArgs = pinnedProducerCliArgs;
+        this.resourceProvisioner = resourceProvisioner;
+    }
+
+    public TcpAsyncPinnedProducer(GlobalCliArgs globalCliArgs, PinnedProducerCliArgs pinnedProducerCliArgs) {
+        this(globalCliArgs, pinnedProducerCliArgs, new ResourceProvisioner());
+    }
+
+    public ProvisionedResources provisionResources() {
+        return resourceProvisioner.provisionResources(globalCliArgs, pinnedProducerCliArgs);
+    }
+
+    public void run() {}
+}
diff --git a/foreign/java/bench/src/main/java/org/apache/iggy/bench/cli/IggyBenchCommand.java b/foreign/java/bench/src/main/java/org/apache/iggy/bench/cli/IggyBenchCommand.java
new file mode 100644
index 000000000..99a937d53
--- /dev/null
+++ b/foreign/java/bench/src/main/java/org/apache/iggy/bench/cli/IggyBenchCommand.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iggy.bench.cli;
+
+import picocli.CommandLine.ArgGroup;
+import picocli.CommandLine.Command;
+import picocli.CommandLine.ExitCode;
+import picocli.CommandLine.Model.CommandSpec;
+import picocli.CommandLine.Option;
+import picocli.CommandLine.Spec;
+
+import java.util.concurrent.Callable;
+
+@Command(
+        name = "iggy-bench",
+        mixinStandardHelpOptions = true,
+        subcommands = {PinnedProducerCommand.class},
+        description = "Benchmark CLI for the Apache Iggy Java SDK.")
+public final class IggyBenchCommand implements Callable<Integer> {
+    @Option(
+            names = {"--message-size", "-m"},
+            defaultValue = "1000",
+            description = "Message size in bytes.")
+    private int messageSize;
+
+    @Option(
+            names = {"--messages-per-batch", "-P"},
+            defaultValue = "1000",
+            description = "Messages per batch.")
+    private int messagesPerBatch;
+
+    @ArgGroup(exclusive = true, multiplicity = "0..1")
+    private DataLimitOptions dataLimitOptions;
+
+    @Option(
+            names = {"--rate-limit", "-r"},
+            defaultValue = "0",
+            description = "(NOT USED CURRENTLY) Optional total rate limit in bytes per second.")
+    private long rateLimit;
+
+    @Option(
+            names = {"--warmup-time", "-w"},
+            defaultValue = "20000",
+            description = "Warmup time in milliseconds.")
+    private long warmupTimeMs;
+
+    @Option(
+            names = {"--sampling-time", "-t"},
+            defaultValue = "10",
+            description = "Sampling time in milliseconds.")
+    private long samplingTimeMs;
+
+    @Option(
+            names = {"--moving-average-window", "-W"},
+            defaultValue = "20",
+            description = "Moving average window size.")
+    private int movingAverageWindow;
+
+    @Option(
+            names = {"--username", "-u"},
+            defaultValue = "iggy",
+            description = "Server username.")
+    private String username;
+
+    @Option(
+            names = {"--password", "-p"},
+            defaultValue = "iggy",
+            description = "Server password.")
+    private String password;
+
+    @Option(names = "--reuse-streams", defaultValue = "false", description = "Reuse existing benchmark streams.")
+    private boolean reuseStreams;
+
+    @Spec
+    private CommandSpec spec;
+
+    @Override
+    public Integer call() {
+        spec.commandLine().usage(spec.commandLine().getOut());
+        return ExitCode.USAGE;
+    }
+
+    public int messageSize() {
+        return messageSize;
+    }
+
+    public int messagesPerBatch() {
+        return messagesPerBatch;
+    }
+
+    public int messageBatches() {
+        if (dataLimitOptions == null || dataLimitOptions.messageBatches == null) {
+            return 1000;
+        }
+        return dataLimitOptions.messageBatches;
+    }
+
+    public long totalData() {
+        if (dataLimitOptions == null || dataLimitOptions.totalData == null) {
+            return 0L;
+        }
+        return dataLimitOptions.totalData;
+    }
+
+    public long rateLimit() {
+        return rateLimit;
+    }
+
+    public long warmupTimeMs() {
+        return warmupTimeMs;
+    }
+
+    public long samplingTimeMs() {
+        return samplingTimeMs;
+    }
+
+    public int movingAverageWindow() {
+        return movingAverageWindow;
+    }
+
+    public String username() {
+        return username;
+    }
+
+    public String password() {
+        return password;
+    }
+
+    public boolean reuseStreams() {
+        return reuseStreams;
+    }
+
+    private static final class DataLimitOptions {
+
+        @Option(
+                names = {"--message-batches", "-b"},
+                description = "Number of message batches. Defaults to 1000 when --total-data is not specified.")
+        private Integer messageBatches;
+
+        @Option(
+                names = {"--total-data", "-T"},
+                description = "Total data volume in bytes.")
+        private Long totalData;
+    }
+}
diff --git a/foreign/java/bench/src/main/java/org/apache/iggy/bench/cli/PinnedProducerCommand.java b/foreign/java/bench/src/main/java/org/apache/iggy/bench/cli/PinnedProducerCommand.java
new file mode 100644
index 000000000..9d58a6fcf
--- /dev/null
+++ b/foreign/java/bench/src/main/java/org/apache/iggy/bench/cli/PinnedProducerCommand.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iggy.bench.cli;
+
+import org.apache.iggy.bench.benchmarks.tcp.async.TcpAsyncPinnedProducer;
+import org.apache.iggy.bench.models.cli.GlobalCliArgs;
+import org.apache.iggy.bench.models.cli.PinnedProducerCliArgs;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import picocli.CommandLine.Command;
+import picocli.CommandLine.ExitCode;
+import picocli.CommandLine.Model.CommandSpec;
+import picocli.CommandLine.Option;
+import picocli.CommandLine.ParentCommand;
+import picocli.CommandLine.Spec;
+
+import java.util.concurrent.Callable;
+
+@Command(
+        name = "pinned-producer",
+        aliases = {"pp"},
+        mixinStandardHelpOptions = true,
+        description = "Pinned producer benchmark.")
+public final class PinnedProducerCommand implements Callable<Integer> {
+
+    private static final Logger log = LoggerFactory.getLogger(PinnedProducerCommand.class);
+    private static final long DEFAULT_MAX_TOPIC_SIZE = 0L;
+    private static final long DEFAULT_MESSAGE_EXPIRY = 0L;
+
+    @ParentCommand
+    private IggyBenchCommand rootCommand;
+
+    @Spec
+    private CommandSpec spec;
+
+    @Option(
+            names = {"--streams", "-s"},
+            description = "Number of streams. Defaults to the number of producers.")
+    private Integer streams;
+
+    @Option(
+            names = {"--producers", "-p"},
+            defaultValue = "8",
+            description = "Number of producers.")
+    private int producers = 8;
+
+    @Option(
+            names = {"--max-topic-size", "-T"},
+            defaultValue = "0",
+            description = "Max topic size in bytes. Use 0 for the server default.")
+    private long maxTopicSize = DEFAULT_MAX_TOPIC_SIZE;
+
+    @Option(
+            names = {"--message-expiry", "-e"},
+            defaultValue = "0",
+            description = "Topic message expiry in microseconds. Use 0 to never expire.")
+    private long messageExpiry = DEFAULT_MESSAGE_EXPIRY;
+
+    @Override
+    public Integer call() {
+        try {
+            var messageBatches = rootCommand.totalData() > 0 ? 0 : rootCommand.messageBatches();
+            var globalCliArgs = new GlobalCliArgs(
+                    rootCommand.messageSize(),
+                    rootCommand.messagesPerBatch(),
+                    messageBatches,
+                    rootCommand.totalData(),
+                    rootCommand.rateLimit(),
+                    rootCommand.warmupTimeMs(),
+                    rootCommand.samplingTimeMs(),
+                    rootCommand.movingAverageWindow(),
+                    rootCommand.username(),
+                    rootCommand.password(),
+                    rootCommand.reuseStreams());
+
+            var pinnedProducerCliArgs = new PinnedProducerCliArgs(
+                    streams != null ? streams : producers, producers, maxTopicSize, messageExpiry);
+
+            globalCliArgs.validate();
+            pinnedProducerCliArgs.validate();
+
+            log.info("Starting the Pinned Producer benchmark...");
+            var benchmark = new TcpAsyncPinnedProducer(globalCliArgs, pinnedProducerCliArgs);
+            benchmark.provisionResources();
+            benchmark.run();
+
+            return ExitCode.OK;
+        } catch (RuntimeException exception) {
+            var message = exception.getMessage() != null ? exception.getMessage() : exception.toString();
+            spec.commandLine().getErr().println(message);
+            return ExitCode.SOFTWARE;
+        }
+    }
+}
diff --git a/foreign/java/settings.gradle.kts b/foreign/java/bench/src/main/java/org/apache/iggy/bench/exception/BenchmarkException.java
similarity index 58%
copy from foreign/java/settings.gradle.kts
copy to foreign/java/bench/src/main/java/org/apache/iggy/bench/exception/BenchmarkException.java
index 477beb85d..2a9b2def4 100644
--- a/foreign/java/settings.gradle.kts
+++ b/foreign/java/bench/src/main/java/org/apache/iggy/bench/exception/BenchmarkException.java
@@ -17,17 +17,15 @@
  * under the License.
  */
 
-rootProject.name = "iggy-java-sdk"
+package org.apache.iggy.bench.exception;
 
-include("iggy")
-project(":iggy").projectDir = file("java-sdk")
+public class BenchmarkException extends RuntimeException {
 
-// External processors - Stream processing integrations
-include("iggy-connector-library")
-project(":iggy-connector-library").projectDir = 
file("external-processors/iggy-connector-flink/iggy-connector-library")
+    public BenchmarkException(String message) {
+        super(message);
+    }
 
-include("iggy-flink-examples")
-project(":iggy-flink-examples").projectDir = 
file("external-processors/iggy-connector-flink/iggy-flink-examples")
-
-include("iggy-connector-pinot")
-project(":iggy-connector-pinot").projectDir = 
file("external-processors/iggy-connector-pinot")
+    public BenchmarkException(String message, Throwable cause) {
+        super(message, cause);
+    }
+}
diff --git a/foreign/java/bench/src/main/java/org/apache/iggy/bench/models/cli/GlobalCliArgs.java b/foreign/java/bench/src/main/java/org/apache/iggy/bench/models/cli/GlobalCliArgs.java
new file mode 100644
index 000000000..51e92b41a
--- /dev/null
+++ b/foreign/java/bench/src/main/java/org/apache/iggy/bench/models/cli/GlobalCliArgs.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iggy.bench.models.cli;
+
+import org.apache.iggy.bench.exception.BenchmarkException;
+
+public record GlobalCliArgs(
+        int messageSize,
+        int messagesPerBatch,
+        int messageBatches,
+        long totalData,
+        long rateLimit,
+        long warmupTimeMs,
+        long samplingTimeMs,
+        int movingAverageWindow,
+        String username,
+        String password,
+        boolean reuseStreams) {
+
+    public void validate() {
+        validateCredentials();
+        validateMessageSettings();
+        validateTimingSettings();
+        validateDataSettings();
+    }
+
+    private void validateCredentials() {
+        if (username == null || username.isBlank()) {
+            throw new BenchmarkException("--username cannot be blank.");
+        }
+        if (password == null || password.isBlank()) {
+            throw new BenchmarkException("--password cannot be blank.");
+        }
+    }
+
+    private void validateMessageSettings() {
+        if (messageSize <= 0) {
+            throw new BenchmarkException("--message-size must be greater than 0.");
+        }
+        if (messagesPerBatch <= 0) {
+            throw new BenchmarkException("--messages-per-batch must be greater than 0.");
+        }
+        if (rateLimit < 0) {
+            throw new BenchmarkException("--rate-limit must be greater than or equal to 0.");
+        }
+    }
+
+    private void validateTimingSettings() {
+        if (warmupTimeMs < 0) {
+            throw new BenchmarkException("--warmup-time must be greater than or equal to 0.");
+        }
+        if (samplingTimeMs <= 0) {
+            throw new BenchmarkException("--sampling-time must be greater than 0.");
+        }
+        if (movingAverageWindow <= 0) {
+            throw new BenchmarkException("--moving-average-window must be greater than 0.");
+        }
+    }
+
+    private void validateDataSettings() {
+        if (totalData < 0) {
+            throw new BenchmarkException("--total-data must be greater than or equal to 0.");
+        }
+        if (totalData == 0 && messageBatches <= 0) {
+            throw new BenchmarkException("--message-batches must be greater than 0.");
+        }
+    }
+}
diff --git a/foreign/java/bench/src/main/java/org/apache/iggy/bench/models/cli/PinnedProducerCliArgs.java b/foreign/java/bench/src/main/java/org/apache/iggy/bench/models/cli/PinnedProducerCliArgs.java
new file mode 100644
index 000000000..dd355a929
--- /dev/null
+++ b/foreign/java/bench/src/main/java/org/apache/iggy/bench/models/cli/PinnedProducerCliArgs.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iggy.bench.models.cli;
+
+import org.apache.iggy.bench.exception.BenchmarkException;
+
+public record PinnedProducerCliArgs(int streams, int producers, long maxTopicSize, long messageExpiry) {
+
+    public void validate() {
+        if (producers <= 0) {
+            throw new BenchmarkException("--producers must be greater than 0.");
+        }
+        if (streams <= 0) {
+            throw new BenchmarkException("--streams must be greater than 0.");
+        }
+        if (streams != producers) {
+            throw new BenchmarkException("For pinned producer, --streams must match --producers.");
+        }
+        if (maxTopicSize < 0) {
+            throw new BenchmarkException("--max-topic-size must be greater than or equal to 0.");
+        }
+        if (messageExpiry < 0L) {
+            throw new BenchmarkException("--message-expiry must be greater than or equal to 0.");
+        }
+    }
+}
diff --git a/foreign/java/settings.gradle.kts b/foreign/java/bench/src/main/java/org/apache/iggy/bench/models/provision/ProvisionedResources.java
similarity index 58%
copy from foreign/java/settings.gradle.kts
copy to foreign/java/bench/src/main/java/org/apache/iggy/bench/models/provision/ProvisionedResources.java
index 477beb85d..e5411e24e 100644
--- a/foreign/java/settings.gradle.kts
+++ b/foreign/java/bench/src/main/java/org/apache/iggy/bench/models/provision/ProvisionedResources.java
@@ -17,17 +17,8 @@
  * under the License.
  */
 
-rootProject.name = "iggy-java-sdk"
+package org.apache.iggy.bench.models.provision;
 
-include("iggy")
-project(":iggy").projectDir = file("java-sdk")
+import java.util.List;
 
-// External processors - Stream processing integrations
-include("iggy-connector-library")
-project(":iggy-connector-library").projectDir = 
file("external-processors/iggy-connector-flink/iggy-connector-library")
-
-include("iggy-flink-examples")
-project(":iggy-flink-examples").projectDir = 
file("external-processors/iggy-connector-flink/iggy-flink-examples")
-
-include("iggy-connector-pinot")
-project(":iggy-connector-pinot").projectDir = 
file("external-processors/iggy-connector-pinot")
+public record ProvisionedResources(List<String> streamNames, List<String> topicNames) {}
diff --git a/foreign/java/bench/src/main/java/org/apache/iggy/bench/provision/ResourceProvisioner.java b/foreign/java/bench/src/main/java/org/apache/iggy/bench/provision/ResourceProvisioner.java
new file mode 100644
index 000000000..dc5c71625
--- /dev/null
+++ b/foreign/java/bench/src/main/java/org/apache/iggy/bench/provision/ResourceProvisioner.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iggy.bench.provision;
+
+import org.apache.iggy.bench.exception.BenchmarkException;
+import org.apache.iggy.bench.models.cli.GlobalCliArgs;
+import org.apache.iggy.bench.models.cli.PinnedProducerCliArgs;
+import org.apache.iggy.bench.models.provision.ProvisionedResources;
+import org.apache.iggy.client.blocking.tcp.IggyTcpClient;
+import org.apache.iggy.identifier.StreamId;
+import org.apache.iggy.topic.CompressionAlgorithm;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+
+public final class ResourceProvisioner {
+
+    private static final Logger log = LoggerFactory.getLogger(ResourceProvisioner.class);
+
+    public ProvisionedResources provisionResources(
+            GlobalCliArgs globalCliArgs, PinnedProducerCliArgs pinnedProducerCliArgs) {
+        var streamNames = new ArrayList<String>();
+        var topicNames = List.of("topic-1");
+
+        try (var client = IggyTcpClient.builder()
+                .credentials(globalCliArgs.username(), globalCliArgs.password())
+                .buildAndLogin()) {
+            var existingStreamNames = client.streams().getStreams().stream()
+                    .map(stream -> stream.name())
+                    .toList();
+
+            for (var i = 1; i <= pinnedProducerCliArgs.producers(); i++) {
+                var streamName = "bench-stream-" + i;
+                streamNames.add(streamName);
+
+                if (existingStreamNames.contains(streamName) && !globalCliArgs.reuseStreams()) {
+                    log.info("Deleting pre-existing stream '{}'", streamName);
+                    client.streams().deleteStream(StreamId.of(streamName));
+                }
+
+                if (existingStreamNames.contains(streamName) && globalCliArgs.reuseStreams()) {
+                    log.info("Appending to existing stream '{}'", streamName);
+                } else {
+                    log.info("Creating the test stream '{}'", streamName);
+
+                    client.streams().createStream(streamName);
+
+                    var maxTopicSize = pinnedProducerCliArgs.maxTopicSize() == 0L
+                            ? "server default"
+                            : Long.toString(pinnedProducerCliArgs.maxTopicSize());
+                    var messageExpiry = pinnedProducerCliArgs.messageExpiry() == 0L
+                            ? "never"
+                            : Long.toString(pinnedProducerCliArgs.messageExpiry());
+
+                    log.info(
+                            "Creating the test topic '{}' for stream '{}' with max topic size: {}, message expiry: {}",
+                            topicNames.get(0),
+                            streamName,
+                            maxTopicSize,
+                            messageExpiry);
+
+                    client.topics()
+                            .createTopic(
+                                    StreamId.of(streamName),
+                                    1L,
+                                    CompressionAlgorithm.None,
+                                    BigInteger.valueOf(pinnedProducerCliArgs.messageExpiry()),
+                                    BigInteger.valueOf(pinnedProducerCliArgs.maxTopicSize()),
+                                    Optional.empty(),
+                                    topicNames.get(0));
+                }
+            }
+        } catch (RuntimeException exception) {
+            throw new BenchmarkException("Failed to provision benchmark resources.", exception);
+        }
+
+        return new ProvisionedResources(streamNames, topicNames);
+    }
+}
diff --git a/foreign/java/gradle/libs.versions.toml b/foreign/java/gradle/libs.versions.toml
index 9222be7a9..24f4acba6 100644
--- a/foreign/java/gradle/libs.versions.toml
+++ b/foreign/java/gradle/libs.versions.toml
@@ -49,6 +49,7 @@ spotbugs = "4.9.8"
 
 # Config
 typesafe-config = "1.4.5"
+picocli = "4.7.7"
 
 # Build plugins
 spotless = "8.1.0"
@@ -102,6 +103,7 @@ spotbugs-annotations = { module = "com.github.spotbugs:spotbugs-annotations", ve
 
 # Config
 typesafe-config = { module = "com.typesafe:config", version.ref = "typesafe-config" }
+picocli = { module = "info.picocli:picocli", version.ref = "picocli" }
 
 [bundles]
 testing = ["junit-jupiter", "junit-platform-launcher", "assertj-core"]
diff --git a/foreign/java/settings.gradle.kts b/foreign/java/settings.gradle.kts
index 477beb85d..3f256e805 100644
--- a/foreign/java/settings.gradle.kts
+++ b/foreign/java/settings.gradle.kts
@@ -22,6 +22,9 @@ rootProject.name = "iggy-java-sdk"
 include("iggy")
 project(":iggy").projectDir = file("java-sdk")
 
+include("iggy-bench")
+project(":iggy-bench").projectDir = file("bench")
+
 // External processors - Stream processing integrations
 include("iggy-connector-library")
 project(":iggy-connector-library").projectDir = file("external-processors/iggy-connector-flink/iggy-connector-library")

Reply via email to