Can you format the code so it is readable and provide the stacktrace? On Thursday, January 23, 2025 at 11:39:21 AM UTC+5:30 Ankit Rathod wrote:
> `import io.grpc.ManagedChannel; import io.grpc.ManagedChannelBuilder; > import io.grpc.Status; import io.grpc.stub.StreamObserver; import > java.util.concurrent.*; import java.util.concurrent.atomic.AtomicBoolean; > import java.util.List; import java.util.UUID; import java.util.Map; import > org.slf4j.Logger; import org.slf4j.LoggerFactory; > > public class ImprovedGrpcClient implements AutoCloseable { private static > final Logger log = LoggerFactory.getLogger(ImprovedGrpcClient.class); > // Configuration constants private static final int > MAX_CONCURRENT_REQUESTS = 10; private static final int > STREAM_TIMEOUT_SECONDS = 30; private static final int > RECONNECT_DELAY_SECONDS = 5; private static final int > MAX_RECONNECT_ATTEMPTS = 3; // gRPC and stream management private final > ManagedChannel channel; private final YourServiceGrpc.YourServiceStub > asyncStub; private volatile StreamObserver<YourRequestType> > requestObserver; // Concurrency controls private final Semaphore > requestThrottle; private final AtomicBoolean isStreamInitialized; private > final Object streamLock; // Request tracking private final Map<String, > CompletableFuture<YourResponseType>> responseFutures; private final > ConcurrentLinkedQueue<YourRequestType> requestQueue; // Executors private > final ScheduledExecutorService streamMonitor; private final ExecutorService > responseProcessor; public ImprovedGrpcClient(String host, int port) { // > Channel configuration with robust settings this.channel = > ManagedChannelBuilder.forAddress(host, port) .usePlaintext() // Use > .useTransportSecurity() in production .keepAliveTime(30, TimeUnit.SECONDS) > .keepAliveTimeout(10, TimeUnit.SECONDS) .maxInboundMessageSize(100 * 1024 * > 1024) .enableRetry() .build(); this.asyncStub = > YourServiceGrpc.newStub(channel); // Initialize concurrent controls > this.requestThrottle = new Semaphore(MAX_CONCURRENT_REQUESTS); > this.isStreamInitialized = new AtomicBoolean(false); this.streamLock = new > Object(); this.responseFutures = new ConcurrentHashMap<>(); > this.requestQueue = new ConcurrentLinkedQueue<>(); // Create executors int > corePoolSize = Runtime.getRuntime().availableProcessors(); > this.responseProcessor = new ThreadPoolExecutor( corePoolSize, corePoolSize > * 2, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(100) ); > this.streamMonitor = Executors.newSingleThreadScheduledExecutor(); // Start > monitoring the stream initializeStreamMonitoring(); } private void > initializeStreamMonitoring() { streamMonitor.scheduleAtFixedRate(() -> { > try { if (!isStreamInitialized.get()) { reconnectStream(); } > processQueuedRequests(); } catch (Exception e) { log.error("Stream > monitoring error", e); } }, 0, 5, TimeUnit.SECONDS); } public > CompletableFuture<Boolean> initializeStream(StreamInitRequest initRequest) > { CompletableFuture<Boolean> initFuture = new CompletableFuture<>(); > synchronized (streamLock) { if (isStreamInitialized.get()) { > initFuture.complete(true); return initFuture; } try { > StreamResponseObserver responseObserver = new > StreamResponseObserver(initFuture); requestObserver = > asyncStub.bidirectionalStream(responseObserver); // Send initialization > request YourRequestType request = YourRequestType.newBuilder() > .setInit(initRequest) .build(); requestObserver.onNext(request); } catch > (Exception e) { log.error("Stream initialization failed", e); > initFuture.completeExceptionally(e); } } return > initFuture.orTimeout(STREAM_TIMEOUT_SECONDS, TimeUnit.SECONDS); } public > CompletableFuture<YourResponseType> sendRequest(YourRequestType request) { > // Validate stream is initialized if (!isStreamInitialized.get()) { return > CompletableFuture.failedFuture( new IllegalStateException("Stream not > initialized") ); } // Generate unique request ID String requestId = > UUID.randomUUID().toString(); CompletableFuture<YourResponseType> > responseFuture = new CompletableFuture<>(); try { // Acquire throttle > permit with timeout if (!requestThrottle.tryAcquire(STREAM_TIMEOUT_SECONDS, > TimeUnit.SECONDS)) { return CompletableFuture.failedFuture( new > TimeoutException("Could not acquire request throttle") ); } // Store future > for later correlation responseFutures.put(requestId, responseFuture); // > Queue the request requestQueue.offer(request); } catch > (InterruptedException e) { Thread.currentThread().interrupt(); return > CompletableFuture.failedFuture(e); } // Add timeout and cleanup return > responseFuture .orTimeout(STREAM_TIMEOUT_SECONDS, TimeUnit.SECONDS) > .whenComplete((response, throwable) -> { responseFutures.remove(requestId); > requestThrottle.release(); }); } private void processQueuedRequests() { if > (!isStreamInitialized.get()) return; YourRequestType request; while > ((request = requestQueue.poll()) != null) { try { > requestObserver.onNext(request); } catch (Exception e) { log.error("Failed > to send queued request", e); reconnectStream(); break; } } } private void > reconnectStream() { synchronized (streamLock) { if > (isStreamInitialized.get()) return; log.warn("Attempting to reconnect > stream"); // Close existing stream if it exists if (requestObserver != > null) { try { requestObserver.onCompleted(); } catch (Exception e) { > log.error("Error closing existing stream", e); } } // Fail all pending > futures responseFutures.values().forEach(future -> > future.completeExceptionally(new RuntimeException("Stream disconnected")) > ); responseFutures.clear(); // Reinitialize stream // Note: You would > typically call initializeStream with original parameters here } } private > class StreamResponseObserver implements StreamObserver<YourResponseType> { > private final CompletableFuture<Boolean> initFuture; public > StreamResponseObserver(CompletableFuture<Boolean> initFuture) { > this.initFuture = initFuture; } @Override public void > onNext(YourResponseType response) { try { // Handle initialization response > if (!isStreamInitialized.get() && isInitializationResponse(response)) { > handleStreamInitialization(response); return; } // Correlate and complete > response future correlateAndCompleteResponse(response); } catch (Exception > e) { log.error("Error processing response", e); } } @Override public void > onError(Throwable t) { log.error("Stream error", t); Status status = > Status.fromThrowable(t); // Handle different error scenarios switch > (status.getCode()) { case UNAVAILABLE: case CANCELLED: case > DEADLINE_EXCEEDED: reconnectStream(); break; default: // Fail all futures > for non-recoverable errors responseFutures.values().forEach(future -> > future.completeExceptionally(t) ); responseFutures.clear(); } } @Override > public void onCompleted() { log.info("Stream completed"); > reconnectStream(); } private void > handleStreamInitialization(YourResponseType response) { synchronized > (streamLock) { if (isSuccessfulInitialization(response)) { > isStreamInitialized.set(true); initFuture.complete(true); } else { > initFuture.completeExceptionally( new RuntimeException("Stream > initialization failed") ); } } } private void > correlateAndCompleteResponse(YourResponseType response) { String requestId > = extractRequestId(response); CompletableFuture<YourResponseType> future = > responseFutures.get(requestId); if (future != null) { if > (isSuccessfulResponse(response)) { future.complete(response); } else { > future.completeExceptionally( new RuntimeException("Request failed: " + > getErrorMessage(response)) ); } } } } @Override public void close() { try { > // Shutdown stream and channel if (requestObserver != null) { > requestObserver.onCompleted(); } channel.shutdown().awaitTermination(5, > TimeUnit.SECONDS); // Shutdown executors streamMonitor.shutdown(); > responseProcessor.shutdown(); streamMonitor.awaitTermination(5, > TimeUnit.SECONDS); responseProcessor.awaitTermination(5, TimeUnit.SECONDS); > } catch (InterruptedException e) { Thread.currentThread().interrupt(); > log.error("Shutdown interrupted", e); } } // Utility methods to be > implemented based on your specific response types private boolean > isInitializationResponse(YourResponseType response) { // Implement logic to > identify initialization response return false; } private boolean > isSuccessfulInitialization(YourResponseType response) { // Implement logic > to check successful initialization return false; } private boolean > isSuccessfulResponse(YourResponseType response) { // Implement logic to > check successful response return false; } private String > extractRequestId(YourResponseType response) { // Implement logic to extract > request ID return ""; } private String getErrorMessage(YourResponseType > response) { // Implement logic to get error message return ""; } } ` > > I am not able to understand what is wrong with this code.After a few > messages being processed susccessfully the stream suddenly errors out > saying CLient close.Invalid wire in XML or Faled to read message. Is is > something to do with race condition? Please help > > Tried to synchronously implement the above and it worked.I want to > implement somekind of concurrency for request and response on the grpc > client side. > -- You received this message because you are subscribed to the Google Groups "grpc.io" group. To unsubscribe from this group and stop receiving emails from it, send an email to grpc-io+unsubscr...@googlegroups.com. To view this discussion visit https://groups.google.com/d/msgid/grpc-io/e166c8d0-7509-426d-bbbd-983a165cc953n%40googlegroups.com.