ptlrs commented on code in PR #9718:
URL: https://github.com/apache/ozone/pull/9718#discussion_r2789069853
##########
hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java:
##########
@@ -152,34 +155,56 @@ public XceiverClientGrpc(Pipeline pipeline,
ConfigurationSource config) {
public void connect() throws Exception {
// connect to the closest node, if closest node doesn't exist, delegate to
// first node, which is usually the leader in the pipeline.
- DatanodeDetails dn = topologyAwareRead ? this.pipeline.getClosestNode() :
- this.pipeline.getFirstNode();
+ DatanodeDetails dn = topologyAwareRead
+ ? this.pipeline.getClosestNode()
+ : this.pipeline.getFirstNode();
// just make a connection to the picked datanode at the beginning
connectToDatanode(dn);
}
- private synchronized void connectToDatanode(DatanodeDetails dn)
+ private void connectToDatanode(DatanodeDetails dn)
throws IOException {
+ if (isClosed.get()) {
+ throw new IOException("Client is closed.");
+ }
+
if (isConnected(dn)) {
return;
}
- // read port from the data node, on failure use default configured
- // port.
+ // read port from the data node, on failure use default configured port
int port = dn.getStandalonePort().getValue();
if (port == 0) {
port = config.getInt(OzoneConfigKeys.HDDS_CONTAINER_IPC_PORT,
OzoneConfigKeys.HDDS_CONTAINER_IPC_PORT_DEFAULT);
}
+ final int finalPort = port;
- if (LOG.isDebugEnabled()) {
- LOG.debug("Connecting to server : {}; nodes in pipeline : {}, ",
- dn, pipeline.getNodes());
+ LOG.debug("Connecting to server : {}; nodes in pipeline : {}, ", dn,
pipeline.getNodes());
+
+ channels.computeIfPresent(dn.getID(), (dnId, channel) -> {
+ if (channel.isTerminated() || channel.isShutdown()) {
+ asyncStubs.remove(dnId);
+ return null; // removes from channels map
+ }
+
+ return channel;
+ });
+
+ ManagedChannel channel;
+ try {
+ channel = channels.computeIfAbsent(dn.getID(), dnId -> {
+ try {
+ return createChannel(dn, finalPort).build();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ });
+ } catch (RuntimeException e) {
+ LOG.error("Failed to create channel to datanode {}", dn, e);
+ throw new IOException(e.getCause());
}
- ManagedChannel channel = createChannel(dn, port).build();
- XceiverClientProtocolServiceStub asyncStub =
- XceiverClientProtocolServiceGrpc.newStub(channel);
- asyncStubs.put(dn.getID(), asyncStub);
- channels.put(dn.getID(), channel);
+
+ asyncStubs.computeIfAbsent(dn.getID(), dnId ->
XceiverClientProtocolServiceGrpc.newStub(channel));
Review Comment:
The async stub is being cleared above. Won't that be sufficient?
```java
channels.computeIfPresent(dn.getID(), (dnId, channel) -> {
if (channel.isTerminated() || channel.isShutdown()) {
asyncStubs.remove(dnId); // clearing the stubs here
return null; // removes from channels map
}
return channel;
});
```
https://github.com/apache/ozone/pull/9718/changes#diff-bb4105305530e6076ed3af7c2463508e69d9bb02d146762dbc505d920734134bR184-R191
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]