This is an automated email from the ASF dual-hosted git repository.
maciej pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/master by this push:
new 9ee1f6dfa feat(java): Support connection timeout (#3026)
9ee1f6dfa is described below
commit 9ee1f6dfaafdfc367032579469b2d31860eec5eb
Author: Jonathon Henderson <[email protected]>
AuthorDate: Fri Mar 27 09:30:34 2026 +0000
feat(java): Support connection timeout (#3026)
Closes #3010
---
.../iggy/client/async/tcp/AsyncIggyTcpClient.java | 7 +-
.../async/tcp/AsyncIggyTcpClientBuilder.java | 54 ++++++++++++----
.../iggy/client/async/tcp/AsyncTcpConnection.java | 17 ++++-
.../async/tcp/AsyncIggyTcpClientBuilderTest.java | 74 ++++++++++++++++++++++
4 files changed, 136 insertions(+), 16 deletions(-)
diff --git a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/AsyncIggyTcpClient.java b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/AsyncIggyTcpClient.java
index 81633ff49..9a9738db5 100644
--- a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/AsyncIggyTcpClient.java
+++ b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/AsyncIggyTcpClient.java
@@ -100,6 +100,7 @@ public class AsyncIggyTcpClient {
private final int port;
private final Optional<String> username;
private final Optional<String> password;
+ private final Optional<Duration> connectionTimeout;
private final Optional<Duration> acquireTimeout;
private final Optional<Duration> requestTimeout;
private final Optional<Integer> connectionPoolSize;
@@ -126,7 +127,7 @@ public class AsyncIggyTcpClient {
* @param port the server port
*/
public AsyncIggyTcpClient(String host, int port) {
- this(host, port, null, null, null, null, null, null, false,
Optional.empty());
+ this(host, port, null, null, null, null, null, null, null, false,
Optional.empty());
}
@SuppressWarnings("checkstyle:ParameterNumber")
@@ -135,6 +136,7 @@ public class AsyncIggyTcpClient {
int port,
String username,
String password,
+ Duration connectionTimeout,
Duration acquireTimeout,
Duration requestTimeout,
Integer connectionPoolSize,
@@ -145,6 +147,7 @@ public class AsyncIggyTcpClient {
this.port = port;
this.username = Optional.ofNullable(username);
this.password = Optional.ofNullable(password);
+ this.connectionTimeout = Optional.ofNullable(connectionTimeout);
this.acquireTimeout = Optional.ofNullable(acquireTimeout);
this.requestTimeout = Optional.ofNullable(requestTimeout);
this.connectionPoolSize = Optional.ofNullable(connectionPoolSize);
@@ -176,7 +179,7 @@ public class AsyncIggyTcpClient {
connectionPoolSize.ifPresent(poolConfigBuilder::setMaxConnections);
acquireTimeout.ifPresent(timeout ->
poolConfigBuilder.setAcquireTimeoutMillis(timeout.toMillis()));
TCPConnectionPoolConfig poolConfig = poolConfigBuilder.build();
- connection = new AsyncTcpConnection(host, port, enableTls,
tlsCertificate, poolConfig);
+ connection = new AsyncTcpConnection(host, port, enableTls,
tlsCertificate, poolConfig, connectionTimeout);
return connection.connect().thenRun(() -> {
log.debug("Connected to {}:{} | {}", host, port,
IggyVersion.getInstance());
messagesClient = new MessagesTcpClient(connection);
diff --git a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/AsyncIggyTcpClientBuilder.java b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/AsyncIggyTcpClientBuilder.java
index 38a636d09..85664e5cf 100644
--- a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/AsyncIggyTcpClientBuilder.java
+++ b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/AsyncIggyTcpClientBuilder.java
@@ -220,29 +220,61 @@ public final class AsyncIggyTcpClientBuilder {
* @throws IggyInvalidArgumentException if the host is null or empty, or
if the port is not positive
*/
public AsyncIggyTcpClient build() {
+ validateHost();
+ validatePort();
+ validateConnectionPoolSize();
+ validateConnectionTimeout();
+ validateAcquireTimeout();
+
+ return new AsyncIggyTcpClient(
+ host,
+ port,
+ username,
+ password,
+ connectionTimeout,
+ acquireTimeout,
+ requestTimeout,
+ connectionPoolSize,
+ retryPolicy,
+ enableTls,
+ Optional.ofNullable(tlsCertificate));
+ }
+
+ private void validateHost() {
if (host == null || host.isEmpty()) {
throw new IggyInvalidArgumentException("Host cannot be null or
empty");
}
+ }
+
+ private void validatePort() {
if (port == null || port <= 0) {
throw new IggyInvalidArgumentException("Port must be a positive
integer");
}
+ }
+
+ private void validateConnectionPoolSize() {
if (connectionPoolSize != null && connectionPoolSize <= 0) {
throw new IggyInvalidArgumentException("Connection pool size
cannot by 0 or negative");
}
+ }
+
+ private void validateConnectionTimeout() {
+ if (connectionTimeout == null) {
+ return;
+ }
+ if (connectionTimeout.equals(Duration.ZERO) ||
connectionTimeout.isNegative()) {
+ throw new IggyInvalidArgumentException("ConnectionTimeout Cannot
be 0 or Negative");
+ }
+ if (connectionTimeout.toMillis() > ((long) Integer.MAX_VALUE)) {
+ throw new IggyInvalidArgumentException(
+ String.format("ConnectionTimeout Cannot be greater than
%d", Integer.MAX_VALUE));
+ }
+ }
+
+ private void validateAcquireTimeout() {
if (acquireTimeout != null && (acquireTimeout.equals(Duration.ZERO) ||
acquireTimeout.isNegative())) {
throw new IggyInvalidArgumentException("AcquireTimeout Cannot be 0
or Negative");
}
- return new AsyncIggyTcpClient(
- host,
- port,
- username,
- password,
- connectionTimeout,
- requestTimeout,
- connectionPoolSize,
- retryPolicy,
- enableTls,
- Optional.ofNullable(tlsCertificate));
}
/**
diff --git a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/AsyncTcpConnection.java b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/AsyncTcpConnection.java
index bed6379f6..c74c615d4 100644
--- a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/AsyncTcpConnection.java
+++ b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/AsyncTcpConnection.java
@@ -26,6 +26,7 @@ import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
+import io.netty.channel.ConnectTimeoutException;
import io.netty.channel.IoEventLoopGroup;
import io.netty.channel.MultiThreadIoEventLoopGroup;
import io.netty.channel.SimpleChannelInboundHandler;
@@ -38,6 +39,7 @@ import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.util.concurrent.FutureListener;
import org.apache.iggy.exception.IggyClientException;
+import org.apache.iggy.exception.IggyConnectionException;
import org.apache.iggy.exception.IggyEmptyResponseException;
import org.apache.iggy.exception.IggyInvalidArgumentException;
import org.apache.iggy.exception.IggyNotConnectedException;
@@ -49,6 +51,7 @@ import org.slf4j.LoggerFactory;
import javax.net.ssl.SSLException;
import java.io.File;
+import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
@@ -65,6 +68,7 @@ import java.util.function.Function;
*/
public class AsyncTcpConnection {
private static final Logger log =
LoggerFactory.getLogger(AsyncTcpConnection.class);
+ private static final Duration DEFAULT_CONNECTION_TIMEOUT =
Duration.ofMillis(3000);
private final IoEventLoopGroup eventLoopGroup;
private final FixedChannelPool channelPool;
@@ -80,7 +84,8 @@ public class AsyncTcpConnection {
int port,
boolean enableTls,
Optional<File> tlsCertificate,
- TCPConnectionPoolConfig poolConfig) {
+ TCPConnectionPoolConfig poolConfig,
+ Optional<Duration> connectionTimeout) {
this.eventLoopGroup = new
MultiThreadIoEventLoopGroup(NioIoHandler.newFactory());
SslContext sslContext = null;
@@ -98,7 +103,8 @@ public class AsyncTcpConnection {
.group(eventLoopGroup)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
- .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000)
+ .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, (int)
+
connectionTimeout.orElse(DEFAULT_CONNECTION_TIMEOUT).toMillis())
.option(ChannelOption.SO_KEEPALIVE, true)
.remoteAddress(host, port);
@@ -124,7 +130,12 @@ public class AsyncTcpConnection {
channelPool.release(f.getNow());
future.complete(null);
} else {
- future.completeExceptionally(f.cause());
+ Throwable cause = f.cause();
+ if (cause instanceof ConnectTimeoutException) {
+ future.completeExceptionally(new
IggyConnectionException("Connection timeout", cause));
+ } else {
+ future.completeExceptionally(cause);
+ }
}
});
return future;
diff --git a/foreign/java/java-sdk/src/test/java/org/apache/iggy/client/async/tcp/AsyncIggyTcpClientBuilderTest.java b/foreign/java/java-sdk/src/test/java/org/apache/iggy/client/async/tcp/AsyncIggyTcpClientBuilderTest.java
index 3052630d4..6f33b106a 100644
--- a/foreign/java/java-sdk/src/test/java/org/apache/iggy/client/async/tcp/AsyncIggyTcpClientBuilderTest.java
+++ b/foreign/java/java-sdk/src/test/java/org/apache/iggy/client/async/tcp/AsyncIggyTcpClientBuilderTest.java
@@ -137,6 +137,80 @@ class AsyncIggyTcpClientBuilderTest extends BaseIntegrationTest {
assertThatThrownBy(builder::build).isInstanceOf(IggyInvalidArgumentException.class);
}
+ @Test
+ void shouldThrowExceptionForNullPort() {
+ // Given: Builder with null port
+ AsyncIggyTcpClientBuilder builder =
+ AsyncIggyTcpClient.builder().host(serverHost()).port(null);
+
+ // When/Then: Building should throw IggyInvalidArgumentException
+
assertThatThrownBy(builder::build).isInstanceOf(IggyInvalidArgumentException.class);
+ }
+
+ @Test
+ void shouldThrowExceptionForZeroConnectionPoolSize() {
+ // Given: Builder with 0 connection pool size
+ AsyncIggyTcpClientBuilder builder =
AsyncIggyTcpClient.builder().connectionPoolSize(0);
+
+ // When/Then: Building should throw IggyInvalidArgumentException
+
assertThatThrownBy(builder::build).isInstanceOf(IggyInvalidArgumentException.class);
+ }
+
+ @Test
+ void shouldThrowExceptionForNegativeConnectionPoolSize() {
+ // Given: Builder with negative connection pool size
+ AsyncIggyTcpClientBuilder builder =
AsyncIggyTcpClient.builder().connectionPoolSize(-1);
+
+ // When/Then: Building should throw IggyInvalidArgumentException
+
assertThatThrownBy(builder::build).isInstanceOf(IggyInvalidArgumentException.class);
+ }
+
+ @Test
+ void shouldThrowExceptionForZeroConnectionTimeout() {
+ // Given: Builder with 0 connection timeout
+ AsyncIggyTcpClientBuilder builder =
AsyncIggyTcpClient.builder().connectionTimeout(Duration.ofMillis(0));
+
+ // When/Then: Building should throw IggyInvalidArgumentException
+
assertThatThrownBy(builder::build).isInstanceOf(IggyInvalidArgumentException.class);
+ }
+
+ @Test
+ void shouldThrowExceptionForNegativeConnectionTimeout() {
+ // Given: Builder with negative connection timeout
+ AsyncIggyTcpClientBuilder builder =
AsyncIggyTcpClient.builder().connectionTimeout(Duration.ofMillis(-1));
+
+ // When/Then: Building should throw IggyInvalidArgumentException
+
assertThatThrownBy(builder::build).isInstanceOf(IggyInvalidArgumentException.class);
+ }
+
+ @Test
+ void
shouldThrowExceptionForConnectionTimeoutInMillisecondsGreaterThanMaximumInteger()
{
+ // Given: Builder with connection timeout in milliseconds greater than
maximum integer
+ AsyncIggyTcpClientBuilder builder =
+
AsyncIggyTcpClient.builder().connectionTimeout(Duration.ofMillis(((long)
Integer.MAX_VALUE) + 1));
+
+ // When/Then: Building should throw IggyInvalidArgumentException
+
assertThatThrownBy(builder::build).isInstanceOf(IggyInvalidArgumentException.class);
+ }
+
+ @Test
+ void shouldThrowExceptionForZeroAcquireTimeout() {
+ // Given: Builder with 0 acquire timeout
+ AsyncIggyTcpClientBuilder builder =
AsyncIggyTcpClient.builder().acquireTimeout(Duration.ofMillis(0));
+
+ // When/Then: Building should throw IggyInvalidArgumentException
+
assertThatThrownBy(builder::build).isInstanceOf(IggyInvalidArgumentException.class);
+ }
+
+ @Test
+ void shouldThrowExceptionForNegativeAcquireTimeout() {
+ // Given: Builder with negative acquire timeout
+ AsyncIggyTcpClientBuilder builder =
AsyncIggyTcpClient.builder().acquireTimeout(Duration.ofMillis(-1000));
+
+ // When/Then: Building should throw IggyInvalidArgumentException
+
assertThatThrownBy(builder::build).isInstanceOf(IggyInvalidArgumentException.class);
+ }
+
@Test
void shouldMaintainBackwardCompatibilityWithOldConstructor() throws
Exception {
// Given: Old constructor approach