rkhachatryan commented on a change in pull request #11541:
URL: https://github.com/apache/flink/pull/11541#discussion_r442982508



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactory.java
##########
@@ -60,49 +71,41 @@ NettyPartitionRequestClient createPartitionRequestClient(ConnectionID connection
                while (client == null) {
                        entry = clients.get(connectionId);
 
-                       if (entry != null) {
-                               // Existing channel or connecting channel
-                               if (entry instanceof NettyPartitionRequestClient) {
-                                       client = (NettyPartitionRequestClient) entry;
-                               }
-                               else {
-                                       ConnectingChannel future = (ConnectingChannel) entry;
-                                       client = future.waitForChannel();
-
-                                       clients.replace(connectionId, future, client);
-                               }
-                       }
-                       else {
-                               // No channel yet. Create one, but watch out for a race.
-                               // We create a "connecting future" and atomically add it to the map.
-                               // Only the thread that really added it establishes the channel.
-                               // The others need to wait on that original establisher's future.
-                               ConnectingChannel connectingChannel = new ConnectingChannel(connectionId, this);
-                               Object old = clients.putIfAbsent(connectionId, connectingChannel);
-
-                               if (old == null) {
-                                       nettyClient.connect(connectionId.getAddress()).addListener(connectingChannel);
-
-                                       client = connectingChannel.waitForChannel();
+                       synchronized (connectionId) {

Review comment:
       I think it's not safe to synchronize on `connectionId` because it's passed in from the outside.
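A minimal sketch of why this matters, under stated assumptions: `synchronized (connectionId)` locks one particular object instance, while `clients` looks entries up by `equals()`/`hashCode()`. Assuming `ConnectionID` compares by value (as a map key should), two threads holding equal but distinct instances would not exclude each other, and any outside code holding the same instance could lock it and interfere. One common alternative is a lock map owned privately by the factory. `PrivateKeyLocks`, `Key` (a stand-in for `ConnectionID`) and `lockFor` are hypothetical names for illustration, not code from this PR.

```java
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class PrivateKeyLocks {

    /** Hypothetical stand-in for ConnectionID: equal by value, but possibly distinct instances. */
    static final class Key {
        final String address;
        final int index;

        Key(String address, int index) {
            this.address = address;
            this.index = index;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (!(o instanceof Key)) return false;
            Key k = (Key) o;
            return index == k.index && address.equals(k.address);
        }

        @Override
        public int hashCode() {
            return Objects.hash(address, index);
        }
    }

    // Lock objects are created and held privately, so no outside code can
    // synchronize on them, and equal keys always resolve to the same monitor.
    // Assumption: entries are never evicted here; a real version would need cleanup.
    private final ConcurrentMap<Key, Object> locks = new ConcurrentHashMap<>();

    Object lockFor(Key key) {
        return locks.computeIfAbsent(key, k -> new Object());
    }

    public static void main(String[] args) {
        PrivateKeyLocks factory = new PrivateKeyLocks();
        Key a = new Key("host:1234", 0);
        Key b = new Key("host:1234", 0); // equal to a, but a different object

        // synchronized (a) and synchronized (b) would use two different monitors.
        System.out.println("same instance: " + (a == b));
        // The private lock map hands out one monitor per logical key.
        System.out.println("same lock: " + (factory.lockFor(a) == factory.lockFor(b)));

        synchronized (factory.lockFor(a)) {
            // critical section for this connection key
        }
    }
}
```

Running `main` prints `same instance: false` followed by `same lock: true`: locking on the caller's object would have used two monitors for what the map treats as one connection.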

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactory.java
##########
@@ -60,49 +71,41 @@ NettyPartitionRequestClient createPartitionRequestClient(ConnectionID connection
                while (client == null) {
                        entry = clients.get(connectionId);

Review comment:
       I found this to be the cause of a deadlock:
   1. The entry is placed by A
   1. The entry is read by B
   1. A gets the connection and replaces the entry, but A already holds an old entry
   1. B acquires the lock and waits for the channel indefinitely
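The sequence above hinges on the map entry for a key being swapped from the connecting placeholder to the finished client, so a thread that read the entry earlier can be left holding a stale object. A minimal sketch of one way to avoid that, assuming a generic cache rather than Flink's actual classes: keep a single `CompletableFuture` per key and never replace it with a different object, so every thread waits on the same future, which completes exactly once. `FutureClientCache`, `get` and the `connect` function (standing in for `nettyClient.connect(...)`) are hypothetical names; this is not the fix adopted in the PR.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

public class FutureClientCache<K, C> {

    // One future per key; the entry is never swapped for a different object,
    // so a thread that read it earlier can never be left holding a stale entry.
    private final ConcurrentMap<K, CompletableFuture<C>> clients = new ConcurrentHashMap<>();

    // Hypothetical connect function standing in for nettyClient.connect(...).
    private final Function<K, C> connect;

    public FutureClientCache(Function<K, C> connect) {
        this.connect = connect;
    }

    public C get(K key) throws InterruptedException, ExecutionException {
        CompletableFuture<C> newFuture = new CompletableFuture<>();
        CompletableFuture<C> existing = clients.putIfAbsent(key, newFuture);
        if (existing == null) {
            // Only the thread that installed the future establishes the connection.
            try {
                newFuture.complete(connect.apply(key));
            } catch (RuntimeException e) {
                // Drop only this failed future so a later call can retry,
                // then propagate the failure to every thread waiting on it.
                clients.remove(key, newFuture);
                newFuture.completeExceptionally(e);
            }
            return newFuture.get();
        }
        // All other threads wait on the same future, which completes exactly once.
        return existing.get();
    }

    public static void main(String[] args) throws Exception {
        AtomicInteger connects = new AtomicInteger();
        FutureClientCache<String, String> cache =
                new FutureClientCache<>(k -> "client-" + k + "-" + connects.incrementAndGet());

        Thread[] threads = new Thread[8];
        for (int i = 0; i < threads.length; i++) {
            threads[i] = new Thread(() -> {
                try {
                    cache.get("host:1234");
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            });
            threads[i].start();
        }
        for (Thread t : threads) {
            t.join();
        }
        System.out.println(cache.get("host:1234") + ", connects=" + connects.get());
    }
}
```

Running `main` prints `client-host:1234-1, connects=1`: eight concurrent callers trigger a single connect. The sketch installs a pre-created future with `putIfAbsent` rather than calling `connect` inside `computeIfAbsent`, so the slow connection attempt never runs inside the map's atomic compute section.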




----------------------------------------------------------------
This is an automated message from the Apache Git Service.