`import io.grpc.ManagedChannel; import io.grpc.ManagedChannelBuilder; import io.grpc.Status; import io.grpc.stub.StreamObserver; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicBoolean; import java.util.List; import java.util.UUID; import java.util.Map; import org.slf4j.Logger; import org.slf4j.LoggerFactory;
public class ImprovedGrpcClient implements AutoCloseable { private static final Logger log = LoggerFactory.getLogger(ImprovedGrpcClient.class); // Configuration constants private static final int MAX_CONCURRENT_REQUESTS = 10; private static final int STREAM_TIMEOUT_SECONDS = 30; private static final int RECONNECT_DELAY_SECONDS = 5; private static final int MAX_RECONNECT_ATTEMPTS = 3; // gRPC and stream management private final ManagedChannel channel; private final YourServiceGrpc.YourServiceStub asyncStub; private volatile StreamObserver<YourRequestType> requestObserver; // Concurrency controls private final Semaphore requestThrottle; private final AtomicBoolean isStreamInitialized; private final Object streamLock; // Request tracking private final Map<String, CompletableFuture<YourResponseType>> responseFutures; private final ConcurrentLinkedQueue<YourRequestType> requestQueue; // Executors private final ScheduledExecutorService streamMonitor; private final ExecutorService responseProcessor; public ImprovedGrpcClient(String host, int port) { // Channel configuration with robust settings this.channel = ManagedChannelBuilder.forAddress(host, port) .usePlaintext() // Use .useTransportSecurity() in production .keepAliveTime(30, TimeUnit.SECONDS) .keepAliveTimeout(10, TimeUnit.SECONDS) .maxInboundMessageSize(100 * 1024 * 1024) .enableRetry() .build(); this.asyncStub = YourServiceGrpc.newStub(channel); // Initialize concurrent controls this.requestThrottle = new Semaphore(MAX_CONCURRENT_REQUESTS); this.isStreamInitialized = new AtomicBoolean(false); this.streamLock = new Object(); this.responseFutures = new ConcurrentHashMap<>(); this.requestQueue = new ConcurrentLinkedQueue<>(); // Create executors int corePoolSize = Runtime.getRuntime().availableProcessors(); this.responseProcessor = new ThreadPoolExecutor( corePoolSize, corePoolSize * 2, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(100) ); this.streamMonitor = Executors.newSingleThreadScheduledExecutor(); // Start monitoring the stream initializeStreamMonitoring(); } private void initializeStreamMonitoring() { streamMonitor.scheduleAtFixedRate(() -> { try { if (!isStreamInitialized.get()) { reconnectStream(); } processQueuedRequests(); } catch (Exception e) { log.error("Stream monitoring error", e); } }, 0, 5, TimeUnit.SECONDS); } public CompletableFuture<Boolean> initializeStream(StreamInitRequest initRequest) { CompletableFuture<Boolean> initFuture = new CompletableFuture<>(); synchronized (streamLock) { if (isStreamInitialized.get()) { initFuture.complete(true); return initFuture; } try { StreamResponseObserver responseObserver = new StreamResponseObserver(initFuture); requestObserver = asyncStub.bidirectionalStream(responseObserver); // Send initialization request YourRequestType request = YourRequestType.newBuilder() .setInit(initRequest) .build(); requestObserver.onNext(request); } catch (Exception e) { log.error("Stream initialization failed", e); initFuture.completeExceptionally(e); } } return initFuture.orTimeout(STREAM_TIMEOUT_SECONDS, TimeUnit.SECONDS); } public CompletableFuture<YourResponseType> sendRequest(YourRequestType request) { // Validate stream is initialized if (!isStreamInitialized.get()) { return CompletableFuture.failedFuture( new IllegalStateException("Stream not initialized") ); } // Generate unique request ID String requestId = UUID.randomUUID().toString(); CompletableFuture<YourResponseType> responseFuture = new CompletableFuture<>(); try { // Acquire throttle permit with timeout if (!requestThrottle.tryAcquire(STREAM_TIMEOUT_SECONDS, TimeUnit.SECONDS)) { return CompletableFuture.failedFuture( new TimeoutException("Could not acquire request throttle") ); } // Store future for later correlation responseFutures.put(requestId, responseFuture); // Queue the request requestQueue.offer(request); } catch (InterruptedException e) { Thread.currentThread().interrupt(); return CompletableFuture.failedFuture(e); } // Add timeout and cleanup return responseFuture .orTimeout(STREAM_TIMEOUT_SECONDS, TimeUnit.SECONDS) .whenComplete((response, throwable) -> { responseFutures.remove(requestId); requestThrottle.release(); }); } private void processQueuedRequests() { if (!isStreamInitialized.get()) return; YourRequestType request; while ((request = requestQueue.poll()) != null) { try { requestObserver.onNext(request); } catch (Exception e) { log.error("Failed to send queued request", e); reconnectStream(); break; } } } private void reconnectStream() { synchronized (streamLock) { if (isStreamInitialized.get()) return; log.warn("Attempting to reconnect stream"); // Close existing stream if it exists if (requestObserver != null) { try { requestObserver.onCompleted(); } catch (Exception e) { log.error("Error closing existing stream", e); } } // Fail all pending futures responseFutures.values().forEach(future -> future.completeExceptionally(new RuntimeException("Stream disconnected")) ); responseFutures.clear(); // Reinitialize stream // Note: You would typically call initializeStream with original parameters here } } private class StreamResponseObserver implements StreamObserver<YourResponseType> { private final CompletableFuture<Boolean> initFuture; public StreamResponseObserver(CompletableFuture<Boolean> initFuture) { this.initFuture = initFuture; } @Override public void onNext(YourResponseType response) { try { // Handle initialization response if (!isStreamInitialized.get() && isInitializationResponse(response)) { handleStreamInitialization(response); return; } // Correlate and complete response future correlateAndCompleteResponse(response); } catch (Exception e) { log.error("Error processing response", e); } } @Override public void onError(Throwable t) { log.error("Stream error", t); Status status = Status.fromThrowable(t); // Handle different error scenarios switch (status.getCode()) { case UNAVAILABLE: case CANCELLED: case DEADLINE_EXCEEDED: reconnectStream(); break; default: // Fail all futures for non-recoverable errors responseFutures.values().forEach(future -> future.completeExceptionally(t) ); responseFutures.clear(); } } @Override public void onCompleted() { log.info("Stream completed"); reconnectStream(); } private void handleStreamInitialization(YourResponseType response) { synchronized (streamLock) { if (isSuccessfulInitialization(response)) { isStreamInitialized.set(true); initFuture.complete(true); } else { initFuture.completeExceptionally( new RuntimeException("Stream initialization failed") ); } } } private void correlateAndCompleteResponse(YourResponseType response) { String requestId = extractRequestId(response); CompletableFuture<YourResponseType> future = responseFutures.get(requestId); if (future != null) { if (isSuccessfulResponse(response)) { future.complete(response); } else { future.completeExceptionally( new RuntimeException("Request failed: " + getErrorMessage(response)) ); } } } } @Override public void close() { try { // Shutdown stream and channel if (requestObserver != null) { requestObserver.onCompleted(); } channel.shutdown().awaitTermination(5, TimeUnit.SECONDS); // Shutdown executors streamMonitor.shutdown(); responseProcessor.shutdown(); streamMonitor.awaitTermination(5, TimeUnit.SECONDS); responseProcessor.awaitTermination(5, TimeUnit.SECONDS); } catch (InterruptedException e) { Thread.currentThread().interrupt(); log.error("Shutdown interrupted", e); } } // Utility methods to be implemented based on your specific response types private boolean isInitializationResponse(YourResponseType response) { // Implement logic to identify initialization response return false; } private boolean isSuccessfulInitialization(YourResponseType response) { // Implement logic to check successful initialization return false; } private boolean isSuccessfulResponse(YourResponseType response) { // Implement logic to check successful response return false; } private String extractRequestId(YourResponseType response) { // Implement logic to extract request ID return ""; } private String getErrorMessage(YourResponseType response) { // Implement logic to get error message return ""; } } ` I am not able to understand what is wrong with this code.After a few messages being processed susccessfully the stream suddenly errors out saying CLient close.Invalid wire in XML or Faled to read message. Is is something to do with race condition? Please help Tried to synchronously implement the above and it worked.I want to implement somekind of concurrency for request and response on the grpc client side. -- You received this message because you are subscribed to the Google Groups "grpc.io" group. To unsubscribe from this group and stop receiving emails from it, send an email to grpc-io+unsubscr...@googlegroups.com. To view this discussion visit https://groups.google.com/d/msgid/grpc-io/21e0602f-e008-4a9d-b69f-dd7d40b747e1n%40googlegroups.com.