Can you format the code so it is readable and provide the stacktrace?

On Thursday, January 23, 2025 at 11:39:21 AM UTC+5:30 Ankit Rathod wrote:

> `import io.grpc.ManagedChannel; import io.grpc.ManagedChannelBuilder; 
> import io.grpc.Status; import io.grpc.stub.StreamObserver; import 
> java.util.concurrent.*; import java.util.concurrent.atomic.AtomicBoolean; 
> import java.util.List; import java.util.UUID; import java.util.Map; import 
> org.slf4j.Logger; import org.slf4j.LoggerFactory;
>
> public class ImprovedGrpcClient implements AutoCloseable { private static 
> final Logger log = LoggerFactory.getLogger(ImprovedGrpcClient.class);
> // Configuration constants private static final int 
> MAX_CONCURRENT_REQUESTS = 10; private static final int 
> STREAM_TIMEOUT_SECONDS = 30; private static final int 
> RECONNECT_DELAY_SECONDS = 5; private static final int 
> MAX_RECONNECT_ATTEMPTS = 3; // gRPC and stream management private final 
> ManagedChannel channel; private final YourServiceGrpc.YourServiceStub 
> asyncStub; private volatile StreamObserver<YourRequestType> 
> requestObserver; // Concurrency controls private final Semaphore 
> requestThrottle; private final AtomicBoolean isStreamInitialized; private 
> final Object streamLock; // Request tracking private final Map<String, 
> CompletableFuture<YourResponseType>> responseFutures; private final 
> ConcurrentLinkedQueue<YourRequestType> requestQueue; // Executors private 
> final ScheduledExecutorService streamMonitor; private final ExecutorService 
> responseProcessor; public ImprovedGrpcClient(String host, int port) { // 
> Channel configuration with robust settings this.channel = 
> ManagedChannelBuilder.forAddress(host, port) .usePlaintext() // Use 
> .useTransportSecurity() in production .keepAliveTime(30, TimeUnit.SECONDS) 
> .keepAliveTimeout(10, TimeUnit.SECONDS) .maxInboundMessageSize(100 * 1024 * 
> 1024) .enableRetry() .build(); this.asyncStub = 
> YourServiceGrpc.newStub(channel); // Initialize concurrent controls 
> this.requestThrottle = new Semaphore(MAX_CONCURRENT_REQUESTS); 
> this.isStreamInitialized = new AtomicBoolean(false); this.streamLock = new 
> Object(); this.responseFutures = new ConcurrentHashMap<>(); 
> this.requestQueue = new ConcurrentLinkedQueue<>(); // Create executors int 
> corePoolSize = Runtime.getRuntime().availableProcessors(); 
> this.responseProcessor = new ThreadPoolExecutor( corePoolSize, corePoolSize 
> * 2, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(100) ); 
> this.streamMonitor = Executors.newSingleThreadScheduledExecutor(); // Start 
> monitoring the stream initializeStreamMonitoring(); } private void 
> initializeStreamMonitoring() { streamMonitor.scheduleAtFixedRate(() -> { 
> try { if (!isStreamInitialized.get()) { reconnectStream(); } 
> processQueuedRequests(); } catch (Exception e) { log.error("Stream 
> monitoring error", e); } }, 0, 5, TimeUnit.SECONDS); } public 
> CompletableFuture<Boolean> initializeStream(StreamInitRequest initRequest) 
> { CompletableFuture<Boolean> initFuture = new CompletableFuture<>(); 
> synchronized (streamLock) { if (isStreamInitialized.get()) { 
> initFuture.complete(true); return initFuture; } try { 
> StreamResponseObserver responseObserver = new 
> StreamResponseObserver(initFuture); requestObserver = 
> asyncStub.bidirectionalStream(responseObserver); // Send initialization 
> request YourRequestType request = YourRequestType.newBuilder() 
> .setInit(initRequest) .build(); requestObserver.onNext(request); } catch 
> (Exception e) { log.error("Stream initialization failed", e); 
> initFuture.completeExceptionally(e); } } return 
> initFuture.orTimeout(STREAM_TIMEOUT_SECONDS, TimeUnit.SECONDS); } public 
> CompletableFuture<YourResponseType> sendRequest(YourRequestType request) { 
> // Validate stream is initialized if (!isStreamInitialized.get()) { return 
> CompletableFuture.failedFuture( new IllegalStateException("Stream not 
> initialized") ); } // Generate unique request ID String requestId = 
> UUID.randomUUID().toString(); CompletableFuture<YourResponseType> 
> responseFuture = new CompletableFuture<>(); try { // Acquire throttle 
> permit with timeout if (!requestThrottle.tryAcquire(STREAM_TIMEOUT_SECONDS, 
> TimeUnit.SECONDS)) { return CompletableFuture.failedFuture( new 
> TimeoutException("Could not acquire request throttle") ); } // Store future 
> for later correlation responseFutures.put(requestId, responseFuture); // 
> Queue the request requestQueue.offer(request); } catch 
> (InterruptedException e) { Thread.currentThread().interrupt(); return 
> CompletableFuture.failedFuture(e); } // Add timeout and cleanup return 
> responseFuture .orTimeout(STREAM_TIMEOUT_SECONDS, TimeUnit.SECONDS) 
> .whenComplete((response, throwable) -> { responseFutures.remove(requestId); 
> requestThrottle.release(); }); } private void processQueuedRequests() { if 
> (!isStreamInitialized.get()) return; YourRequestType request; while 
> ((request = requestQueue.poll()) != null) { try { 
> requestObserver.onNext(request); } catch (Exception e) { log.error("Failed 
> to send queued request", e); reconnectStream(); break; } } } private void 
> reconnectStream() { synchronized (streamLock) { if 
> (isStreamInitialized.get()) return; log.warn("Attempting to reconnect 
> stream"); // Close existing stream if it exists if (requestObserver != 
> null) { try { requestObserver.onCompleted(); } catch (Exception e) { 
> log.error("Error closing existing stream", e); } } // Fail all pending 
> futures responseFutures.values().forEach(future -> 
> future.completeExceptionally(new RuntimeException("Stream disconnected")) 
> ); responseFutures.clear(); // Reinitialize stream // Note: You would 
> typically call initializeStream with original parameters here } } private 
> class StreamResponseObserver implements StreamObserver<YourResponseType> { 
> private final CompletableFuture<Boolean> initFuture; public 
> StreamResponseObserver(CompletableFuture<Boolean> initFuture) { 
> this.initFuture = initFuture; } @Override public void 
> onNext(YourResponseType response) { try { // Handle initialization response 
> if (!isStreamInitialized.get() && isInitializationResponse(response)) { 
> handleStreamInitialization(response); return; } // Correlate and complete 
> response future correlateAndCompleteResponse(response); } catch (Exception 
> e) { log.error("Error processing response", e); } } @Override public void 
> onError(Throwable t) { log.error("Stream error", t); Status status = 
> Status.fromThrowable(t); // Handle different error scenarios switch 
> (status.getCode()) { case UNAVAILABLE: case CANCELLED: case 
> DEADLINE_EXCEEDED: reconnectStream(); break; default: // Fail all futures 
> for non-recoverable errors responseFutures.values().forEach(future -> 
> future.completeExceptionally(t) ); responseFutures.clear(); } } @Override 
> public void onCompleted() { log.info("Stream completed"); 
> reconnectStream(); } private void 
> handleStreamInitialization(YourResponseType response) { synchronized 
> (streamLock) { if (isSuccessfulInitialization(response)) { 
> isStreamInitialized.set(true); initFuture.complete(true); } else { 
> initFuture.completeExceptionally( new RuntimeException("Stream 
> initialization failed") ); } } } private void 
> correlateAndCompleteResponse(YourResponseType response) { String requestId 
> = extractRequestId(response); CompletableFuture<YourResponseType> future = 
> responseFutures.get(requestId); if (future != null) { if 
> (isSuccessfulResponse(response)) { future.complete(response); } else { 
> future.completeExceptionally( new RuntimeException("Request failed: " + 
> getErrorMessage(response)) ); } } } } @Override public void close() { try { 
> // Shutdown stream and channel if (requestObserver != null) { 
> requestObserver.onCompleted(); } channel.shutdown().awaitTermination(5, 
> TimeUnit.SECONDS); // Shutdown executors streamMonitor.shutdown(); 
> responseProcessor.shutdown(); streamMonitor.awaitTermination(5, 
> TimeUnit.SECONDS); responseProcessor.awaitTermination(5, TimeUnit.SECONDS); 
> } catch (InterruptedException e) { Thread.currentThread().interrupt(); 
> log.error("Shutdown interrupted", e); } } // Utility methods to be 
> implemented based on your specific response types private boolean 
> isInitializationResponse(YourResponseType response) { // Implement logic to 
> identify initialization response return false; } private boolean 
> isSuccessfulInitialization(YourResponseType response) { // Implement logic 
> to check successful initialization return false; } private boolean 
> isSuccessfulResponse(YourResponseType response) { // Implement logic to 
> check successful response return false; } private String 
> extractRequestId(YourResponseType response) { // Implement logic to extract 
> request ID return ""; } private String getErrorMessage(YourResponseType 
> response) { // Implement logic to get error message return ""; } } `
>
> I am not able to understand what is wrong with this code.After a few 
> messages being processed susccessfully the stream suddenly errors out 
> saying CLient close.Invalid wire in XML or Faled to read message. Is is 
> something to do with race condition? Please help
>
> Tried to synchronously implement the above and it worked.I want to 
> implement somekind of concurrency for request and response on the grpc 
> client side.
>

-- 
You received this message because you are subscribed to the Google Groups 
"grpc.io" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to grpc-io+unsubscr...@googlegroups.com.
To view this discussion visit 
https://groups.google.com/d/msgid/grpc-io/e166c8d0-7509-426d-bbbd-983a165cc953n%40googlegroups.com.

Reply via email to