This is an automated email from the ASF dual-hosted git repository.

maciej pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git


The following commit(s) were added to refs/heads/master by this push:
     new 9ee1f6dfa feat(java): Support connection timeout (#3026)
9ee1f6dfa is described below

commit 9ee1f6dfaafdfc367032579469b2d31860eec5eb
Author: Jonathon Henderson <[email protected]>
AuthorDate: Fri Mar 27 09:30:34 2026 +0000

    feat(java): Support connection timeout (#3026)
    
    Closes #3010
---
 .../iggy/client/async/tcp/AsyncIggyTcpClient.java  |  7 +-
 .../async/tcp/AsyncIggyTcpClientBuilder.java       | 54 ++++++++++++----
 .../iggy/client/async/tcp/AsyncTcpConnection.java  | 17 ++++-
 .../async/tcp/AsyncIggyTcpClientBuilderTest.java   | 74 ++++++++++++++++++++++
 4 files changed, 136 insertions(+), 16 deletions(-)

diff --git 
a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/AsyncIggyTcpClient.java
 
b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/AsyncIggyTcpClient.java
index 81633ff49..9a9738db5 100644
--- 
a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/AsyncIggyTcpClient.java
+++ 
b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/AsyncIggyTcpClient.java
@@ -100,6 +100,7 @@ public class AsyncIggyTcpClient {
     private final int port;
     private final Optional<String> username;
     private final Optional<String> password;
+    private final Optional<Duration> connectionTimeout;
     private final Optional<Duration> acquireTimeout;
     private final Optional<Duration> requestTimeout;
     private final Optional<Integer> connectionPoolSize;
@@ -126,7 +127,7 @@ public class AsyncIggyTcpClient {
      * @param port the server port
      */
     public AsyncIggyTcpClient(String host, int port) {
-        this(host, port, null, null, null, null, null, null, false, 
Optional.empty());
+        this(host, port, null, null, null, null, null, null, null, false, 
Optional.empty());
     }
 
     @SuppressWarnings("checkstyle:ParameterNumber")
@@ -135,6 +136,7 @@ public class AsyncIggyTcpClient {
             int port,
             String username,
             String password,
+            Duration connectionTimeout,
             Duration acquireTimeout,
             Duration requestTimeout,
             Integer connectionPoolSize,
@@ -145,6 +147,7 @@ public class AsyncIggyTcpClient {
         this.port = port;
         this.username = Optional.ofNullable(username);
         this.password = Optional.ofNullable(password);
+        this.connectionTimeout = Optional.ofNullable(connectionTimeout);
         this.acquireTimeout = Optional.ofNullable(acquireTimeout);
         this.requestTimeout = Optional.ofNullable(requestTimeout);
         this.connectionPoolSize = Optional.ofNullable(connectionPoolSize);
@@ -176,7 +179,7 @@ public class AsyncIggyTcpClient {
         connectionPoolSize.ifPresent(poolConfigBuilder::setMaxConnections);
         acquireTimeout.ifPresent(timeout -> 
poolConfigBuilder.setAcquireTimeoutMillis(timeout.toMillis()));
         TCPConnectionPoolConfig poolConfig = poolConfigBuilder.build();
-        connection = new AsyncTcpConnection(host, port, enableTls, 
tlsCertificate, poolConfig);
+        connection = new AsyncTcpConnection(host, port, enableTls, 
tlsCertificate, poolConfig, connectionTimeout);
         return connection.connect().thenRun(() -> {
             log.debug("Connected to {}:{} | {}", host, port, 
IggyVersion.getInstance());
             messagesClient = new MessagesTcpClient(connection);
diff --git 
a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/AsyncIggyTcpClientBuilder.java
 
b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/AsyncIggyTcpClientBuilder.java
index 38a636d09..85664e5cf 100644
--- 
a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/AsyncIggyTcpClientBuilder.java
+++ 
b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/AsyncIggyTcpClientBuilder.java
@@ -220,29 +220,61 @@ public final class AsyncIggyTcpClientBuilder {
      * @throws IggyInvalidArgumentException if the host is null or empty, or 
if the port is not positive
      */
     public AsyncIggyTcpClient build() {
+        validateHost();
+        validatePort();
+        validateConnectionPoolSize();
+        validateConnectionTimeout();
+        validateAcquireTimeout();
+
+        return new AsyncIggyTcpClient(
+                host,
+                port,
+                username,
+                password,
+                connectionTimeout,
+                acquireTimeout,
+                requestTimeout,
+                connectionPoolSize,
+                retryPolicy,
+                enableTls,
+                Optional.ofNullable(tlsCertificate));
+    }
+
+    private void validateHost() {
         if (host == null || host.isEmpty()) {
             throw new IggyInvalidArgumentException("Host cannot be null or 
empty");
         }
+    }
+
+    private void validatePort() {
         if (port == null || port <= 0) {
             throw new IggyInvalidArgumentException("Port must be a positive 
integer");
         }
+    }
+
+    private void validateConnectionPoolSize() {
         if (connectionPoolSize != null && connectionPoolSize <= 0) {
             throw new IggyInvalidArgumentException("Connection pool size 
cannot by 0 or negative");
         }
+    }
+
+    private void validateConnectionTimeout() {
+        if (connectionTimeout == null) {
+            return;
+        }
+        if (connectionTimeout.equals(Duration.ZERO) || 
connectionTimeout.isNegative()) {
+            throw new IggyInvalidArgumentException("ConnectionTimeout Cannot 
be 0 or Negative");
+        }
+        if (connectionTimeout.toMillis() > ((long) Integer.MAX_VALUE)) {
+            throw new IggyInvalidArgumentException(
+                    String.format("ConnectionTimeout Cannot be greater than 
%d", Integer.MAX_VALUE));
+        }
+    }
+
+    private void validateAcquireTimeout() {
         if (acquireTimeout != null && (acquireTimeout.equals(Duration.ZERO) || 
acquireTimeout.isNegative())) {
             throw new IggyInvalidArgumentException("AcquireTimeout Cannot be 0 
or Negative");
         }
-        return new AsyncIggyTcpClient(
-                host,
-                port,
-                username,
-                password,
-                connectionTimeout,
-                requestTimeout,
-                connectionPoolSize,
-                retryPolicy,
-                enableTls,
-                Optional.ofNullable(tlsCertificate));
     }
 
     /**
diff --git 
a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/AsyncTcpConnection.java
 
b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/AsyncTcpConnection.java
index bed6379f6..c74c615d4 100644
--- 
a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/AsyncTcpConnection.java
+++ 
b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/AsyncTcpConnection.java
@@ -26,6 +26,7 @@ import io.netty.channel.ChannelFutureListener;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelOption;
 import io.netty.channel.ChannelPipeline;
+import io.netty.channel.ConnectTimeoutException;
 import io.netty.channel.IoEventLoopGroup;
 import io.netty.channel.MultiThreadIoEventLoopGroup;
 import io.netty.channel.SimpleChannelInboundHandler;
@@ -38,6 +39,7 @@ import io.netty.handler.ssl.SslContext;
 import io.netty.handler.ssl.SslContextBuilder;
 import io.netty.util.concurrent.FutureListener;
 import org.apache.iggy.exception.IggyClientException;
+import org.apache.iggy.exception.IggyConnectionException;
 import org.apache.iggy.exception.IggyEmptyResponseException;
 import org.apache.iggy.exception.IggyInvalidArgumentException;
 import org.apache.iggy.exception.IggyNotConnectedException;
@@ -49,6 +51,7 @@ import org.slf4j.LoggerFactory;
 
 import javax.net.ssl.SSLException;
 import java.io.File;
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Optional;
@@ -65,6 +68,7 @@ import java.util.function.Function;
  */
 public class AsyncTcpConnection {
     private static final Logger log = 
LoggerFactory.getLogger(AsyncTcpConnection.class);
+    private static final Duration DEFAULT_CONNECTION_TIMEOUT = 
Duration.ofMillis(3000);
 
     private final IoEventLoopGroup eventLoopGroup;
     private final FixedChannelPool channelPool;
@@ -80,7 +84,8 @@ public class AsyncTcpConnection {
             int port,
             boolean enableTls,
             Optional<File> tlsCertificate,
-            TCPConnectionPoolConfig poolConfig) {
+            TCPConnectionPoolConfig poolConfig,
+            Optional<Duration> connectionTimeout) {
         this.eventLoopGroup = new 
MultiThreadIoEventLoopGroup(NioIoHandler.newFactory());
 
         SslContext sslContext = null;
@@ -98,7 +103,8 @@ public class AsyncTcpConnection {
                 .group(eventLoopGroup)
                 .channel(NioSocketChannel.class)
                 .option(ChannelOption.TCP_NODELAY, true)
-                .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000)
+                .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, (int)
+                        
connectionTimeout.orElse(DEFAULT_CONNECTION_TIMEOUT).toMillis())
                 .option(ChannelOption.SO_KEEPALIVE, true)
                 .remoteAddress(host, port);
 
@@ -124,7 +130,12 @@ public class AsyncTcpConnection {
                 channelPool.release(f.getNow());
                 future.complete(null);
             } else {
-                future.completeExceptionally(f.cause());
+                Throwable cause = f.cause();
+                if (cause instanceof ConnectTimeoutException) {
+                    future.completeExceptionally(new 
IggyConnectionException("Connection timeout", cause));
+                } else {
+                    future.completeExceptionally(cause);
+                }
             }
         });
         return future;
diff --git 
a/foreign/java/java-sdk/src/test/java/org/apache/iggy/client/async/tcp/AsyncIggyTcpClientBuilderTest.java
 
b/foreign/java/java-sdk/src/test/java/org/apache/iggy/client/async/tcp/AsyncIggyTcpClientBuilderTest.java
index 3052630d4..6f33b106a 100644
--- 
a/foreign/java/java-sdk/src/test/java/org/apache/iggy/client/async/tcp/AsyncIggyTcpClientBuilderTest.java
+++ 
b/foreign/java/java-sdk/src/test/java/org/apache/iggy/client/async/tcp/AsyncIggyTcpClientBuilderTest.java
@@ -137,6 +137,80 @@ class AsyncIggyTcpClientBuilderTest extends 
BaseIntegrationTest {
         
assertThatThrownBy(builder::build).isInstanceOf(IggyInvalidArgumentException.class);
     }
 
+    @Test
+    void shouldThrowExceptionForNullPort() {
+        // Given: Builder with null port
+        AsyncIggyTcpClientBuilder builder =
+                AsyncIggyTcpClient.builder().host(serverHost()).port(null);
+
+        // When/Then: Building should throw IggyInvalidArgumentException
+        
assertThatThrownBy(builder::build).isInstanceOf(IggyInvalidArgumentException.class);
+    }
+
+    @Test
+    void shouldThrowExceptionForZeroConnectionPoolSize() {
+        // Given: Builder with 0 connection pool size
+        AsyncIggyTcpClientBuilder builder = 
AsyncIggyTcpClient.builder().connectionPoolSize(0);
+
+        // When/Then: Building should throw IggyInvalidArgumentException
+        
assertThatThrownBy(builder::build).isInstanceOf(IggyInvalidArgumentException.class);
+    }
+
+    @Test
+    void shouldThrowExceptionForNegativeConnectionPoolSize() {
+        // Given: Builder with negative connection pool size
+        AsyncIggyTcpClientBuilder builder = 
AsyncIggyTcpClient.builder().connectionPoolSize(-1);
+
+        // When/Then: Building should throw IggyInvalidArgumentException
+        
assertThatThrownBy(builder::build).isInstanceOf(IggyInvalidArgumentException.class);
+    }
+
+    @Test
+    void shouldThrowExceptionForZeroConnectionTimeout() {
+        // Given: Builder with 0 connection timeout
+        AsyncIggyTcpClientBuilder builder = 
AsyncIggyTcpClient.builder().connectionTimeout(Duration.ofMillis(0));
+
+        // When/Then: Building should throw IggyInvalidArgumentException
+        
assertThatThrownBy(builder::build).isInstanceOf(IggyInvalidArgumentException.class);
+    }
+
+    @Test
+    void shouldThrowExceptionForNegativeConnectionTimeout() {
+        // Given: Builder with negative connection timeout
+        AsyncIggyTcpClientBuilder builder = 
AsyncIggyTcpClient.builder().connectionTimeout(Duration.ofMillis(-1));
+
+        // When/Then: Building should throw IggyInvalidArgumentException
+        
assertThatThrownBy(builder::build).isInstanceOf(IggyInvalidArgumentException.class);
+    }
+
+    @Test
+    void 
shouldThrowExceptionForConnectionTimeoutInMillisecondsGreaterThanMaximumInteger()
 {
+        // Given: Builder with connection timeout in milliseconds greater than 
maximum integer
+        AsyncIggyTcpClientBuilder builder =
+                
AsyncIggyTcpClient.builder().connectionTimeout(Duration.ofMillis(((long) 
Integer.MAX_VALUE) + 1));
+
+        // When/Then: Building should throw IggyInvalidArgumentException
+        
assertThatThrownBy(builder::build).isInstanceOf(IggyInvalidArgumentException.class);
+    }
+
+    @Test
+    void shouldThrowExceptionForZeroAcquireTimeout() {
+        // Given: Builder with 0 acquire timeout
+        AsyncIggyTcpClientBuilder builder = 
AsyncIggyTcpClient.builder().acquireTimeout(Duration.ofMillis(0));
+
+        // When/Then: Building should throw IggyInvalidArgumentException
+        
assertThatThrownBy(builder::build).isInstanceOf(IggyInvalidArgumentException.class);
+    }
+
+    @Test
+    void shouldThrowExceptionForNegativeAcquireTimeout() {
+        // Given: Builder with negative acquire timeout
+        AsyncIggyTcpClientBuilder builder = 
AsyncIggyTcpClient.builder().acquireTimeout(Duration.ofMillis(-1000));
+
+        // When/Then: Building should throw IggyInvalidArgumentException
+        
assertThatThrownBy(builder::build).isInstanceOf(IggyInvalidArgumentException.class);
+    }
+
     @Test
     void shouldMaintainBackwardCompatibilityWithOldConstructor() throws 
Exception {
         // Given: Old constructor approach

Reply via email to