[ 
https://issues.apache.org/jira/browse/HADOOP-19120?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17851138#comment-17851138
 ] 

ASF GitHub Bot commented on HADOOP-19120:
-----------------------------------------

steveloughran commented on code in PR #6633:
URL: https://github.com/apache/hadoop/pull/6633#discussion_r1621368582


##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java:
##########
@@ -167,5 +167,14 @@ public final class FileSystemConfigurations {
   public static final int HUNDRED = 100;
   public static final long THOUSAND = 1000L;
 
+  public static final HttpOperationType DEFAULT_NETWORKING_LIBRARY
+      = HttpOperationType.APACHE_HTTP_CLIENT;
+
+  public static final int DEFAULT_APACHE_HTTP_CLIENT_MAX_IO_EXCEPTION_RETRIES = 3;
+
+  public static final long DEFAULT_HTTP_CLIENT_CONN_MAX_IDLE_TIME = 5_000L;
+
+  public static final int DEFAULT_HTTP_CLIENT_CONN_MAX_IDLE_CONNECTIONS = 5;

Review Comment:
   that's quite a low number, isn't it?



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/exceptions/AbfsApacheHttpExpect100Exception.java:
##########
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.contracts.exceptions;
+
+import java.io.IOException;
+
+import org.apache.http.HttpResponse;
+
+public class AbfsApacheHttpExpect100Exception extends IOException {
+  private final HttpResponse httpResponse;
+
+  public AbfsApacheHttpExpect100Exception(final String s, final HttpResponse httpResponse) {
+    super(s);
+    this.httpResponse = httpResponse;

Review Comment:
   pull this into the proposed superclass, add a requireNonNull()
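
   A minimal sketch of what that could look like. The superclass name
   `HttpResponseException` and the empty `HttpResponse` interface are
   placeholders chosen for illustration (the latter stands in for
   org.apache.http.HttpResponse so the snippet compiles on its own); neither
   is taken from the PR.

```java
import java.io.IOException;
import java.util.Objects;

/** Placeholder for org.apache.http.HttpResponse; declared only to keep the sketch self-contained. */
interface HttpResponse {
}

/** Hypothetical superclass: owns the response field and its null check. */
class HttpResponseException extends IOException {
  private final HttpResponse httpResponse;

  HttpResponseException(final String message, final HttpResponse httpResponse) {
    super(message);
    // Fail fast at construction time rather than on a later getter call.
    this.httpResponse = Objects.requireNonNull(httpResponse,
        "httpResponse should be non-null");
  }

  HttpResponse getHttpResponse() {
    return httpResponse;
  }
}

/** The subclass now only forwards its arguments to the superclass. */
class AbfsApacheHttpExpect100Exception extends HttpResponseException {
  AbfsApacheHttpExpect100Exception(final String message,
      final HttpResponse httpResponse) {
    super(message, httpResponse);
  }
}
```

   With the check in the superclass, every future response-carrying exception
   gets the same guarantee without repeating it.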



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsConnectionManager.java:
##########
@@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.io.IOException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.http.HttpClientConnection;
+import org.apache.http.config.Registry;
+import org.apache.http.config.SocketConfig;
+import org.apache.http.conn.ConnectionPoolTimeoutException;
+import org.apache.http.conn.ConnectionRequest;
+import org.apache.http.conn.HttpClientConnectionManager;
+import org.apache.http.conn.HttpClientConnectionOperator;
+import org.apache.http.conn.routing.HttpRoute;
+import org.apache.http.conn.socket.ConnectionSocketFactory;
+import org.apache.http.impl.conn.DefaultHttpClientConnectionOperator;
+import org.apache.http.impl.conn.ManagedHttpClientConnectionFactory;
+import org.apache.http.protocol.HttpContext;
+
+/**
+ * AbfsConnectionManager is a custom implementation of {@link HttpClientConnectionManager}.
+ * This implementation manages connection-pooling heuristics and custom implementation
+ * of {@link ManagedHttpClientConnectionFactory}.
+ */
+class AbfsConnectionManager implements HttpClientConnectionManager {
+
+  private final KeepAliveCache kac = KeepAliveCache.getInstance();
+
+  private final AbfsConnFactory httpConnectionFactory;
+
+  private final HttpClientConnectionOperator connectionOperator;
+
+  AbfsConnectionManager(Registry<ConnectionSocketFactory> socketFactoryRegistry,
+      AbfsConnFactory connectionFactory) {
+    this.httpConnectionFactory = connectionFactory;
+    connectionOperator = new DefaultHttpClientConnectionOperator(
+        socketFactoryRegistry, null, null);
+  }
+
+  @Override
+  public ConnectionRequest requestConnection(final HttpRoute route,
+      final Object state) {
+    return new ConnectionRequest() {

Review Comment:
   log at debug that a connection was requested



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java:
##########
@@ -316,5 +316,10 @@ public static String accountProperty(String property, String account) {
    * @see FileSystem#openFile(org.apache.hadoop.fs.Path)
    */
   public static final String FS_AZURE_BUFFERED_PREAD_DISABLE = "fs.azure.buffered.pread.disable";
+  /**Defines what network library to use for server IO calls {@value }*/
+  public static final String FS_AZURE_NETWORKING_LIBRARY = "fs.azure.networking.library";
+  public static final String FS_AZURE_APACHE_HTTP_CLIENT_MAX_IO_EXCEPTION_RETRIES = "fs.azure.apache.http.client.max.io.exception.retries";

Review Comment:
   these all need javadocs
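
   For illustration, a sketch of the two new keys with javadocs attached. The
   class name `ConfigurationKeysSketch` is hypothetical, and the wording of
   each description is an assumption inferred from the key names rather than
   taken from the PR.

```java
/** Hypothetical holder class; the real keys live in ConfigurationKeys. */
final class ConfigurationKeysSketch {

  private ConfigurationKeysSketch() {
  }

  /**
   * Selects the networking library used for server IO calls: {@value}.
   */
  public static final String FS_AZURE_NETWORKING_LIBRARY =
      "fs.azure.networking.library";

  /**
   * Assumed meaning: the maximum number of IOException retries attempted
   * when the Apache HTTP client is in use: {@value}.
   */
  public static final String FS_AZURE_APACHE_HTTP_CLIENT_MAX_IO_EXCEPTION_RETRIES =
      "fs.azure.apache.http.client.max.io.exception.retries";
}
```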



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java:
##########
@@ -95,11 +98,15 @@ public class AbfsRestOperation {
   private String failureReason;
   private AbfsRetryPolicy retryPolicy;
 
+  private final AbfsConfiguration abfsConfiguration;
+
   /**
    * This variable stores the tracing context used for last Rest Operation.
    */
   private TracingContext lastUsedTracingContext;
 
+  private int apacheHttpClientIoExceptions = 0;

Review Comment:
   javadocs



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsManagedHttpClientContext.java:
##########
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import org.apache.http.HttpClientConnection;
+import org.apache.http.client.protocol.HttpClientContext;
+
+public class AbfsManagedHttpClientContext extends HttpClientContext {

Review Comment:
   javadocs



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/KeepAliveCache.java:
##########
@@ -0,0 +1,362 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.io.IOException;
+import java.io.NotSerializableException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.classification.VisibleForTesting;
+import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
+import org.apache.http.HttpClientConnection;
+import org.apache.http.conn.routing.HttpRoute;
+
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.DEFAULT_MAX_CONN_SYS_PROP;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_MAX_CONN_SYS_PROP;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.KAC_DEFAULT_CONN_TTL;
+
+/**
+ * Connection-pooling heuristics adapted from JDK's connection pooling `KeepAliveCache`
+ * <p>
+ * Why this implementation is required in comparison to {@link org.apache.http.impl.conn.PoolingHttpClientConnectionManager}
+ * connection-pooling:
+ * <ol>
+ * <li>PoolingHttpClientConnectionManager heuristic caches all the reusable connections it has created.
+ * JDK's implementation only caches limited number of connections. The limit is given by JVM system
+ * property "http.maxConnections". If there is no system-property, it defaults to 5.</li>
+ * <li>In PoolingHttpClientConnectionManager, it expects the application to provide `setMaxPerRoute` and `setMaxTotal`,
+ * which the implementation uses as the total number of connections it can create. For application using ABFS, it is not
+ * feasible to provide a value in the initialisation of the connectionManager. JDK's implementation has no cap on the
+ * number of connections it can create.</li>
+ * </ol>
+ */
+public final class KeepAliveCache
+    extends HashMap<KeepAliveCache.KeepAliveKey, KeepAliveCache.ClientVector>
+    implements Runnable {
+
+  private int maxConn;
+
+  private long connectionIdleTTL = KAC_DEFAULT_CONN_TTL;
+
+  private Thread keepAliveTimer = null;
+
+  private boolean isPaused = false;
+
+  private KeepAliveCache() {
+    setMaxConn();
+  }
+
+  synchronized void pauseThread() {
+    isPaused = true;
+  }
+
+  synchronized void resumeThread() {
+    isPaused = false;
+    notify();
+  }
+
+  private void setMaxConn() {
+    String sysPropMaxConn = System.getProperty(HTTP_MAX_CONN_SYS_PROP);
+    if (sysPropMaxConn == null) {
+      maxConn = DEFAULT_MAX_CONN_SYS_PROP;
+    } else {
+      maxConn = Integer.parseInt(sysPropMaxConn);
+    }
+  }
+
+  public void setAbfsConfig(AbfsConfiguration abfsConfiguration) {
+    this.maxConn = abfsConfiguration.getMaxApacheHttpClientCacheConnections();
+    this.connectionIdleTTL = abfsConfiguration.getMaxApacheHttpClientConnectionIdleTime();
+  }
+
+  public long getConnectionIdleTTL() {
+    return connectionIdleTTL;
+  }
+
+  private static final KeepAliveCache INSTANCE = new KeepAliveCache();
+
+  public static KeepAliveCache getInstance() {
+    return INSTANCE;
+  }
+
+  @VisibleForTesting
+  void clearThread() {
+    clear();
+    setMaxConn();
+  }
+
+  private int getKacSize() {
+    return INSTANCE.maxConn;
+  }
+
+  @Override
+  public void run() {
+    do {
+      synchronized (this) {
+        while (isPaused) {
+          try {
+            wait();
+          } catch (InterruptedException ignored) {
+          }
+        }
+      }
+      kacCleanup();
+    } while (size() > 0);
+  }
+
+  private void kacCleanup() {
+    try {
+      Thread.sleep(connectionIdleTTL);
+    } catch (InterruptedException ex) {
+      return;
+    }
+    synchronized (this) {
+      long currentTime = System.currentTimeMillis();
+
+      ArrayList<KeepAliveKey> keysToRemove
+          = new ArrayList<KeepAliveKey>();
+
+      for (Map.Entry<KeepAliveKey, ClientVector> entry : entrySet()) {
+        KeepAliveKey key = entry.getKey();
+        ClientVector v = entry.getValue();
+        synchronized (v) {
+          int i;
+
+          for (i = 0; i < v.size(); i++) {
+            KeepAliveEntry e = v.elementAt(i);
+            if ((currentTime - e.idleStartTime) > v.nap
+                || e.httpClientConnection.isStale()) {
+              HttpClientConnection hc = e.httpClientConnection;
+              closeHtpClientConnection(hc);
+            } else {
+              break;
+            }
+          }
+          v.subList(0, i).clear();
+
+          if (v.size() == 0) {
+            keysToRemove.add(key);
+          }
+        }
+      }
+
+      for (KeepAliveKey key : keysToRemove) {
+        removeVector(key);
+      }
+    }
+  }
+
+  synchronized void removeVector(KeepAliveKey k) {
+    super.remove(k);
+  }
+
+  public synchronized void put(final HttpRoute httpRoute,
+      final HttpClientConnection httpClientConnection) {
+    boolean startThread = (keepAliveTimer == null);
+    if (!startThread) {
+      if (!keepAliveTimer.isAlive()) {
+        startThread = true;
+      }
+    }
+    if (startThread) {
+      clear();
+      final KeepAliveCache cache = this;
+      ThreadGroup grp = Thread.currentThread().getThreadGroup();
+      ThreadGroup parent = null;
+      while ((parent = grp.getParent()) != null) {
+        grp = parent;
+      }
+
+      keepAliveTimer = new Thread(grp, cache, "Keep-Alive-Timer");

Review Comment:
   OK, this is line-for-line the same as the JDK. It doesn't matter what license Microsoft has with Oracle, nobody is allowed to paste bits from the JDK in.



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsAHCHttpOperation.java:
##########
@@ -0,0 +1,374 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.classification.VisibleForTesting;
+import org.apache.hadoop.fs.PathIOException;
+import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants;
+import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations;
+import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsApacheHttpExpect100Exception;
+import org.apache.http.Header;
+import org.apache.http.HttpEntity;
+import org.apache.http.HttpResponse;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpDelete;
+import org.apache.http.client.methods.HttpEntityEnclosingRequestBase;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.methods.HttpHead;
+import org.apache.http.client.methods.HttpPatch;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.client.methods.HttpPut;
+import org.apache.http.client.methods.HttpRequestBase;
+import org.apache.http.entity.ByteArrayEntity;
+import org.apache.http.util.EntityUtils;
+
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.APACHE_IMPL;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.EMPTY_STRING;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_DELETE;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_GET;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_HEAD;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_PATCH;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_POST;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_PUT;
+import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_CLIENT_REQUEST_ID;
+import static org.apache.http.entity.ContentType.TEXT_PLAIN;
+
+/**
+ * Implementation of {@link AbfsHttpOperation} for orchestrating server calls using
+ * Apache Http Client.
+ */
+public class AbfsAHCHttpOperation extends AbfsHttpOperation {
+
+  private static final Logger LOG = LoggerFactory.getLogger(
+      AbfsAHCHttpOperation.class);
+
+  private HttpRequestBase httpRequestBase;
+
+  private HttpResponse httpResponse;
+
+  private boolean connectionDisconnectedOnError = false;
+
+  private final boolean isPayloadRequest;
+
+  public AbfsAHCHttpOperation(final URL url,
+      final String method,
+      final List<AbfsHttpHeader> requestHeaders,
+      final int connectionTimeout,
+      final int readTimeout) {
+    super(LOG, url, method, requestHeaders, connectionTimeout, readTimeout);
+    this.isPayloadRequest = isPayloadRequest(method);
+  }
+
+  @VisibleForTesting
+  AbfsManagedHttpClientContext setFinalAbfsClientContext() {
+    return new AbfsManagedHttpClientContext();
+  }
+
+  private boolean isPayloadRequest(final String method) {
+    return HTTP_METHOD_PUT.equals(method) || HTTP_METHOD_PATCH.equals(method)
+        || HTTP_METHOD_POST.equals(method);
+  }
+
+  @Override
+  protected InputStream getErrorStream() throws IOException {
+    HttpEntity entity = httpResponse.getEntity();
+    if (entity == null) {
+      return null;
+    }
+    return entity.getContent();
+  }
+
+  @Override
+  String getConnProperty(final String key) {
+    for (AbfsHttpHeader header : getRequestHeaders()) {
+      if (header.getName().equals(key)) {
+        return header.getValue();
+      }
+    }
+    return null;
+  }
+
+  @Override
+  URL getConnUrl() {
+    return getUrl();
+  }
+
+  @Override
+  String getConnRequestMethod() {
+    return getMethod();
+  }
+
+  @Override
+  Integer getConnResponseCode() throws IOException {
+    return getStatusCode();
+  }
+
+  @Override
+  String getConnResponseMessage() throws IOException {
+    return getStatusDescription();
+  }
+
+  public void processResponse(final byte[] buffer,
+      final int offset,
+      final int length) throws IOException {
+    try {
+      if (!isPayloadRequest) {
+        prepareRequest();
+        httpResponse = executeRequest();
+      }
+      parseResponseHeaderAndBody(buffer, offset, length);
+    } finally {
+      if (httpResponse != null) {
+        try {
+          EntityUtils.consume(httpResponse.getEntity());
+        } finally {
+          if (httpResponse instanceof CloseableHttpResponse) {
+            ((CloseableHttpResponse) httpResponse).close();
+          }
+        }
+      }
+    }
+  }
+
+  @VisibleForTesting
+  void parseResponseHeaderAndBody(final byte[] buffer,
+      final int offset,
+      final int length) throws IOException {
+    setStatusCode(parseStatusCode(httpResponse));
+
+    setStatusDescription(httpResponse.getStatusLine().getReasonPhrase());
+
+    String requestId = getResponseHeader(
+        HttpHeaderConfigurations.X_MS_REQUEST_ID);
+    if (requestId == null) {
+      requestId = AbfsHttpConstants.EMPTY_STRING;
+    }
+    setRequestId(requestId);
+
+    // dump the headers
+    AbfsIoUtils.dumpHeadersToDebugLog("Response Headers",

Review Comment:
   only do this if the log is at debug; otherwise getResponseHeaders is doing needless work
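
   A sketch of the guard, under stated assumptions: it uses java.util.logging
   as a stand-in so the snippet runs without dependencies, where
   `isLoggable(Level.FINE)` plays the role of SLF4J's `LOG.isDebugEnabled()`.
   The class `DebugGuardSketch`, its counter and the simplified
   `getResponseHeaders()` are hypothetical and exist only to make the saving
   observable.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;

class DebugGuardSketch {

  private static final Logger LOG =
      Logger.getLogger(DebugGuardSketch.class.getName());

  /** Counts how often the header map was built. */
  static int headerBuilds = 0;

  /** Stand-in for getResponseHeaders(): assume building this map is costly. */
  static Map<String, String> getResponseHeaders() {
    headerBuilds++;
    Map<String, String> headers = new LinkedHashMap<>();
    headers.put("x-ms-request-id", "example-id");
    return headers;
  }

  /** Builds and logs the headers only when debug-level logging is on. */
  static void dumpHeaders() {
    if (LOG.isLoggable(Level.FINE)) {
      LOG.fine("Response Headers: " + getResponseHeaders());
    }
  }

  /** Test helper: switch the logger between debug (FINE) and INFO. */
  static void setDebug(final boolean enabled) {
    LOG.setLevel(enabled ? Level.FINE : Level.INFO);
  }
}
```

   With the level at INFO the header map is never built; the cost is only
   paid when someone has asked for debug output.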



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/AbfsHttpConstants.java:
##########
@@ -165,5 +165,12 @@ public static ApiVersion getCurrentVersion() {
    */
   public static final Integer HTTP_STATUS_CATEGORY_QUOTIENT = 100;
 
+  public static final String HTTP_MAX_CONN_SYS_PROP = "http.maxConnections";
+  public static final Integer DEFAULT_MAX_CONN_SYS_PROP = 5;

Review Comment:
   I know it's the JDK default, but it's pretty awful. Why not increase it?



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsAHCHttpOperation.java:
##########
@@ -0,0 +1,374 @@
+/**
+ * Implementation of {@link AbfsHttpOperation} for orchestrating server calls using

Review Comment:
   needs javadocs for each non-inherited method and field.
   just assume that I *always* expect these and save review time by adding them



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsAHCHttpOperation.java:
##########
@@ -0,0 +1,374 @@
+/**
+ * Implementation of {@link AbfsHttpOperation} for orchestrating server calls using
+ * Apache Http Client.
+ */
+public class AbfsAHCHttpOperation extends AbfsHttpOperation {
+
+  private static final Logger LOG = LoggerFactory.getLogger(
+      AbfsAHCHttpOperation.class);
+
+  private HttpRequestBase httpRequestBase;

Review Comment:
   add a little javadoc for these fields



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/AbfsHttpConstants.java:
##########
@@ -165,5 +165,12 @@ public static ApiVersion getCurrentVersion() {
    */
   public static final Integer HTTP_STATUS_CATEGORY_QUOTIENT = 100;
 
+  public static final String HTTP_MAX_CONN_SYS_PROP = "http.maxConnections";
+  public static final Integer DEFAULT_MAX_CONN_SYS_PROP = 5;
+  public static final int KAC_DEFAULT_CONN_TTL = 5_000;

Review Comment:
   I'd prefer using Duration.ofSeconds(5) and extracting the millis value later; that removes the ambiguity of unit.
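
   Roughly like this; a sketch only, with a hypothetical holder class
   `KeepAliveDefaultsSketch` and helper `connTtlMillis()` that are not part of
   the PR:

```java
import java.time.Duration;

final class KeepAliveDefaultsSketch {

  private KeepAliveDefaultsSketch() {
  }

  /** Default idle TTL for cached connections; the unit is carried by the type. */
  static final Duration KAC_DEFAULT_CONN_TTL = Duration.ofSeconds(5);

  /** Convert only where a millisecond value is required, e.g. for Thread.sleep(). */
  static long connTtlMillis() {
    return KAC_DEFAULT_CONN_TTL.toMillis();
  }
}
```

   Callers that need a long still get 5000 ms, but the declaration itself can
   no longer be misread as seconds or microseconds.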



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsAHCHttpOperation.java:
##########
@@ -0,0 +1,374 @@
+/**
+ * Implementation of {@link AbfsHttpOperation} for orchestrating server calls using
+ * Apache Http Client.
+ */
+public class AbfsAHCHttpOperation extends AbfsHttpOperation {
+
+  private static final Logger LOG = LoggerFactory.getLogger(
+      AbfsAHCHttpOperation.class);
+
+  private HttpRequestBase httpRequestBase;
+
+  private HttpResponse httpResponse;
+
+  private boolean connectionDisconnectedOnError = false;
+
+  private final boolean isPayloadRequest;
+
+  public AbfsAHCHttpOperation(final URL url,
+      final String method,
+      final List<AbfsHttpHeader> requestHeaders,
+      final int connectionTimeout,
+      final int readTimeout) {
+    super(LOG, url, method, requestHeaders, connectionTimeout, readTimeout);
+    this.isPayloadRequest = isPayloadRequest(method);
+  }
+
+  @VisibleForTesting
+  AbfsManagedHttpClientContext setFinalAbfsClientContext() {
+    return new AbfsManagedHttpClientContext();
+  }
+
+  private boolean isPayloadRequest(final String method) {
+    return HTTP_METHOD_PUT.equals(method) || HTTP_METHOD_PATCH.equals(method)
+        || HTTP_METHOD_POST.equals(method);
+  }
+
+  @Override
+  protected InputStream getErrorStream() throws IOException {
+    HttpEntity entity = httpResponse.getEntity();
+    if (entity == null) {
+      return null;
+    }
+    return entity.getContent();
+  }
+
+  @Override
+  String getConnProperty(final String key) {
+    for (AbfsHttpHeader header : getRequestHeaders()) {
+      if (header.getName().equals(key)) {
+        return header.getValue();
+      }
+    }
+    return null;
+  }
+
+  @Override
+  URL getConnUrl() {
+    return getUrl();
+  }
+
+  @Override
+  String getConnRequestMethod() {
+    return getMethod();
+  }
+
+  @Override
+  Integer getConnResponseCode() throws IOException {
+    return getStatusCode();
+  }
+
+  @Override
+  String getConnResponseMessage() throws IOException {
+    return getStatusDescription();
+  }
+
+  public void processResponse(final byte[] buffer,
+      final int offset,
+      final int length) throws IOException {
+    try {
+      if (!isPayloadRequest) {
+        prepareRequest();
+        httpResponse = executeRequest();
+      }
+      parseResponseHeaderAndBody(buffer, offset, length);
+    } finally {
+      if (httpResponse != null) {
+        try {
+          EntityUtils.consume(httpResponse.getEntity());
+        } finally {
+          if (httpResponse instanceof CloseableHttpResponse) {
+            ((CloseableHttpResponse) httpResponse).close();
+          }
+        }
+      }
+    }
+  }
+
+  @VisibleForTesting
+  void parseResponseHeaderAndBody(final byte[] buffer,
+      final int offset,
+      final int length) throws IOException {
+    setStatusCode(parseStatusCode(httpResponse));
+
+    setStatusDescription(httpResponse.getStatusLine().getReasonPhrase());
+
+    String requestId = getResponseHeader(
+        HttpHeaderConfigurations.X_MS_REQUEST_ID);
+    if (requestId == null) {
+      requestId = AbfsHttpConstants.EMPTY_STRING;
+    }
+    setRequestId(requestId);
+
+    // dump the headers
+    AbfsIoUtils.dumpHeadersToDebugLog("Response Headers",
+        getResponseHeaders(httpResponse));
+    parseResponse(buffer, offset, length);
+  }
+
+  @VisibleForTesting
+  int parseStatusCode(HttpResponse httpResponse) {
+    return httpResponse.getStatusLine().getStatusCode();
+  }
+
+  @VisibleForTesting
+  HttpResponse executeRequest() throws IOException {
+    AbfsManagedHttpClientContext abfsHttpClientContext
+        = setFinalAbfsClientContext();
+    HttpResponse response
+        = AbfsApacheHttpClient.getClient().execute(httpRequestBase,
+        abfsHttpClientContext, getConnectionTimeout(), getReadTimeout());
+    setConnectionTimeMs(abfsHttpClientContext.getConnectTime());
+    setSendRequestTimeMs(abfsHttpClientContext.getSendTime());
+    setRecvResponseTimeMs(abfsHttpClientContext.getReadTime());
+    return response;
+  }
+
+  private Map<String, List<String>> getResponseHeaders(final HttpResponse httpResponse) {
+    if (httpResponse == null || httpResponse.getAllHeaders() == null) {
+      return new HashMap<>();
+    }
+    Map<String, List<String>> map = new HashMap<>();
+    for (Header header : httpResponse.getAllHeaders()) {
+      map.put(header.getName(), new ArrayList<String>(
+          Collections.singleton(header.getValue())));
+    }
+    return map;
+  }
+
+  @Override
+  public void setRequestProperty(final String key, final String value) {
+    setHeader(key, value);
+  }
+
+  @Override
+  Map<String, List<String>> getRequestProperties() {
+    Map<String, List<String>> map = new HashMap<>();
+    for (AbfsHttpHeader header : getRequestHeaders()) {
+      map.put(header.getName(),
+          new ArrayList<String>() {{
+            add(header.getValue());
+          }});
+    }
+    return map;
+  }
+
+  @Override
+  public String getResponseHeader(final String headerName) {
+    if (httpResponse == null) {
+      return null;
+    }
+    Header header = httpResponse.getFirstHeader(headerName);
+    if (header != null) {
+      return header.getValue();
+    }
+    return null;
+  }
+
+  @Override
+  InputStream getContentInputStream()
+      throws IOException {
+    if (httpResponse == null || httpResponse.getEntity() == null) {
+      return null;
+    }
+    return httpResponse.getEntity().getContent();
+  }
+
+  public void sendPayload(final byte[] buffer,
+      final int offset,
+      final int length)
+      throws IOException {
+    if (!isPayloadRequest) {
+      return;
+    }
+
+    switch (getMethod()) {
+    case HTTP_METHOD_PUT:
+      httpRequestBase = new HttpPut(getUri());
+      break;
+    case HTTP_METHOD_PATCH:
+      httpRequestBase = new HttpPatch(getUri());
+      break;
+    case HTTP_METHOD_POST:
+      httpRequestBase = new HttpPost(getUri());
+      break;
+    default:
+      /*
+       * This should never happen as the method is already validated in
+       * isPayloadRequest.
+       */
+      return;
+    }
+
+    setExpectedBytesToBeSent(length);
+    if (buffer != null) {
+      HttpEntity httpEntity = new ByteArrayEntity(buffer, offset, length,
+          TEXT_PLAIN);
+      ((HttpEntityEnclosingRequestBase) httpRequestBase).setEntity(
+          httpEntity);
+    }
+
+    translateHeaders(httpRequestBase, getRequestHeaders());
+    try {
+      httpResponse = executeRequest();
+    } catch (AbfsApacheHttpExpect100Exception ex) {
+      LOG.debug(
+          "Getting output stream failed with expect header enabled, returning back."
+              + "Expect 100 assertion failed for uri {} with status code: {}",
+          getMaskedUrl(), parseStatusCode(ex.getHttpResponse()),
+          ex);
+      connectionDisconnectedOnError = true;
+      httpResponse = ex.getHttpResponse();
+    } finally {
+      if (!connectionDisconnectedOnError
+          && httpRequestBase instanceof HttpEntityEnclosingRequestBase) {
+        setBytesSent(length);
+      }
+    }
+  }
+
+  private void prepareRequest() throws IOException {
+    switch (getMethod()) {
+    case HTTP_METHOD_GET:
+      httpRequestBase = new HttpGet(getUri());
+      break;
+    case HTTP_METHOD_DELETE:
+      httpRequestBase = new HttpDelete(getUri());
+      break;
+    case HTTP_METHOD_HEAD:
+      httpRequestBase = new HttpHead(getUri());
+      break;
+    default:
+      /*
+       * This would not happen as the AbfsClient would always be sending valid
+       * method.
+       */
+      throw new PathIOException(getUrl().toString(),
+          "Unsupported HTTP method: " + getMethod());
+    }
+    translateHeaders(httpRequestBase, getRequestHeaders());
+  }
+
+  private URI getUri() throws IOException {
+    try {
+      return getUrl().toURI();
+    } catch (URISyntaxException e) {
+      throw new IOException(e);
+    }
+  }
+
+  private void translateHeaders(final HttpRequestBase httpRequestBase,
+      final List<AbfsHttpHeader> requestHeaders) {
+    for (AbfsHttpHeader header : requestHeaders) {
+      httpRequestBase.setHeader(header.getName(), header.getValue());
+    }
+  }
+
+  private void setHeader(String name, String val) {
+    addHeaderToRequestHeaderList(new AbfsHttpHeader(name, val));
+  }
+
+  @Override
+  public String getRequestProperty(String name) {
+    for (AbfsHttpHeader header : getRequestHeaders()) {
+      if (header.getName().equals(name)) {
+        return header.getValue();
+      }
+    }
+    return EMPTY_STRING;
+  }
+
+  @Override
+  boolean getConnectionDisconnectedOnError() {
+    return connectionDisconnectedOnError;
+  }
+
+  @Override
+  public String getTracingContextSuffix() {
+    return APACHE_IMPL;
+  }
+
+  public String getClientRequestId() {
+    for (AbfsHttpHeader header : getRequestHeaders()) {

Review Comment:
   replace with `getRequestProperty(X_MS_CLIENT_REQUEST_ID)`
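
A minimal sketch of the delegation suggested above, not the PR's actual code. `ClientRequestIdSketch` and its nested `Header` class are hypothetical stand-ins for `AbfsAHCHttpOperation` and `AbfsHttpHeader`, and the header name string is an assumed value for `X_MS_CLIENT_REQUEST_ID`. Note that `getRequestProperty` in the quoted diff returns `EMPTY_STRING` when the header is absent, so delegating to it would also fix the "not found" result to an empty string.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for AbfsAHCHttpOperation; only header lookup is modelled. */
final class ClientRequestIdSketch {

  // Assumed value of HttpHeaderConfigurations.X_MS_CLIENT_REQUEST_ID.
  static final String X_MS_CLIENT_REQUEST_ID = "x-ms-client-request-id";

  static final String EMPTY_STRING = "";

  /** Hypothetical stand-in for AbfsHttpHeader. */
  static final class Header {
    private final String name;
    private final String value;

    Header(final String name, final String value) {
      this.name = name;
      this.value = value;
    }

    String getName() {
      return name;
    }

    String getValue() {
      return value;
    }
  }

  private final List<Header> requestHeaders = new ArrayList<>();

  void setRequestProperty(final String key, final String value) {
    requestHeaders.add(new Header(key, value));
  }

  /** Mirrors the lookup in the quoted diff: first match wins, else empty string. */
  String getRequestProperty(final String name) {
    for (Header header : requestHeaders) {
      if (header.getName().equals(name)) {
        return header.getValue();
      }
    }
    return EMPTY_STRING;
  }

  /** The one-line replacement for a hand-written loop over the headers. */
  String getClientRequestId() {
    return getRequestProperty(X_MS_CLIENT_REQUEST_ID);
  }
}
```

This keeps a single header-lookup loop in the class, so any later change to matching (for example case-insensitive names) would only need to be made once.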



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/exceptions/AbfsApacheHttpExpect100Exception.java:
##########
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.contracts.exceptions;
+
+import java.io.IOException;
+
+import org.apache.http.HttpResponse;
+
+public class AbfsApacheHttpExpect100Exception extends IOException {

Review Comment:
   How about a generic HttpResponseException which this then subclasses?
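
One way the suggested hierarchy could look, as a sketch only. All three type names are hypothetical, and `ResponseSketch` is a reduced stand-in for `org.apache.http.HttpResponse` so that the example compiles without the Apache HttpClient dependency.

```java
import java.io.IOException;
import java.util.Objects;

/** Hypothetical stand-in for org.apache.http.HttpResponse, reduced to a status code. */
interface ResponseSketch {
  int getStatusCode();
}

/** Hypothetical generic base: an IOException that still carries the server's response. */
class HttpResponseExceptionSketch extends IOException {
  private final transient ResponseSketch httpResponse;

  HttpResponseExceptionSketch(final String message,
      final ResponseSketch httpResponse) {
    super(message);
    this.httpResponse = Objects.requireNonNull(httpResponse, "httpResponse");
  }

  ResponseSketch getHttpResponse() {
    return httpResponse;
  }
}

/** The Expect: 100-continue failure becomes one specific case of the generic type. */
class Expect100ExceptionSketch extends HttpResponseExceptionSketch {
  Expect100ExceptionSketch(final String message,
      final ResponseSketch httpResponse) {
    super(message, httpResponse);
  }
}
```

Callers that only need the response (for example to read its status code) could then catch the base type, while the payload path could keep catching the Expect-100 subclass.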



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsApacheHttpClient.java:
##########
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.io.IOException;
+
+import org.apache.hadoop.security.ssl.DelegatingSSLSocketFactory;
+import org.apache.http.HttpResponse;
+import org.apache.http.client.config.RequestConfig;
+import org.apache.http.client.methods.HttpRequestBase;
+import org.apache.http.config.Registry;
+import org.apache.http.config.RegistryBuilder;
+import org.apache.http.conn.socket.ConnectionSocketFactory;
+import org.apache.http.conn.socket.PlainConnectionSocketFactory;
+import org.apache.http.conn.ssl.SSLConnectionSocketFactory;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClientBuilder;
+import org.apache.http.impl.client.HttpClients;
+
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.EMPTY_STRING;
+import static org.apache.hadoop.fs.azurebfs.constants.FileSystemUriSchemes.HTTPS_SCHEME;
+import static org.apache.hadoop.fs.azurebfs.constants.FileSystemUriSchemes.HTTP_SCHEME;
+import static org.apache.http.conn.ssl.SSLConnectionSocketFactory.getDefaultHostnameVerifier;
+
+final class AbfsApacheHttpClient {

Review Comment:
   guess what I'm going to say about javadocs here? (Please add a class-level javadoc.)



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/AbfsHttpConstants.java:
##########
@@ -165,5 +165,12 @@ public static ApiVersion getCurrentVersion() {
    */
   public static final Integer HTTP_STATUS_CATEGORY_QUOTIENT = 100;
 
+  public static final String HTTP_MAX_CONN_SYS_PROP = "http.maxConnections";

Review Comment:
   add javadoc to explain why this value is a sysprop (I know, but others may not)
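
A sketch of what such a javadoc might say, under the assumption that the constant is a system property because it reuses the JDK's own `http.maxConnections` networking property (documented by the JDK with a default of 5) rather than a Hadoop configuration key. The class name, the `DEFAULT_MAX_IDLE_CONNECTIONS` constant, and the helper method are hypothetical and only there to make the example runnable.

```java
/** Hypothetical holder class; in the PR the constant lives in AbfsHttpConstants. */
final class HttpMaxConnSketch {

  /**
   * Name of the JDK networking system property that caps how many idle
   * keep-alive HTTP connections are retained per destination.
   * It is read with {@link System#getProperty(String)} rather than from the
   * Hadoop configuration because it is the same JVM-wide switch that
   * {@code HttpURLConnection} honours, so one setting applies to both
   * networking libraries (assumed rationale).
   */
  static final String HTTP_MAX_CONN_SYS_PROP = "http.maxConnections";

  /** Fallback used when the property is unset or not a valid integer. */
  static final int DEFAULT_MAX_IDLE_CONNECTIONS = 5;

  private HttpMaxConnSketch() {
  }

  /** Resolve the effective limit from the system property. */
  static int maxIdleConnections() {
    return Integer.getInteger(HTTP_MAX_CONN_SYS_PROP,
        DEFAULT_MAX_IDLE_CONNECTIONS);
  }
}
```

With this in place, starting the JVM with `-Dhttp.maxConnections=20` would raise the limit without any change to the Hadoop configuration files.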



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsAHCHttpOperation.java:
##########
@@ -0,0 +1,374 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.classification.VisibleForTesting;
+import org.apache.hadoop.fs.PathIOException;
+import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants;
+import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations;
+import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsApacheHttpExpect100Exception;
+import org.apache.http.Header;
+import org.apache.http.HttpEntity;
+import org.apache.http.HttpResponse;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpDelete;
+import org.apache.http.client.methods.HttpEntityEnclosingRequestBase;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.methods.HttpHead;
+import org.apache.http.client.methods.HttpPatch;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.client.methods.HttpPut;
+import org.apache.http.client.methods.HttpRequestBase;
+import org.apache.http.entity.ByteArrayEntity;
+import org.apache.http.util.EntityUtils;
+
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.APACHE_IMPL;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.EMPTY_STRING;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_DELETE;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_GET;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_HEAD;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_PATCH;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_POST;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_PUT;
+import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_CLIENT_REQUEST_ID;
+import static org.apache.http.entity.ContentType.TEXT_PLAIN;
+
+/**
+ * Implementation of {@link AbfsHttpOperation} for orchestrating server calls using
+ * Apache Http Client.
+ */
+public class AbfsAHCHttpOperation extends AbfsHttpOperation {
+
+  private static final Logger LOG = LoggerFactory.getLogger(
+      AbfsAHCHttpOperation.class);
+
+  private HttpRequestBase httpRequestBase;
+
+  private HttpResponse httpResponse;
+
+  private boolean connectionDisconnectedOnError = false;
+
+  private final boolean isPayloadRequest;
+
+  public AbfsAHCHttpOperation(final URL url,
+      final String method,
+      final List<AbfsHttpHeader> requestHeaders,
+      final int connectionTimeout,
+      final int readTimeout) {
+    super(LOG, url, method, requestHeaders, connectionTimeout, readTimeout);
+    this.isPayloadRequest = isPayloadRequest(method);
+  }
+
+  @VisibleForTesting
+  AbfsManagedHttpClientContext setFinalAbfsClientContext() {
+    return new AbfsManagedHttpClientContext();
+  }
+
+  private boolean isPayloadRequest(final String method) {
+    return HTTP_METHOD_PUT.equals(method) || HTTP_METHOD_PATCH.equals(method)
+        || HTTP_METHOD_POST.equals(method);
+  }
+
+  @Override
+  protected InputStream getErrorStream() throws IOException {
+    HttpEntity entity = httpResponse.getEntity();
+    if (entity == null) {
+      return null;
+    }
+    return entity.getContent();
+  }
+
+  @Override
+  String getConnProperty(final String key) {
+    for (AbfsHttpHeader header : getRequestHeaders()) {
+      if (header.getName().equals(key)) {
+        return header.getValue();
+      }
+    }
+    return null;
+  }
+
+  @Override
+  URL getConnUrl() {
+    return getUrl();
+  }
+
+  @Override
+  String getConnRequestMethod() {
+    return getMethod();
+  }
+
+  @Override
+  Integer getConnResponseCode() throws IOException {
+    return getStatusCode();
+  }
+
+  @Override
+  String getConnResponseMessage() throws IOException {
+    return getStatusDescription();
+  }
+
+  public void processResponse(final byte[] buffer,
+      final int offset,
+      final int length) throws IOException {
+    try {
+      if (!isPayloadRequest) {
+        prepareRequest();
+        httpResponse = executeRequest();
+      }
+      parseResponseHeaderAndBody(buffer, offset, length);
+    } finally {
+      if (httpResponse != null) {
+        try {
+          EntityUtils.consume(httpResponse.getEntity());
+        } finally {
+          if (httpResponse instanceof CloseableHttpResponse) {
+            ((CloseableHttpResponse) httpResponse).close();
+          }
+        }
+      }
+    }
+  }
+
+  @VisibleForTesting
+  void parseResponseHeaderAndBody(final byte[] buffer,
+      final int offset,
+      final int length) throws IOException {
+    setStatusCode(parseStatusCode(httpResponse));
+
+    setStatusDescription(httpResponse.getStatusLine().getReasonPhrase());
+
+    String requestId = getResponseHeader(
+        HttpHeaderConfigurations.X_MS_REQUEST_ID);
+    if (requestId == null) {
+      requestId = AbfsHttpConstants.EMPTY_STRING;
+    }
+    setRequestId(requestId);
+
+    // dump the headers
+    AbfsIoUtils.dumpHeadersToDebugLog("Response Headers",
+        getResponseHeaders(httpResponse));
+    parseResponse(buffer, offset, length);
+  }
+
+  @VisibleForTesting
+  int parseStatusCode(HttpResponse httpResponse) {
+    return httpResponse.getStatusLine().getStatusCode();
+  }
+
+  @VisibleForTesting
+  HttpResponse executeRequest() throws IOException {
+    AbfsManagedHttpClientContext abfsHttpClientContext
+        = setFinalAbfsClientContext();
+    HttpResponse response

Review Comment:
   recommend logging at debug that this is about to be called



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsAHCHttpOperation.java:
##########
@@ -0,0 +1,374 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.classification.VisibleForTesting;
+import org.apache.hadoop.fs.PathIOException;
+import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants;
+import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations;
+import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsApacheHttpExpect100Exception;
+import org.apache.http.Header;
+import org.apache.http.HttpEntity;
+import org.apache.http.HttpResponse;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpDelete;
+import org.apache.http.client.methods.HttpEntityEnclosingRequestBase;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.methods.HttpHead;
+import org.apache.http.client.methods.HttpPatch;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.client.methods.HttpPut;
+import org.apache.http.client.methods.HttpRequestBase;
+import org.apache.http.entity.ByteArrayEntity;
+import org.apache.http.util.EntityUtils;
+
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.APACHE_IMPL;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.EMPTY_STRING;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_DELETE;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_GET;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_HEAD;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_PATCH;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_POST;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_PUT;
+import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_CLIENT_REQUEST_ID;
+import static org.apache.http.entity.ContentType.TEXT_PLAIN;
+
+/**
+ * Implementation of {@link AbfsHttpOperation} for orchestrating server calls using
+ * Apache Http Client.
+ */
+public class AbfsAHCHttpOperation extends AbfsHttpOperation {
+
+  private static final Logger LOG = LoggerFactory.getLogger(
+      AbfsAHCHttpOperation.class);
+
+  private HttpRequestBase httpRequestBase;
+
+  private HttpResponse httpResponse;
+
+  private boolean connectionDisconnectedOnError = false;
+
+  private final boolean isPayloadRequest;
+
+  public AbfsAHCHttpOperation(final URL url,
+      final String method,
+      final List<AbfsHttpHeader> requestHeaders,
+      final int connectionTimeout,
+      final int readTimeout) {
+    super(LOG, url, method, requestHeaders, connectionTimeout, readTimeout);
+    this.isPayloadRequest = isPayloadRequest(method);

Review Comment:
   log at debug that this implementation will be used; it helps to track down what is going on when there are network issues



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsAHCHttpOperation.java:
##########
@@ -0,0 +1,374 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.classification.VisibleForTesting;
+import org.apache.hadoop.fs.PathIOException;
+import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants;
+import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations;
+import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsApacheHttpExpect100Exception;
+import org.apache.http.Header;
+import org.apache.http.HttpEntity;
+import org.apache.http.HttpResponse;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpDelete;
+import org.apache.http.client.methods.HttpEntityEnclosingRequestBase;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.methods.HttpHead;
+import org.apache.http.client.methods.HttpPatch;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.client.methods.HttpPut;
+import org.apache.http.client.methods.HttpRequestBase;
+import org.apache.http.entity.ByteArrayEntity;
+import org.apache.http.util.EntityUtils;
+
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.APACHE_IMPL;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.EMPTY_STRING;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_DELETE;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_GET;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_HEAD;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_PATCH;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_POST;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_PUT;
+import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_CLIENT_REQUEST_ID;
+import static org.apache.http.entity.ContentType.TEXT_PLAIN;
+
+/**
+ * Implementation of {@link AbfsHttpOperation} for orchestrating server calls using
+ * Apache Http Client.
+ */
+public class AbfsAHCHttpOperation extends AbfsHttpOperation {
+
+  private static final Logger LOG = LoggerFactory.getLogger(
+      AbfsAHCHttpOperation.class);
+
+  private HttpRequestBase httpRequestBase;
+
+  private HttpResponse httpResponse;
+
+  private boolean connectionDisconnectedOnError = false;
+
+  private final boolean isPayloadRequest;
+
+  public AbfsAHCHttpOperation(final URL url,
+      final String method,
+      final List<AbfsHttpHeader> requestHeaders,
+      final int connectionTimeout,
+      final int readTimeout) {
+    super(LOG, url, method, requestHeaders, connectionTimeout, readTimeout);
+    this.isPayloadRequest = isPayloadRequest(method);
+  }
+
+  @VisibleForTesting
+  AbfsManagedHttpClientContext setFinalAbfsClientContext() {
+    return new AbfsManagedHttpClientContext();
+  }
+
+  private boolean isPayloadRequest(final String method) {
+    return HTTP_METHOD_PUT.equals(method) || HTTP_METHOD_PATCH.equals(method)
+        || HTTP_METHOD_POST.equals(method);
+  }
+
+  @Override
+  protected InputStream getErrorStream() throws IOException {
+    HttpEntity entity = httpResponse.getEntity();
+    if (entity == null) {
+      return null;
+    }
+    return entity.getContent();
+  }
+
+  @Override
+  String getConnProperty(final String key) {
+    for (AbfsHttpHeader header : getRequestHeaders()) {
+      if (header.getName().equals(key)) {
+        return header.getValue();
+      }
+    }
+    return null;
+  }
+
+  @Override
+  URL getConnUrl() {
+    return getUrl();
+  }
+
+  @Override
+  String getConnRequestMethod() {
+    return getMethod();
+  }
+
+  @Override
+  Integer getConnResponseCode() throws IOException {
+    return getStatusCode();
+  }
+
+  @Override
+  String getConnResponseMessage() throws IOException {
+    return getStatusDescription();
+  }
+
+  public void processResponse(final byte[] buffer,

Review Comment:
   add @Override



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java:
##########
@@ -1217,7 +1226,7 @@ public AbfsRestOperation deletePath(final String path, final boolean recursive,
             this,
             HTTP_METHOD_DELETE,
             url,
-            requestHeaders);
+            requestHeaders, abfsConfiguration);

Review Comment:
   put `abfsConfiguration` on a new line for consistency



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsConnFactory.java:
##########
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import org.apache.http.config.ConnectionConfig;
+import org.apache.http.conn.ManagedHttpClientConnection;
+import org.apache.http.conn.routing.HttpRoute;
+import org.apache.http.impl.conn.ManagedHttpClientConnectionFactory;
+
+/**
+ * Custom implementation of {@link ManagedHttpClientConnectionFactory} and overrides
+ * {@link ManagedHttpClientConnectionFactory#create(HttpRoute, ConnectionConfig)} to return
+ * {@link AbfsManagedApacheHttpConnection}.
+ */
+public class AbfsConnFactory extends ManagedHttpClientConnectionFactory {

Review Comment:
   give it a better name, e.g. `AbfsHttpClientConnectionFactory`



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsConnectionManager.java:
##########
@@ -0,0 +1,155 @@
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.io.IOException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.http.HttpClientConnection;
+import org.apache.http.config.Registry;
+import org.apache.http.config.SocketConfig;
+import org.apache.http.conn.ConnectionPoolTimeoutException;
+import org.apache.http.conn.ConnectionRequest;
+import org.apache.http.conn.HttpClientConnectionManager;
+import org.apache.http.conn.HttpClientConnectionOperator;
+import org.apache.http.conn.routing.HttpRoute;
+import org.apache.http.conn.socket.ConnectionSocketFactory;
+import org.apache.http.impl.conn.DefaultHttpClientConnectionOperator;
+import org.apache.http.impl.conn.ManagedHttpClientConnectionFactory;
+import org.apache.http.protocol.HttpContext;
+
+/**
+ * AbfsConnectionManager is a custom implementation of {@link HttpClientConnectionManager}.
+ * This implementation manages connection-pooling heuristics and custom implementation
+ * of {@link ManagedHttpClientConnectionFactory}.
+ */
+class AbfsConnectionManager implements HttpClientConnectionManager {
+
+  private final KeepAliveCache kac = KeepAliveCache.getInstance();
+
+  private final AbfsConnFactory httpConnectionFactory;
+
+  private final HttpClientConnectionOperator connectionOperator;
+
+  AbfsConnectionManager(Registry<ConnectionSocketFactory> socketFactoryRegistry,
+      AbfsConnFactory connectionFactory) {
+    this.httpConnectionFactory = connectionFactory;
+    connectionOperator = new DefaultHttpClientConnectionOperator(
+        socketFactoryRegistry, null, null);
+  }
+
+  @Override
+  public ConnectionRequest requestConnection(final HttpRoute route,
+      final Object state) {
+    return new ConnectionRequest() {
+      @Override
+      public HttpClientConnection get(final long timeout,
+          final TimeUnit timeUnit)
+          throws InterruptedException, ExecutionException,
+          ConnectionPoolTimeoutException {
+        try {
+          HttpClientConnection clientConn = kac.get(route);
+          if (clientConn != null) {
+            return clientConn;
+          }
+          return httpConnectionFactory.create(route, null);
+        } catch (IOException ex) {
+          throw new ExecutionException(ex);
+        }
+      }
+
+      @Override
+      public boolean cancel() {
+        return false;
+      }
+    };
+  }
+
+  /**
+   * Releases a connection for reuse. It can be reused only if validDuration is greater than 0.
+   * This method is called by {@link org.apache.http.impl.execchain} internal class `ConnectionHolder`.
+   * If it wants to reuse the connection, it will send a non-zero validDuration, else it will send 0.
+   * @param conn the connection to release
+   * @param newState the new state of the connection
+   * @param validDuration the duration for which the connection is valid
+   * @param timeUnit the time unit for the validDuration
+   */
+  @Override
+  public void releaseConnection(final HttpClientConnection conn,
+      final Object newState,
+      final long validDuration,
+      final TimeUnit timeUnit) {
+    if (validDuration == 0) {
+      return;
+    }
+    if (conn.isOpen() && conn instanceof AbfsManagedApacheHttpConnection) {
+      HttpRoute route = ((AbfsManagedApacheHttpConnection) conn).getHttpRoute();

Review Comment:
   log at debug that a connection was released



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsConnectionManager.java:
##########
@@ -0,0 +1,155 @@
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.io.IOException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.http.HttpClientConnection;
+import org.apache.http.config.Registry;
+import org.apache.http.config.SocketConfig;
+import org.apache.http.conn.ConnectionPoolTimeoutException;
+import org.apache.http.conn.ConnectionRequest;
+import org.apache.http.conn.HttpClientConnectionManager;
+import org.apache.http.conn.HttpClientConnectionOperator;
+import org.apache.http.conn.routing.HttpRoute;
+import org.apache.http.conn.socket.ConnectionSocketFactory;
+import org.apache.http.impl.conn.DefaultHttpClientConnectionOperator;
+import org.apache.http.impl.conn.ManagedHttpClientConnectionFactory;
+import org.apache.http.protocol.HttpContext;
+
+/**
+ * AbfsConnectionManager is a custom implementation of {@link HttpClientConnectionManager}.
+ * This implementation manages connection-pooling heuristics and custom implementation
+ * of {@link ManagedHttpClientConnectionFactory}.
+ */
+class AbfsConnectionManager implements HttpClientConnectionManager {

Review Comment:
   add a logger and log every callback so that when debugging we stand a chance of
   understanding what is happening



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsConnectionManager.java:
##########
@@ -0,0 +1,155 @@
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.io.IOException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.http.HttpClientConnection;
+import org.apache.http.config.Registry;
+import org.apache.http.config.SocketConfig;
+import org.apache.http.conn.ConnectionPoolTimeoutException;
+import org.apache.http.conn.ConnectionRequest;
+import org.apache.http.conn.HttpClientConnectionManager;
+import org.apache.http.conn.HttpClientConnectionOperator;
+import org.apache.http.conn.routing.HttpRoute;
+import org.apache.http.conn.socket.ConnectionSocketFactory;
+import org.apache.http.impl.conn.DefaultHttpClientConnectionOperator;
+import org.apache.http.impl.conn.ManagedHttpClientConnectionFactory;
+import org.apache.http.protocol.HttpContext;
+
+/**
+ * AbfsConnectionManager is a custom implementation of {@link HttpClientConnectionManager}.

Review Comment:
   this link won't resolve in javadocs, best to say
   httpclient {@code HttpClientConnectionManager}.



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsJdkHttpOperation.java:
##########
@@ -0,0 +1,345 @@
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.HttpURLConnection;
+import java.net.ProtocolException;
+import java.net.URL;
+import java.util.List;
+import java.util.Map;
+
+import javax.net.ssl.HttpsURLConnection;
+import javax.net.ssl.SSLSocketFactory;
+
+import org.apache.hadoop.classification.VisibleForTesting;
+import org.apache.hadoop.security.ssl.DelegatingSSLSocketFactory;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants;
+import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations;
+
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.EXPECT_100_JDK_ERROR;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HUNDRED_CONTINUE;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.JDK_FALLBACK;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.JDK_IMPL;
+import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.EXPECT;
+
+/**
+ * Implementation of {@link AbfsHttpOperation} for orchestrating calls using JDK's HttpURLConnection.
+ */
+public class AbfsJdkHttpOperation extends AbfsHttpOperation {
+
+  private static final Logger LOG = LoggerFactory.getLogger(
+      AbfsJdkHttpOperation.class);
+
+  private HttpURLConnection connection;

Review Comment:
   final?



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpOperation.java:
##########
@@ -254,158 +311,19 @@ public String getMaskedEncodedUrl() {
     return maskedEncodedUrl;
   }
 
-  /**
-   * Initializes a new HTTP request and opens the connection.
-   *
-   * @param url The full URL including query string parameters.
-   * @param method The HTTP method (PUT, PATCH, POST, GET, HEAD, or DELETE).
-   * @param requestHeaders The HTTP request headers.READ_TIMEOUT
-   * @param connectionTimeout The Connection Timeout value to be used while establishing http connection
-   * @param readTimeout The Read Timeout value to be used with http connection while making a request
-   * @throws IOException if an error occurs.
-   */
-  public AbfsHttpOperation(final URL url, final String method, final List<AbfsHttpHeader> requestHeaders,

Review Comment:
   1. I'd prefer to split onto separate lines
   2. I'd prefer timeouts to be Duration. If not, at least include unit in 
their name
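
   A minimal sketch of what Duration-typed timeouts could look like. `HttpTimeouts`
   and its methods are hypothetical names, not part of the PR; the only real API
   used is `HttpURLConnection.setConnectTimeout(int)` / `setReadTimeout(int)`,
   which take milliseconds.

```java
import java.net.HttpURLConnection;
import java.time.Duration;

/** Hypothetical holder for the two timeouts; illustrative only, not in the PR. */
final class HttpTimeouts {

  private final Duration connectionTimeout;

  private final Duration readTimeout;

  HttpTimeouts(final Duration connectionTimeout, final Duration readTimeout) {
    this.connectionTimeout = connectionTimeout;
    this.readTimeout = readTimeout;
  }

  /** Connection timeout in milliseconds; throws ArithmeticException on int overflow. */
  int connectionTimeoutMillis() {
    return Math.toIntExact(connectionTimeout.toMillis());
  }

  /** Read timeout in milliseconds; throws ArithmeticException on int overflow. */
  int readTimeoutMillis() {
    return Math.toIntExact(readTimeout.toMillis());
  }

  /** Converts to the int-millisecond values that HttpURLConnection expects. */
  void applyTo(final HttpURLConnection connection) {
    connection.setConnectTimeout(connectionTimeoutMillis());
    connection.setReadTimeout(readTimeoutMillis());
  }
}
```

   The unit lives in the type, so callers write `Duration.ofSeconds(2)` and the
   conversion to milliseconds happens in exactly one place.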



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpOperation.java:
##########
@@ -20,57 +20,51 @@
 
 import java.io.IOException;
 import java.io.InputStream;
-import java.io.OutputStream;
 import java.net.HttpURLConnection;
-import java.net.ProtocolException;
 import java.net.URL;
+import java.util.ArrayList;
 import java.util.List;
-
-import javax.net.ssl.HttpsURLConnection;
-import javax.net.ssl.SSLSocketFactory;
-
-import org.apache.hadoop.classification.VisibleForTesting;
-import org.apache.hadoop.fs.azurebfs.utils.UriUtils;
-import org.apache.hadoop.security.ssl.DelegatingSSLSocketFactory;
+import java.util.Map;
 
 import com.fasterxml.jackson.core.JsonFactory;
 import com.fasterxml.jackson.core.JsonParser;
 import com.fasterxml.jackson.core.JsonToken;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants;
 import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations;
 import org.apache.hadoop.fs.azurebfs.contracts.services.AbfsPerfLoggable;
 import org.apache.hadoop.fs.azurebfs.contracts.services.ListResultSchema;
-
-import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.EXPECT_100_JDK_ERROR;
-import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HUNDRED_CONTINUE;
-import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.EXPECT;
+import org.apache.hadoop.fs.azurebfs.utils.UriUtils;
 
 /**
- * Represents an HTTP operation.
+ * Base Http operation class for orchestrating server IO calls. Child classes would
+ * define the certain orchestration implementation on the basis of network library used.
+ * <p>
+ * For JDK netlib usage, the child class would be {@link AbfsJdkHttpOperation}. <br>
+ * For ApacheHttpClient netlib usage, the child class would be {@link AbfsAHCHttpOperation}.
+ * </p>
  */
-public class AbfsHttpOperation implements AbfsPerfLoggable {
-  private static final Logger LOG = LoggerFactory.getLogger(AbfsHttpOperation.class);
+public abstract class AbfsHttpOperation implements AbfsPerfLoggable {

Review Comment:
   add javadocs for the new fields and methods.



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpOperation.java:
##########
@@ -103,13 +111,31 @@ public static AbfsHttpOperation getAbfsHttpOperationWithFixedResult(
   protected AbfsHttpOperation(final URL url,
       final String method,
       final int httpStatus) {
+    this.log = null;

Review Comment:
   this should be set to something rather than null



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsConnectionManager.java:
##########
@@ -0,0 +1,155 @@
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.io.IOException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.http.HttpClientConnection;
+import org.apache.http.config.Registry;
+import org.apache.http.config.SocketConfig;
+import org.apache.http.conn.ConnectionPoolTimeoutException;
+import org.apache.http.conn.ConnectionRequest;
+import org.apache.http.conn.HttpClientConnectionManager;
+import org.apache.http.conn.HttpClientConnectionOperator;
+import org.apache.http.conn.routing.HttpRoute;
+import org.apache.http.conn.socket.ConnectionSocketFactory;
+import org.apache.http.impl.conn.DefaultHttpClientConnectionOperator;
+import org.apache.http.impl.conn.ManagedHttpClientConnectionFactory;
+import org.apache.http.protocol.HttpContext;
+
+/**
+ * AbfsConnectionManager is a custom implementation of {@link HttpClientConnectionManager}.
+ * This implementation manages connection-pooling heuristics and custom implementation
+ * of {@link ManagedHttpClientConnectionFactory}.
+ */
+class AbfsConnectionManager implements HttpClientConnectionManager {
+
+  private final KeepAliveCache kac = KeepAliveCache.getInstance();
+
+  private final AbfsConnFactory httpConnectionFactory;
+
+  private final HttpClientConnectionOperator connectionOperator;
+
+  AbfsConnectionManager(Registry<ConnectionSocketFactory> socketFactoryRegistry,
+      AbfsConnFactory connectionFactory) {
+    this.httpConnectionFactory = connectionFactory;
+    connectionOperator = new DefaultHttpClientConnectionOperator(

Review Comment:
   use a `this.` prefix for consistency



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsNoOpThrottlingIntercept.java:
##########
@@ -18,6 +18,8 @@
 
 package org.apache.hadoop.fs.azurebfs.services;
 
+import org.apache.hadoop.fs.azurebfs.constants.AbfsRestOperationType;
+
 final class AbfsNoOpThrottlingIntercept implements AbfsThrottlingIntercept {

Review Comment:
   I know this code exists already, but let's document it



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsConnectionManager.java:
##########
@@ -0,0 +1,155 @@
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.io.IOException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.http.HttpClientConnection;
+import org.apache.http.config.Registry;
+import org.apache.http.config.SocketConfig;
+import org.apache.http.conn.ConnectionPoolTimeoutException;
+import org.apache.http.conn.ConnectionRequest;
+import org.apache.http.conn.HttpClientConnectionManager;
+import org.apache.http.conn.HttpClientConnectionOperator;
+import org.apache.http.conn.routing.HttpRoute;
+import org.apache.http.conn.socket.ConnectionSocketFactory;
+import org.apache.http.impl.conn.DefaultHttpClientConnectionOperator;
+import org.apache.http.impl.conn.ManagedHttpClientConnectionFactory;
+import org.apache.http.protocol.HttpContext;
+
+/**
+ * AbfsConnectionManager is a custom implementation of {@link HttpClientConnectionManager}.
+ * This implementation manages connection-pooling heuristics and custom implementation
+ * of {@link ManagedHttpClientConnectionFactory}.
+ */
+class AbfsConnectionManager implements HttpClientConnectionManager {
+
+  private final KeepAliveCache kac = KeepAliveCache.getInstance();
+
+  private final AbfsConnFactory httpConnectionFactory;
+
+  private final HttpClientConnectionOperator connectionOperator;
+
+  AbfsConnectionManager(Registry<ConnectionSocketFactory> socketFactoryRegistry,
+      AbfsConnFactory connectionFactory) {
+    this.httpConnectionFactory = connectionFactory;
+    connectionOperator = new DefaultHttpClientConnectionOperator(
+        socketFactoryRegistry, null, null);
+  }
+
+  @Override
+  public ConnectionRequest requestConnection(final HttpRoute route,
+      final Object state) {
+    return new ConnectionRequest() {
+      @Override
+      public HttpClientConnection get(final long timeout,
+          final TimeUnit timeUnit)
+          throws InterruptedException, ExecutionException,
+          ConnectionPoolTimeoutException {
+        try {
+          HttpClientConnection clientConn = kac.get(route);
+          if (clientConn != null) {
+            return clientConn;
+          }
+          return httpConnectionFactory.create(route, null);
+        } catch (IOException ex) {
+          throw new ExecutionException(ex);
+        }
+      }
+
+      @Override
+      public boolean cancel() {
+        return false;
+      }
+    };
+  }
+
+  /**
+   * Releases a connection for reuse. It can be reused only if validDuration is greater than 0.
+   * This method is called by {@link org.apache.http.impl.execchain} internal class `ConnectionHolder`.
+   * If it wants to reuse the connection, it will send a non-zero validDuration, else it will send 0.
+   * @param conn the connection to release
+   * @param newState the new state of the connection
+   * @param validDuration the duration for which the connection is valid
+   * @param timeUnit the time unit for the validDuration
+   */
+  @Override
+  public void releaseConnection(final HttpClientConnection conn,
+      final Object newState,
+      final long validDuration,
+      final TimeUnit timeUnit) {
+    if (validDuration == 0) {
+      return;
+    }
+    if (conn.isOpen() && conn instanceof AbfsManagedApacheHttpConnection) {
+      HttpRoute route = ((AbfsManagedApacheHttpConnection) conn).getHttpRoute();
+      if (route != null) {
+        kac.put(route, conn);
+      }
+    }
+  }
+
+  @Override
+  public void connect(final HttpClientConnection conn,
+      final HttpRoute route,
+      final int connectTimeout,
+      final HttpContext context) throws IOException {
+    long start = System.currentTimeMillis();
+    connectionOperator.connect((AbfsManagedApacheHttpConnection) conn,
+        route.getTargetHost(), route.getLocalSocketAddress(),
+        connectTimeout, SocketConfig.DEFAULT, context);
+    if (context instanceof AbfsManagedHttpClientContext) {
+      ((AbfsManagedHttpClientContext) context).setConnectTime(
+          System.currentTimeMillis() - start);
+    }
+  }
+
+  @Override
+  public void upgrade(final HttpClientConnection conn,
+      final HttpRoute route,
+      final HttpContext context) throws IOException {
+    connectionOperator.upgrade((AbfsManagedApacheHttpConnection) conn,
+        route.getTargetHost(), context);
+  }
+
+  @Override
+  public void routeComplete(final HttpClientConnection conn,
+      final HttpRoute route,
+      final HttpContext context) throws IOException {
+
+  }
+
+  @Override
+  public void closeIdleConnections(final long idletime,
+      final TimeUnit timeUnit) {
+
+  }
+
+  @Override
+  public void closeExpiredConnections() {

Review Comment:
   why not implement this? are you sure the Java KeepAliveCache does the right thing?
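
   A sketch of the kind of expiry sweep `closeExpiredConnections()` could delegate
   to, assuming the cache records when each connection went idle.
   `IdleConnectionPool`, `put` and `closeExpired` are hypothetical names, not the
   PR's KeepAliveCache API; timestamps are passed in explicitly so the sketch
   stays deterministic.

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Iterator;

/** Hypothetical idle-connection pool; illustrative only, not the PR's cache. */
final class IdleConnectionPool<C extends Closeable> {

  /** A pooled connection plus the time at which it became idle. */
  private static final class Entry<C> {
    private final C conn;
    private final long idleSinceMillis;

    Entry(final C conn, final long idleSinceMillis) {
      this.conn = conn;
      this.idleSinceMillis = idleSinceMillis;
    }
  }

  private final Deque<Entry<C>> idle = new ArrayDeque<>();

  private final long maxIdleMillis;

  IdleConnectionPool(final long maxIdleMillis) {
    this.maxIdleMillis = maxIdleMillis;
  }

  /** Parks a connection for reuse, remembering when it went idle. */
  synchronized void put(final C conn, final long nowMillis) {
    idle.push(new Entry<>(conn, nowMillis));
  }

  /**
   * Closes and drops every connection idle for at least maxIdleMillis.
   * @return number of connections closed
   */
  synchronized int closeExpired(final long nowMillis) {
    int closed = 0;
    Iterator<Entry<C>> it = idle.iterator();
    while (it.hasNext()) {
      Entry<C> entry = it.next();
      if (nowMillis - entry.idleSinceMillis >= maxIdleMillis) {
        it.remove();
        try {
          entry.conn.close();
        } catch (IOException ignored) {
          // Assumption: a failed close of an already-stale connection is not fatal.
        }
        closed++;
      }
    }
    return closed;
  }

  synchronized int size() {
    return idle.size();
  }
}
```

   With something like this in place the connection manager callback has a
   single method to call, and the behaviour can be unit-tested without a
   background thread.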



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/KeepAliveCache.java:
##########
@@ -0,0 +1,362 @@
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.io.IOException;
+import java.io.NotSerializableException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.classification.VisibleForTesting;
+import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
+import org.apache.http.HttpClientConnection;
+import org.apache.http.conn.routing.HttpRoute;
+
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.DEFAULT_MAX_CONN_SYS_PROP;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_MAX_CONN_SYS_PROP;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.KAC_DEFAULT_CONN_TTL;
+
+/**
+ * Connection-pooling heuristics adapted from JDK's connection pooling `KeepAliveCache`
+ * <p>
+ * Why this implementation is required in comparison to {@link org.apache.http.impl.conn.PoolingHttpClientConnectionManager}
+ * connection-pooling:
+ * <ol>
+ * <li>PoolingHttpClientConnectionManager heuristic caches all the reusable connections it has created.
+ * JDK's implementation only caches limited number of connections. The limit is given by JVM system
+ * property "http.maxConnections". If there is no system-property, it defaults to 5.</li>
+ * <li>In PoolingHttpClientConnectionManager, it expects the application to provide `setMaxPerRoute` and `setMaxTotal`,
+ * which the implementation uses as the total number of connections it can create. For application using ABFS, it is not
+ * feasible to provide a value in the initialisation of the connectionManager. JDK's implementation has no cap on the
+ * number of connections it can create.</li>
+ * </ol>
+ */
+public final class KeepAliveCache
+    extends HashMap<KeepAliveCache.KeepAliveKey, KeepAliveCache.ClientVector>
+    implements Runnable {
+
+  private int maxConn;
+
+  private long connectionIdleTTL = KAC_DEFAULT_CONN_TTL;
+
+  private Thread keepAliveTimer = null;
+
+  private boolean isPaused = false;
+
+  private KeepAliveCache() {
+    setMaxConn();
+  }
+
+  synchronized void pauseThread() {
+    isPaused = true;
+  }
+
+  synchronized void resumeThread() {
+    isPaused = false;
+    notify();
+  }
+
+  private void setMaxConn() {
+    String sysPropMaxConn = System.getProperty(HTTP_MAX_CONN_SYS_PROP);
+    if (sysPropMaxConn == null) {
+      maxConn = DEFAULT_MAX_CONN_SYS_PROP;
+    } else {
+      maxConn = Integer.parseInt(sysPropMaxConn);
+    }
+  }
+
+  public void setAbfsConfig(AbfsConfiguration abfsConfiguration) {
+    this.maxConn = abfsConfiguration.getMaxApacheHttpClientCacheConnections();
+    this.connectionIdleTTL = abfsConfiguration.getMaxApacheHttpClientConnectionIdleTime();
+  }
+
+  public long getConnectionIdleTTL() {
+    return connectionIdleTTL;
+  }
+
+  private static final KeepAliveCache INSTANCE = new KeepAliveCache();
+
+  public static KeepAliveCache getInstance() {
+    return INSTANCE;
+  }
+
+  @VisibleForTesting
+  void clearThread() {
+    clear();
+    setMaxConn();
+  }
+
+  private int getKacSize() {

Review Comment:
   give a better name



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/KeepAliveCache.java:
##########
@@ -0,0 +1,362 @@
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.io.IOException;
+import java.io.NotSerializableException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.classification.VisibleForTesting;
+import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
+import org.apache.http.HttpClientConnection;
+import org.apache.http.conn.routing.HttpRoute;
+
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.DEFAULT_MAX_CONN_SYS_PROP;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_MAX_CONN_SYS_PROP;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.KAC_DEFAULT_CONN_TTL;
+
+/**
+ * Connection-pooling heuristics adapted from JDK's connection pooling `KeepAliveCache`

Review Comment:
   this is not copied from the JDK, is it?



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/KeepAliveCache.java:
##########
@@ -0,0 +1,362 @@
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.io.IOException;
+import java.io.NotSerializableException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.classification.VisibleForTesting;
+import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
+import org.apache.http.HttpClientConnection;
+import org.apache.http.conn.routing.HttpRoute;
+
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.DEFAULT_MAX_CONN_SYS_PROP;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_MAX_CONN_SYS_PROP;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.KAC_DEFAULT_CONN_TTL;
+
+/**
+ * Connection-pooling heuristics adapted from JDK's connection pooling `KeepAliveCache`
+ * <p>
+ * Why this implementation is required in comparison to {@link org.apache.http.impl.conn.PoolingHttpClientConnectionManager}
+ * connection-pooling:
+ * <ol>
+ * <li>PoolingHttpClientConnectionManager heuristic caches all the reusable connections it has created.
+ * JDK's implementation only caches limited number of connections. The limit is given by JVM system
+ * property "http.maxConnections". If there is no system-property, it defaults to 5.</li>
+ * <li>In PoolingHttpClientConnectionManager, it expects the application to provide `setMaxPerRoute` and `setMaxTotal`,
+ * which the implementation uses as the total number of connections it can create. For application using ABFS, it is not
+ * feasible to provide a value in the initialisation of the connectionManager. JDK's implementation has no cap on the
+ * number of connections it can create.</li>
+ * </ol>
+ */
+public final class KeepAliveCache

Review Comment:
   add javadocs all the way down
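
   As an illustration of the level of javadoc meant here, a documented variant of
   the max-connection lookup. `MaxConnResolver` and `resolveMaxConn` are
   hypothetical names, and the fallback on malformed or non-positive values is an
   assumption rather than the PR's behaviour; the property name and the default
   of 5 are taken from the class javadoc above.

```java
/** Hypothetical helper showing documented fields and methods; not part of the PR. */
final class MaxConnResolver {

  /** JVM system property that limits the number of cached idle connections. */
  static final String HTTP_MAX_CONN_SYS_PROP = "http.maxConnections";

  /** Limit used when the system property is absent or unusable. */
  static final int DEFAULT_MAX_CONN = 5;

  private MaxConnResolver() {
  }

  /**
   * Resolves the cache limit from a raw property value.
   *
   * @param rawValue value of the system property; may be null
   * @return the parsed positive limit, or {@link #DEFAULT_MAX_CONN} otherwise
   */
  static int resolveMaxConn(final String rawValue) {
    if (rawValue == null) {
      return DEFAULT_MAX_CONN;
    }
    try {
      int parsed = Integer.parseInt(rawValue.trim());
      return parsed > 0 ? parsed : DEFAULT_MAX_CONN;
    } catch (NumberFormatException e) {
      // Assumption: fall back to the default instead of failing initialisation.
      return DEFAULT_MAX_CONN;
    }
  }

  /**
   * Resolves the cache limit from the running JVM's system properties.
   *
   * @return the configured limit, or {@link #DEFAULT_MAX_CONN} if unset or invalid
   */
  static int resolveMaxConn() {
    return resolveMaxConn(System.getProperty(HTTP_MAX_CONN_SYS_PROP));
  }
}
```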





> [ABFS]: ApacheHttpClient adaptation as network library
> ------------------------------------------------------
>
>                 Key: HADOOP-19120
>                 URL: https://issues.apache.org/jira/browse/HADOOP-19120
>             Project: Hadoop Common
>          Issue Type: Sub-task
>          Components: fs/azure
>    Affects Versions: 3.5.0
>            Reporter: Pranav Saxena
>            Assignee: Pranav Saxena
>            Priority: Major
>              Labels: pull-request-available
>
> Apache HttpClient is more feature-rich and flexible, and gives the
> application more granular control over networking parameters.
> ABFS currently relies on the JDK-net library. This library is managed by
> OpenJDK and has no performance problems. However, it limits the application's
> control over networking, and it exposes very few APIs and hooks that the
> application can use to get metrics or to choose which connection is reused
> and when. ApacheHttpClient provides hooks to fetch important metrics and to
> control networking parameters.
> A custom connection-pool implementation is used, adapted from the JDK8
> connection pooling. The reasons are:
> 1. PoolingHttpClientConnectionManager caches all the reusable connections it
> has created. The JDK's implementation caches only a limited number of
> connections. The limit is given by the JVM system property
> "http.maxConnections"; if the property is not set, it defaults to 5.
> Connection-establishment latency increased when all the connections were
> cached, hence the pooling heuristic of the JDK netlib is adapted.
> 2. PoolingHttpClientConnectionManager expects the application to provide
> `setMaxPerRoute` and `setMaxTotal`, which the implementation uses as the
> total number of connections it can create. For an application using ABFS, it
> is not feasible to provide such a value when the connectionManager is
> initialised. The JDK's implementation has no cap on the number of
> connections it can have open at a given moment, hence the pooling heuristic
> of the JDK netlib is adapted.
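
The heuristic described in the two points above (no cap on how many connections are created, but only a limited number of idle ones kept for reuse) can be sketched roughly as follows. This is an illustrative sketch only, not the KeepAliveCache class from the PR: the class name BoundedKeepAliveCache, its methods, the explicit clock parameter and the most-recent-first reuse order are all assumptions made for the example. Only the "http.maxConnections" property and its default of 5 come from the description.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/**
 * Hypothetical sketch of a bounded idle-connection cache.
 * Not the KeepAliveCache from the PR; all names here are illustrative.
 */
final class BoundedKeepAliveCache<C> {

  /** A cached connection plus the time at which it became idle. */
  private static final class Entry<C> {
    final C connection;
    final long idleSinceMillis;

    Entry(C connection, long idleSinceMillis) {
      this.connection = connection;
      this.idleSinceMillis = idleSinceMillis;
    }
  }

  private final Deque<Entry<C>> idle = new ArrayDeque<>();
  private final int maxIdle;
  private final long ttlMillis;

  BoundedKeepAliveCache(int maxIdle, long ttlMillis) {
    this.maxIdle = maxIdle;
    this.ttlMillis = ttlMillis;
  }

  /** Reads the cap from "http.maxConnections"; falls back to 5 if unset. */
  static int maxIdleFromSystemProperty() {
    return Integer.getInteger("http.maxConnections", 5);
  }

  /**
   * Offers an idle connection for reuse.
   * Returns false when the cache is full; the caller should then close the
   * connection instead of caching it. Creating connections is never limited.
   */
  synchronized boolean put(C connection, long nowMillis) {
    if (idle.size() >= maxIdle) {
      return false;
    }
    idle.push(new Entry<>(connection, nowMillis));
    return true;
  }

  /**
   * Returns the most recently cached connection that has not outlived the
   * TTL, or null if none is available. Expired entries are dropped; a real
   * implementation would also close them.
   */
  synchronized C get(long nowMillis) {
    while (!idle.isEmpty()) {
      Entry<C> entry = idle.pop();
      if (nowMillis - entry.idleSinceMillis <= ttlMillis) {
        return entry.connection;
      }
    }
    return null;
  }

  synchronized int size() {
    return idle.size();
  }
}
```

A caller would ask get() for a connection first, open a new one when it returns null, and hand the connection back through put() after the request completes, closing it if put() returns false.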



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


