`import io.grpc.ManagedChannel; import io.grpc.ManagedChannelBuilder; 
import io.grpc.Status; import io.grpc.stub.StreamObserver; import 
java.util.concurrent.*; import java.util.concurrent.atomic.AtomicBoolean; 
import java.util.List; import java.util.UUID; import java.util.Map; import 
org.slf4j.Logger; import org.slf4j.LoggerFactory;

public class ImprovedGrpcClient implements AutoCloseable { private static 
final Logger log = LoggerFactory.getLogger(ImprovedGrpcClient.class);
// Configuration constants private static final int MAX_CONCURRENT_REQUESTS 
= 10; private static final int STREAM_TIMEOUT_SECONDS = 30; private static 
final int RECONNECT_DELAY_SECONDS = 5; private static final int 
MAX_RECONNECT_ATTEMPTS = 3; // gRPC and stream management private final 
ManagedChannel channel; private final YourServiceGrpc.YourServiceStub 
asyncStub; private volatile StreamObserver<YourRequestType> 
requestObserver; // Concurrency controls private final Semaphore 
requestThrottle; private final AtomicBoolean isStreamInitialized; private 
final Object streamLock; // Request tracking private final Map<String, 
CompletableFuture<YourResponseType>> responseFutures; private final 
ConcurrentLinkedQueue<YourRequestType> requestQueue; // Executors private 
final ScheduledExecutorService streamMonitor; private final ExecutorService 
responseProcessor; public ImprovedGrpcClient(String host, int port) { // 
Channel configuration with robust settings this.channel = 
ManagedChannelBuilder.forAddress(host, port) .usePlaintext() // Use 
.useTransportSecurity() in production .keepAliveTime(30, TimeUnit.SECONDS) 
.keepAliveTimeout(10, TimeUnit.SECONDS) .maxInboundMessageSize(100 * 1024 * 
1024) .enableRetry() .build(); this.asyncStub = 
YourServiceGrpc.newStub(channel); // Initialize concurrent controls 
this.requestThrottle = new Semaphore(MAX_CONCURRENT_REQUESTS); 
this.isStreamInitialized = new AtomicBoolean(false); this.streamLock = new 
Object(); this.responseFutures = new ConcurrentHashMap<>(); 
this.requestQueue = new ConcurrentLinkedQueue<>(); // Create executors int 
corePoolSize = Runtime.getRuntime().availableProcessors(); 
this.responseProcessor = new ThreadPoolExecutor( corePoolSize, corePoolSize 
* 2, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(100) ); 
this.streamMonitor = Executors.newSingleThreadScheduledExecutor(); // Start 
monitoring the stream initializeStreamMonitoring(); } private void 
initializeStreamMonitoring() { streamMonitor.scheduleAtFixedRate(() -> { 
try { if (!isStreamInitialized.get()) { reconnectStream(); } 
processQueuedRequests(); } catch (Exception e) { log.error("Stream 
monitoring error", e); } }, 0, 5, TimeUnit.SECONDS); } public 
CompletableFuture<Boolean> initializeStream(StreamInitRequest initRequest) 
{ CompletableFuture<Boolean> initFuture = new CompletableFuture<>(); 
synchronized (streamLock) { if (isStreamInitialized.get()) { 
initFuture.complete(true); return initFuture; } try { 
StreamResponseObserver responseObserver = new 
StreamResponseObserver(initFuture); requestObserver = 
asyncStub.bidirectionalStream(responseObserver); // Send initialization 
request YourRequestType request = YourRequestType.newBuilder() 
.setInit(initRequest) .build(); requestObserver.onNext(request); } catch 
(Exception e) { log.error("Stream initialization failed", e); 
initFuture.completeExceptionally(e); } } return 
initFuture.orTimeout(STREAM_TIMEOUT_SECONDS, TimeUnit.SECONDS); } public 
CompletableFuture<YourResponseType> sendRequest(YourRequestType request) { 
// Validate stream is initialized if (!isStreamInitialized.get()) { return 
CompletableFuture.failedFuture( new IllegalStateException("Stream not 
initialized") ); } // Generate unique request ID String requestId = 
UUID.randomUUID().toString(); CompletableFuture<YourResponseType> 
responseFuture = new CompletableFuture<>(); try { // Acquire throttle 
permit with timeout if (!requestThrottle.tryAcquire(STREAM_TIMEOUT_SECONDS, 
TimeUnit.SECONDS)) { return CompletableFuture.failedFuture( new 
TimeoutException("Could not acquire request throttle") ); } // Store future 
for later correlation responseFutures.put(requestId, responseFuture); // 
Queue the request requestQueue.offer(request); } catch 
(InterruptedException e) { Thread.currentThread().interrupt(); return 
CompletableFuture.failedFuture(e); } // Add timeout and cleanup return 
responseFuture .orTimeout(STREAM_TIMEOUT_SECONDS, TimeUnit.SECONDS) 
.whenComplete((response, throwable) -> { responseFutures.remove(requestId); 
requestThrottle.release(); }); } private void processQueuedRequests() { if 
(!isStreamInitialized.get()) return; YourRequestType request; while 
((request = requestQueue.poll()) != null) { try { 
requestObserver.onNext(request); } catch (Exception e) { log.error("Failed 
to send queued request", e); reconnectStream(); break; } } } private void 
reconnectStream() { synchronized (streamLock) { if 
(isStreamInitialized.get()) return; log.warn("Attempting to reconnect 
stream"); // Close existing stream if it exists if (requestObserver != 
null) { try { requestObserver.onCompleted(); } catch (Exception e) { 
log.error("Error closing existing stream", e); } } // Fail all pending 
futures responseFutures.values().forEach(future -> 
future.completeExceptionally(new RuntimeException("Stream disconnected")) 
); responseFutures.clear(); // Reinitialize stream // Note: You would 
typically call initializeStream with original parameters here } } private 
class StreamResponseObserver implements StreamObserver<YourResponseType> { 
private final CompletableFuture<Boolean> initFuture; public 
StreamResponseObserver(CompletableFuture<Boolean> initFuture) { 
this.initFuture = initFuture; } @Override public void 
onNext(YourResponseType response) { try { // Handle initialization response 
if (!isStreamInitialized.get() && isInitializationResponse(response)) { 
handleStreamInitialization(response); return; } // Correlate and complete 
response future correlateAndCompleteResponse(response); } catch (Exception 
e) { log.error("Error processing response", e); } } @Override public void 
onError(Throwable t) { log.error("Stream error", t); Status status = 
Status.fromThrowable(t); // Handle different error scenarios switch 
(status.getCode()) { case UNAVAILABLE: case CANCELLED: case 
DEADLINE_EXCEEDED: reconnectStream(); break; default: // Fail all futures 
for non-recoverable errors responseFutures.values().forEach(future -> 
future.completeExceptionally(t) ); responseFutures.clear(); } } @Override 
public void onCompleted() { log.info("Stream completed"); 
reconnectStream(); } private void 
handleStreamInitialization(YourResponseType response) { synchronized 
(streamLock) { if (isSuccessfulInitialization(response)) { 
isStreamInitialized.set(true); initFuture.complete(true); } else { 
initFuture.completeExceptionally( new RuntimeException("Stream 
initialization failed") ); } } } private void 
correlateAndCompleteResponse(YourResponseType response) { String requestId 
= extractRequestId(response); CompletableFuture<YourResponseType> future = 
responseFutures.get(requestId); if (future != null) { if 
(isSuccessfulResponse(response)) { future.complete(response); } else { 
future.completeExceptionally( new RuntimeException("Request failed: " + 
getErrorMessage(response)) ); } } } } @Override public void close() { try { 
// Shutdown stream and channel if (requestObserver != null) { 
requestObserver.onCompleted(); } channel.shutdown().awaitTermination(5, 
TimeUnit.SECONDS); // Shutdown executors streamMonitor.shutdown(); 
responseProcessor.shutdown(); streamMonitor.awaitTermination(5, 
TimeUnit.SECONDS); responseProcessor.awaitTermination(5, TimeUnit.SECONDS); 
} catch (InterruptedException e) { Thread.currentThread().interrupt(); 
log.error("Shutdown interrupted", e); } } // Utility methods to be 
implemented based on your specific response types private boolean 
isInitializationResponse(YourResponseType response) { // Implement logic to 
identify initialization response return false; } private boolean 
isSuccessfulInitialization(YourResponseType response) { // Implement logic 
to check successful initialization return false; } private boolean 
isSuccessfulResponse(YourResponseType response) { // Implement logic to 
check successful response return false; } private String 
extractRequestId(YourResponseType response) { // Implement logic to extract 
request ID return ""; } private String getErrorMessage(YourResponseType 
response) { // Implement logic to get error message return ""; } } `

I am not able to understand what is wrong with this code.After a few 
messages being processed susccessfully the stream suddenly errors out 
saying CLient close.Invalid wire in XML or Faled to read message. Is is 
something to do with race condition? Please help

Tried to synchronously implement the above and it worked.I want to 
implement somekind of concurrency for request and response on the grpc 
client side.

-- 
You received this message because you are subscribed to the Google Groups 
"grpc.io" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to grpc-io+unsubscr...@googlegroups.com.
To view this discussion visit 
https://groups.google.com/d/msgid/grpc-io/21e0602f-e008-4a9d-b69f-dd7d40b747e1n%40googlegroups.com.

Reply via email to