[
https://issues.apache.org/jira/browse/HADOOP-19120?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17851138#comment-17851138
]
ASF GitHub Bot commented on HADOOP-19120:
-----------------------------------------
steveloughran commented on code in PR #6633:
URL: https://github.com/apache/hadoop/pull/6633#discussion_r1621368582
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java:
##########
@@ -167,5 +167,14 @@ public final class FileSystemConfigurations {
public static final int HUNDRED = 100;
public static final long THOUSAND = 1000L;
+ public static final HttpOperationType DEFAULT_NETWORKING_LIBRARY
+ = HttpOperationType.APACHE_HTTP_CLIENT;
+
+ public static final int DEFAULT_APACHE_HTTP_CLIENT_MAX_IO_EXCEPTION_RETRIES = 3;
+
+ public static final long DEFAULT_HTTP_CLIENT_CONN_MAX_IDLE_TIME = 5_000L;
+
+ public static final int DEFAULT_HTTP_CLIENT_CONN_MAX_IDLE_CONNECTIONS = 5;
Review Comment:
that's quite a low number, isn't it?
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/exceptions/AbfsApacheHttpExpect100Exception.java:
##########
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.contracts.exceptions;
+
+import java.io.IOException;
+
+import org.apache.http.HttpResponse;
+
+public class AbfsApacheHttpExpect100Exception extends IOException {
+ private final HttpResponse httpResponse;
+
+ public AbfsApacheHttpExpect100Exception(final String s, final HttpResponse httpResponse) {
+ super(s);
+ this.httpResponse = httpResponse;
Review Comment:
pull this into the proposed superclass, add a requireNonNull()
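For illustration, the suggested constructor shape might look like the sketch below. The class name and the use of Object in place of org.apache.http.HttpResponse are stand-ins so the snippet is self-contained; they are not the names in the PR.

```java
import java.io.IOException;
import java.util.Objects;

// Illustrative stand-in for the proposed superclass; the real one would
// hold an org.apache.http.HttpResponse rather than an Object.
class HttpResponseException extends IOException {

  private final Object httpResponse;

  HttpResponseException(final String message, final Object httpResponse) {
    super(message);
    // fail fast on construction rather than NPE-ing later
    this.httpResponse = Objects.requireNonNull(httpResponse, "httpResponse");
  }

  Object getHttpResponse() {
    return httpResponse;
  }
}
```

Subclasses such as AbfsApacheHttpExpect100Exception would then just delegate to this constructor.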
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsConnectionManager.java:
##########
@@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.io.IOException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.http.HttpClientConnection;
+import org.apache.http.config.Registry;
+import org.apache.http.config.SocketConfig;
+import org.apache.http.conn.ConnectionPoolTimeoutException;
+import org.apache.http.conn.ConnectionRequest;
+import org.apache.http.conn.HttpClientConnectionManager;
+import org.apache.http.conn.HttpClientConnectionOperator;
+import org.apache.http.conn.routing.HttpRoute;
+import org.apache.http.conn.socket.ConnectionSocketFactory;
+import org.apache.http.impl.conn.DefaultHttpClientConnectionOperator;
+import org.apache.http.impl.conn.ManagedHttpClientConnectionFactory;
+import org.apache.http.protocol.HttpContext;
+
+/**
+ * AbfsConnectionManager is a custom implementation of {@link HttpClientConnectionManager}.
+ * This implementation manages connection-pooling heuristics and a custom implementation
+ * of {@link ManagedHttpClientConnectionFactory}.
+ */
+class AbfsConnectionManager implements HttpClientConnectionManager {
+
+ private final KeepAliveCache kac = KeepAliveCache.getInstance();
+
+ private final AbfsConnFactory httpConnectionFactory;
+
+ private final HttpClientConnectionOperator connectionOperator;
+
+ AbfsConnectionManager(Registry<ConnectionSocketFactory> socketFactoryRegistry,
+ AbfsConnFactory connectionFactory) {
+ this.httpConnectionFactory = connectionFactory;
+ connectionOperator = new DefaultHttpClientConnectionOperator(
+ socketFactoryRegistry, null, null);
+ }
+
+ @Override
+ public ConnectionRequest requestConnection(final HttpRoute route,
+ final Object state) {
+ return new ConnectionRequest() {
Review Comment:
log at debug that a connection was requested
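A minimal sketch of the debug line being requested; java.util.logging stands in for the slf4j logger the codebase actually uses, and the route parameter and return value are illustrative (the return exists only so the sketch is easy to check).

```java
import java.util.logging.Level;
import java.util.logging.Logger;

class ConnectionRequestLogging {

  private static final Logger LOG =
      Logger.getLogger(ConnectionRequestLogging.class.getName());

  // Called where requestConnection() builds its ConnectionRequest.
  // Returns the logged message, or null when debug logging is off.
  static String logConnectionRequested(final String route) {
    // level check avoids building the message when debug is off
    if (!LOG.isLoggable(Level.FINE)) {
      return null;
    }
    String msg = "Connection requested for route " + route;
    LOG.fine(msg);
    return msg;
  }
}
```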
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java:
##########
@@ -316,5 +316,10 @@ public static String accountProperty(String property, String account) {
* @see FileSystem#openFile(org.apache.hadoop.fs.Path)
*/
public static final String FS_AZURE_BUFFERED_PREAD_DISABLE = "fs.azure.buffered.pread.disable";
+ /**Defines what network library to use for server IO calls {@value }*/
+ public static final String FS_AZURE_NETWORKING_LIBRARY = "fs.azure.networking.library";
+ public static final String FS_AZURE_APACHE_HTTP_CLIENT_MAX_IO_EXCEPTION_RETRIES =
+ "fs.azure.apache.http.client.max.io.exception.retries";
Review Comment:
these all need javadocs
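For reference, the requested javadocs might look like the sketch below; the wording and the wrapper class are illustrative, not what the PR ends up with.

```java
// Illustrative holder class; in the PR these constants live in ConfigurationKeys.
final class ConfigurationKeysSketch {

  /**
   * Number of times an IOException from the Apache HTTP client is retried
   * before the request is failed: {@value}.
   */
  public static final String FS_AZURE_APACHE_HTTP_CLIENT_MAX_IO_EXCEPTION_RETRIES =
      "fs.azure.apache.http.client.max.io.exception.retries";

  private ConfigurationKeysSketch() {
  }
}
```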
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java:
##########
@@ -95,11 +98,15 @@ public class AbfsRestOperation {
private String failureReason;
private AbfsRetryPolicy retryPolicy;
+ private final AbfsConfiguration abfsConfiguration;
+
/**
* This variable stores the tracing context used for last Rest Operation.
*/
private TracingContext lastUsedTracingContext;
+ private int apacheHttpClientIoExceptions = 0;
Review Comment:
javadocs
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsManagedHttpClientContext.java:
##########
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import org.apache.http.HttpClientConnection;
+import org.apache.http.client.protocol.HttpClientContext;
+
+public class AbfsManagedHttpClientContext extends HttpClientContext {
Review Comment:
javadocs
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/KeepAliveCache.java:
##########
@@ -0,0 +1,362 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.io.IOException;
+import java.io.NotSerializableException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.classification.VisibleForTesting;
+import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
+import org.apache.http.HttpClientConnection;
+import org.apache.http.conn.routing.HttpRoute;
+
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.DEFAULT_MAX_CONN_SYS_PROP;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_MAX_CONN_SYS_PROP;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.KAC_DEFAULT_CONN_TTL;
+
+/**
+ * Connection-pooling heuristics adapted from the JDK's connection-pooling `KeepAliveCache`.
+ * <p>
+ * Why this implementation is required in comparison to
+ * {@link org.apache.http.impl.conn.PoolingHttpClientConnectionManager} connection-pooling:
+ * <ol>
+ * <li>PoolingHttpClientConnectionManager caches all the reusable connections it has created.
+ * The JDK's implementation only caches a limited number of connections. The limit is given by the
+ * JVM system property "http.maxConnections"; if the property is unset, it defaults to 5.</li>
+ * <li>PoolingHttpClientConnectionManager expects the application to provide `setMaxPerRoute` and
+ * `setMaxTotal`, which it uses as the total number of connections it can create. For an application
+ * using ABFS, it is not feasible to provide a value at the initialisation of the connection manager.
+ * The JDK's implementation has no cap on the number of connections it can create.</li>
+ * </ol>
+ */
+public final class KeepAliveCache
+ extends HashMap<KeepAliveCache.KeepAliveKey, KeepAliveCache.ClientVector>
+ implements Runnable {
+
+ private int maxConn;
+
+ private long connectionIdleTTL = KAC_DEFAULT_CONN_TTL;
+
+ private Thread keepAliveTimer = null;
+
+ private boolean isPaused = false;
+
+ private KeepAliveCache() {
+ setMaxConn();
+ }
+
+ synchronized void pauseThread() {
+ isPaused = true;
+ }
+
+ synchronized void resumeThread() {
+ isPaused = false;
+ notify();
+ }
+
+ private void setMaxConn() {
+ String sysPropMaxConn = System.getProperty(HTTP_MAX_CONN_SYS_PROP);
+ if (sysPropMaxConn == null) {
+ maxConn = DEFAULT_MAX_CONN_SYS_PROP;
+ } else {
+ maxConn = Integer.parseInt(sysPropMaxConn);
+ }
+ }
+
+ public void setAbfsConfig(AbfsConfiguration abfsConfiguration) {
+ this.maxConn = abfsConfiguration.getMaxApacheHttpClientCacheConnections();
+ this.connectionIdleTTL = abfsConfiguration.getMaxApacheHttpClientConnectionIdleTime();
+ }
+
+ public long getConnectionIdleTTL() {
+ return connectionIdleTTL;
+ }
+
+ private static final KeepAliveCache INSTANCE = new KeepAliveCache();
+
+ public static KeepAliveCache getInstance() {
+ return INSTANCE;
+ }
+
+ @VisibleForTesting
+ void clearThread() {
+ clear();
+ setMaxConn();
+ }
+
+ private int getKacSize() {
+ return INSTANCE.maxConn;
+ }
+
+ @Override
+ public void run() {
+ do {
+ synchronized (this) {
+ while (isPaused) {
+ try {
+ wait();
+ } catch (InterruptedException ignored) {
+ }
+ }
+ }
+ kacCleanup();
+ } while (size() > 0);
+ }
+
+ private void kacCleanup() {
+ try {
+ Thread.sleep(connectionIdleTTL);
+ } catch (InterruptedException ex) {
+ return;
+ }
+ synchronized (this) {
+ long currentTime = System.currentTimeMillis();
+
+ ArrayList<KeepAliveKey> keysToRemove
+ = new ArrayList<KeepAliveKey>();
+
+ for (Map.Entry<KeepAliveKey, ClientVector> entry : entrySet()) {
+ KeepAliveKey key = entry.getKey();
+ ClientVector v = entry.getValue();
+ synchronized (v) {
+ int i;
+
+ for (i = 0; i < v.size(); i++) {
+ KeepAliveEntry e = v.elementAt(i);
+ if ((currentTime - e.idleStartTime) > v.nap
+ || e.httpClientConnection.isStale()) {
+ HttpClientConnection hc = e.httpClientConnection;
+ closeHtpClientConnection(hc);
+ } else {
+ break;
+ }
+ }
+ v.subList(0, i).clear();
+
+ if (v.size() == 0) {
+ keysToRemove.add(key);
+ }
+ }
+ }
+
+ for (KeepAliveKey key : keysToRemove) {
+ removeVector(key);
+ }
+ }
+ }
+
+ synchronized void removeVector(KeepAliveKey k) {
+ super.remove(k);
+ }
+
+ public synchronized void put(final HttpRoute httpRoute,
+ final HttpClientConnection httpClientConnection) {
+ boolean startThread = (keepAliveTimer == null);
+ if (!startThread) {
+ if (!keepAliveTimer.isAlive()) {
+ startThread = true;
+ }
+ }
+ if (startThread) {
+ clear();
+ final KeepAliveCache cache = this;
+ ThreadGroup grp = Thread.currentThread().getThreadGroup();
+ ThreadGroup parent = null;
+ while ((parent = grp.getParent()) != null) {
+ grp = parent;
+ }
+
+ keepAliveTimer = new Thread(grp, cache, "Keep-Alive-Timer");
Review Comment:
OK, this is line-for-line the same as the JDK. It doesn't matter what license Microsoft has with Oracle; nobody is allowed to paste bits of the JDK in.
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsAHCHttpOperation.java:
##########
@@ -0,0 +1,374 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.classification.VisibleForTesting;
+import org.apache.hadoop.fs.PathIOException;
+import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants;
+import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations;
+import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsApacheHttpExpect100Exception;
+import org.apache.http.Header;
+import org.apache.http.HttpEntity;
+import org.apache.http.HttpResponse;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpDelete;
+import org.apache.http.client.methods.HttpEntityEnclosingRequestBase;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.methods.HttpHead;
+import org.apache.http.client.methods.HttpPatch;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.client.methods.HttpPut;
+import org.apache.http.client.methods.HttpRequestBase;
+import org.apache.http.entity.ByteArrayEntity;
+import org.apache.http.util.EntityUtils;
+
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.APACHE_IMPL;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.EMPTY_STRING;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_DELETE;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_GET;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_HEAD;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_PATCH;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_POST;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_PUT;
+import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_CLIENT_REQUEST_ID;
+import static org.apache.http.entity.ContentType.TEXT_PLAIN;
+
+/**
+ * Implementation of {@link AbfsHttpOperation} for orchestrating server calls
+ * using Apache Http Client.
+ */
+public class AbfsAHCHttpOperation extends AbfsHttpOperation {
+
+ private static final Logger LOG = LoggerFactory.getLogger(
+ AbfsAHCHttpOperation.class);
+
+ private HttpRequestBase httpRequestBase;
+
+ private HttpResponse httpResponse;
+
+ private boolean connectionDisconnectedOnError = false;
+
+ private final boolean isPayloadRequest;
+
+ public AbfsAHCHttpOperation(final URL url,
+ final String method,
+ final List<AbfsHttpHeader> requestHeaders,
+ final int connectionTimeout,
+ final int readTimeout) {
+ super(LOG, url, method, requestHeaders, connectionTimeout, readTimeout);
+ this.isPayloadRequest = isPayloadRequest(method);
+ }
+
+ @VisibleForTesting
+ AbfsManagedHttpClientContext setFinalAbfsClientContext() {
+ return new AbfsManagedHttpClientContext();
+ }
+
+ private boolean isPayloadRequest(final String method) {
+ return HTTP_METHOD_PUT.equals(method) || HTTP_METHOD_PATCH.equals(method)
+ || HTTP_METHOD_POST.equals(method);
+ }
+
+ @Override
+ protected InputStream getErrorStream() throws IOException {
+ HttpEntity entity = httpResponse.getEntity();
+ if (entity == null) {
+ return null;
+ }
+ return entity.getContent();
+ }
+
+ @Override
+ String getConnProperty(final String key) {
+ for (AbfsHttpHeader header : getRequestHeaders()) {
+ if (header.getName().equals(key)) {
+ return header.getValue();
+ }
+ }
+ return null;
+ }
+
+ @Override
+ URL getConnUrl() {
+ return getUrl();
+ }
+
+ @Override
+ String getConnRequestMethod() {
+ return getMethod();
+ }
+
+ @Override
+ Integer getConnResponseCode() throws IOException {
+ return getStatusCode();
+ }
+
+ @Override
+ String getConnResponseMessage() throws IOException {
+ return getStatusDescription();
+ }
+
+ public void processResponse(final byte[] buffer,
+ final int offset,
+ final int length) throws IOException {
+ try {
+ if (!isPayloadRequest) {
+ prepareRequest();
+ httpResponse = executeRequest();
+ }
+ parseResponseHeaderAndBody(buffer, offset, length);
+ } finally {
+ if (httpResponse != null) {
+ try {
+ EntityUtils.consume(httpResponse.getEntity());
+ } finally {
+ if (httpResponse instanceof CloseableHttpResponse) {
+ ((CloseableHttpResponse) httpResponse).close();
+ }
+ }
+ }
+ }
+ }
+
+ @VisibleForTesting
+ void parseResponseHeaderAndBody(final byte[] buffer,
+ final int offset,
+ final int length) throws IOException {
+ setStatusCode(parseStatusCode(httpResponse));
+
+ setStatusDescription(httpResponse.getStatusLine().getReasonPhrase());
+
+ String requestId = getResponseHeader(
+ HttpHeaderConfigurations.X_MS_REQUEST_ID);
+ if (requestId == null) {
+ requestId = AbfsHttpConstants.EMPTY_STRING;
+ }
+ setRequestId(requestId);
+
+ // dump the headers
+ AbfsIoUtils.dumpHeadersToDebugLog("Response Headers",
Review Comment:
only do this if the log is at debug; otherwise getResponseHeaders is doing needless work
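The guard being asked for is the standard pattern; sketched here with a plain boolean flag standing in for `LOG.isDebugEnabled()` so the snippet is self-contained, and a list standing in for the log output.

```java
import java.util.ArrayList;
import java.util.List;

class ResponseHeaderDump {

  // stands in for LOG.isDebugEnabled()
  static boolean debugEnabled = false;

  // stands in for the debug log output
  static final List<String> SINK = new ArrayList<>();

  // models getResponseHeaders(): work we want to skip when debug is off
  static List<String> expensiveHeaderCollection() {
    List<String> headers = new ArrayList<>();
    headers.add("x-ms-request-id: abc");
    return headers;
  }

  static void dumpHeaders() {
    // only pay for header collection when the output would actually be used
    if (debugEnabled) {
      SINK.addAll(expensiveHeaderCollection());
    }
  }
}
```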
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/AbfsHttpConstants.java:
##########
@@ -165,5 +165,12 @@ public static ApiVersion getCurrentVersion() {
*/
public static final Integer HTTP_STATUS_CATEGORY_QUOTIENT = 100;
+ public static final String HTTP_MAX_CONN_SYS_PROP = "http.maxConnections";
+ public static final Integer DEFAULT_MAX_CONN_SYS_PROP = 5;
Review Comment:
i know it's the jdk default, but it's pretty awful. Why not increase?
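For illustration, the fallback could be raised while still honouring the system property; the value 64 below is purely illustrative, not a figure from the PR.

```java
final class MaxConnections {

  /** Illustrative larger fallback when http.maxConnections is unset. */
  static final int DEFAULT_MAX_CONN = 64;

  // Integer.getInteger parses the system property, falling back
  // on the supplied default when it is unset or unparseable.
  static int resolveMaxConnections() {
    return Integer.getInteger("http.maxConnections", DEFAULT_MAX_CONN);
  }

  private MaxConnections() {
  }
}
```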
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsAHCHttpOperation.java:
##########
@@ -0,0 +1,374 @@
+/**
+ * Implementation of {@link AbfsHttpOperation} for orchestrating server calls using
Review Comment:
needs javadocs for each non-inherited method and field.
just assume that I *always* expect these and save review time by adding them
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsAHCHttpOperation.java:
##########
@@ -0,0 +1,374 @@
+/**
+ * Implementation of {@link AbfsHttpOperation} for orchestrating server calls
+ * using Apache Http Client.
+ */
+public class AbfsAHCHttpOperation extends AbfsHttpOperation {
+
+ private static final Logger LOG = LoggerFactory.getLogger(
+ AbfsAHCHttpOperation.class);
+
+ private HttpRequestBase httpRequestBase;
Review Comment:
add a little javadoc for these fields
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/AbfsHttpConstants.java:
##########
@@ -165,5 +165,12 @@ public static ApiVersion getCurrentVersion() {
*/
public static final Integer HTTP_STATUS_CATEGORY_QUOTIENT = 100;
+ public static final String HTTP_MAX_CONN_SYS_PROP = "http.maxConnections";
+ public static final Integer DEFAULT_MAX_CONN_SYS_PROP = 5;
+ public static final int KAC_DEFAULT_CONN_TTL = 5_000;
Review Comment:
going to prefer using a Duration.ofSeconds(5) and extracting the millis
value later...removes ambiguity of unit
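The `Duration` form the reviewer prefers, with the millis extracted at the point of use; the holder class and method name are illustrative.

```java
import java.time.Duration;

final class KeepAliveDefaults {

  // the unit is explicit in the constant itself, so no "is this ms or s?" ambiguity
  static final Duration KAC_DEFAULT_CONN_TTL = Duration.ofSeconds(5);

  // millis extracted only where a long is actually needed, e.g. Thread.sleep()
  static long ttlMillis() {
    return KAC_DEFAULT_CONN_TTL.toMillis();
  }

  private KeepAliveDefaults() {
  }
}
```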
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsAHCHttpOperation.java:
##########
@@ -0,0 +1,374 @@
+/**
+ * Implementation of {@link AbfsHttpOperation} for orchestrating server calls
+ * using Apache Http Client.
+ */
+public class AbfsAHCHttpOperation extends AbfsHttpOperation {
+
+ private static final Logger LOG = LoggerFactory.getLogger(
+ AbfsAHCHttpOperation.class);
+
+ private HttpRequestBase httpRequestBase;
+
+ private HttpResponse httpResponse;
+
+ private boolean connectionDisconnectedOnError = false;
+
+ private final boolean isPayloadRequest;
+
+ public AbfsAHCHttpOperation(final URL url,
+ final String method,
+ final List<AbfsHttpHeader> requestHeaders,
+ final int connectionTimeout,
+ final int readTimeout) {
+ super(LOG, url, method, requestHeaders, connectionTimeout, readTimeout);
+ this.isPayloadRequest = isPayloadRequest(method);
+ }
+
+ @VisibleForTesting
+ AbfsManagedHttpClientContext setFinalAbfsClientContext() {
+ return new AbfsManagedHttpClientContext();
+ }
+
+ private boolean isPayloadRequest(final String method) {
+ return HTTP_METHOD_PUT.equals(method) || HTTP_METHOD_PATCH.equals(method)
+ || HTTP_METHOD_POST.equals(method);
+ }
+
+ @Override
+ protected InputStream getErrorStream() throws IOException {
+ HttpEntity entity = httpResponse.getEntity();
+ if (entity == null) {
+ return null;
+ }
+ return entity.getContent();
+ }
+
+ @Override
+ String getConnProperty(final String key) {
+ for (AbfsHttpHeader header : getRequestHeaders()) {
+ if (header.getName().equals(key)) {
+ return header.getValue();
+ }
+ }
+ return null;
+ }
+
+ @Override
+ URL getConnUrl() {
+ return getUrl();
+ }
+
+ @Override
+ String getConnRequestMethod() {
+ return getMethod();
+ }
+
+ @Override
+ Integer getConnResponseCode() throws IOException {
+ return getStatusCode();
+ }
+
+ @Override
+ String getConnResponseMessage() throws IOException {
+ return getStatusDescription();
+ }
+
+ public void processResponse(final byte[] buffer,
+ final int offset,
+ final int length) throws IOException {
+ try {
+ if (!isPayloadRequest) {
+ prepareRequest();
+ httpResponse = executeRequest();
+ }
+ parseResponseHeaderAndBody(buffer, offset, length);
+ } finally {
+ if (httpResponse != null) {
+ try {
+ EntityUtils.consume(httpResponse.getEntity());
+ } finally {
+ if (httpResponse instanceof CloseableHttpResponse) {
+ ((CloseableHttpResponse) httpResponse).close();
+ }
+ }
+ }
+ }
+ }
+
+ @VisibleForTesting
+ void parseResponseHeaderAndBody(final byte[] buffer,
+ final int offset,
+ final int length) throws IOException {
+ setStatusCode(parseStatusCode(httpResponse));
+
+ setStatusDescription(httpResponse.getStatusLine().getReasonPhrase());
+
+ String requestId = getResponseHeader(
+ HttpHeaderConfigurations.X_MS_REQUEST_ID);
+ if (requestId == null) {
+ requestId = AbfsHttpConstants.EMPTY_STRING;
+ }
+ setRequestId(requestId);
+
+ // dump the headers
+ AbfsIoUtils.dumpHeadersToDebugLog("Response Headers",
+ getResponseHeaders(httpResponse));
+ parseResponse(buffer, offset, length);
+ }
+
+ @VisibleForTesting
+ int parseStatusCode(HttpResponse httpResponse) {
+ return httpResponse.getStatusLine().getStatusCode();
+ }
+
+ @VisibleForTesting
+ HttpResponse executeRequest() throws IOException {
+ AbfsManagedHttpClientContext abfsHttpClientContext
+ = setFinalAbfsClientContext();
+ HttpResponse response
+ = AbfsApacheHttpClient.getClient().execute(httpRequestBase,
+ abfsHttpClientContext, getConnectionTimeout(), getReadTimeout());
+ setConnectionTimeMs(abfsHttpClientContext.getConnectTime());
+ setSendRequestTimeMs(abfsHttpClientContext.getSendTime());
+ setRecvResponseTimeMs(abfsHttpClientContext.getReadTime());
+ return response;
+ }
+
+ private Map<String, List<String>> getResponseHeaders(final HttpResponse httpResponse) {
+ if (httpResponse == null || httpResponse.getAllHeaders() == null) {
+ return new HashMap<>();
+ }
+ Map<String, List<String>> map = new HashMap<>();
+ for (Header header : httpResponse.getAllHeaders()) {
+ map.put(header.getName(), new ArrayList<String>(
+ Collections.singleton(header.getValue())));
+ }
+ return map;
+ }
+
+ @Override
+ public void setRequestProperty(final String key, final String value) {
+ setHeader(key, value);
+ }
+
+ @Override
+ Map<String, List<String>> getRequestProperties() {
+ Map<String, List<String>> map = new HashMap<>();
+ for (AbfsHttpHeader header : getRequestHeaders()) {
+ map.put(header.getName(),
+ new ArrayList<String>() {{
+ add(header.getValue());
+ }});
+ }
+ return map;
+ }
+
+ @Override
+ public String getResponseHeader(final String headerName) {
+ if (httpResponse == null) {
+ return null;
+ }
+ Header header = httpResponse.getFirstHeader(headerName);
+ if (header != null) {
+ return header.getValue();
+ }
+ return null;
+ }
+
+ @Override
+ InputStream getContentInputStream()
+ throws IOException {
+ if (httpResponse == null || httpResponse.getEntity() == null) {
+ return null;
+ }
+ return httpResponse.getEntity().getContent();
+ }
+
+ public void sendPayload(final byte[] buffer,
+ final int offset,
+ final int length)
+ throws IOException {
+ if (!isPayloadRequest) {
+ return;
+ }
+
+ switch (getMethod()) {
+ case HTTP_METHOD_PUT:
+ httpRequestBase = new HttpPut(getUri());
+ break;
+ case HTTP_METHOD_PATCH:
+ httpRequestBase = new HttpPatch(getUri());
+ break;
+ case HTTP_METHOD_POST:
+ httpRequestBase = new HttpPost(getUri());
+ break;
+ default:
+ /*
+ * This should never happen as the method is already validated in
+ * isPayloadRequest.
+ */
+ return;
+ }
+
+ setExpectedBytesToBeSent(length);
+ if (buffer != null) {
+ HttpEntity httpEntity = new ByteArrayEntity(buffer, offset, length,
+ TEXT_PLAIN);
+ ((HttpEntityEnclosingRequestBase) httpRequestBase).setEntity(
+ httpEntity);
+ }
+
+ translateHeaders(httpRequestBase, getRequestHeaders());
+ try {
+ httpResponse = executeRequest();
+ } catch (AbfsApacheHttpExpect100Exception ex) {
+ LOG.debug(
+ "Getting output stream failed with expect header enabled, returning back."
+ + "Expect 100 assertion failed for uri {} with status code: {}",
+ getMaskedUrl(), parseStatusCode(ex.getHttpResponse()),
+ ex);
+ connectionDisconnectedOnError = true;
+ httpResponse = ex.getHttpResponse();
+ } finally {
+ if (!connectionDisconnectedOnError
+ && httpRequestBase instanceof HttpEntityEnclosingRequestBase) {
+ setBytesSent(length);
+ }
+ }
+ }
+
+ private void prepareRequest() throws IOException {
+ switch (getMethod()) {
+ case HTTP_METHOD_GET:
+ httpRequestBase = new HttpGet(getUri());
+ break;
+ case HTTP_METHOD_DELETE:
+ httpRequestBase = new HttpDelete(getUri());
+ break;
+ case HTTP_METHOD_HEAD:
+ httpRequestBase = new HttpHead(getUri());
+ break;
+ default:
+ /*
+ * This would not happen as the AbfsClient would always be sending valid
+ * method.
+ */
+ throw new PathIOException(getUrl().toString(),
+ "Unsupported HTTP method: " + getMethod());
+ }
+ translateHeaders(httpRequestBase, getRequestHeaders());
+ }
+
+ private URI getUri() throws IOException {
+ try {
+ return getUrl().toURI();
+ } catch (URISyntaxException e) {
+ throw new IOException(e);
+ }
+ }
+
+ private void translateHeaders(final HttpRequestBase httpRequestBase,
+ final List<AbfsHttpHeader> requestHeaders) {
+ for (AbfsHttpHeader header : requestHeaders) {
+ httpRequestBase.setHeader(header.getName(), header.getValue());
+ }
+ }
+
+ private void setHeader(String name, String val) {
+ addHeaderToRequestHeaderList(new AbfsHttpHeader(name, val));
+ }
+
+ @Override
+ public String getRequestProperty(String name) {
+ for (AbfsHttpHeader header : getRequestHeaders()) {
+ if (header.getName().equals(name)) {
+ return header.getValue();
+ }
+ }
+ return EMPTY_STRING;
+ }
+
+ @Override
+ boolean getConnectionDisconnectedOnError() {
+ return connectionDisconnectedOnError;
+ }
+
+ @Override
+ public String getTracingContextSuffix() {
+ return APACHE_IMPL;
+ }
+
+ public String getClientRequestId() {
+ for (AbfsHttpHeader header : getRequestHeaders()) {
Review Comment:
replace with `getRequestProperty(X_MS_CLIENT_REQUEST_ID)`
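The suggested change can be sketched as follows. This is a standalone, hypothetical illustration of the pattern (delegating the specific getter to the generic header lookup), not the ABFS code itself; the header data and class name are invented for the demo.

```java
import java.util.Arrays;
import java.util.List;

// Simplified sketch of the reviewer's suggestion: instead of re-iterating
// the header list in getClientRequestId(), delegate to the generic
// getRequestProperty() lookup that already exists.
public class HeaderLookupDemo {

  static final class HttpHeader {
    private final String name;
    private final String value;
    HttpHeader(String name, String value) { this.name = name; this.value = value; }
    String getName() { return name; }
    String getValue() { return value; }
  }

  static final String X_MS_CLIENT_REQUEST_ID = "x-ms-client-request-id";

  // Illustrative request headers; in the PR these come from getRequestHeaders().
  private static final List<HttpHeader> REQUEST_HEADERS = Arrays.asList(
      new HttpHeader("Content-Length", "0"),
      new HttpHeader(X_MS_CLIENT_REQUEST_ID, "req-123"));

  // Generic lookup, analogous to AbfsAHCHttpOperation#getRequestProperty.
  public static String getRequestProperty(String name) {
    for (HttpHeader header : REQUEST_HEADERS) {
      if (header.getName().equals(name)) {
        return header.getValue();
      }
    }
    return "";
  }

  // The suggested one-liner: no duplicated loop.
  public static String getClientRequestId() {
    return getRequestProperty(X_MS_CLIENT_REQUEST_ID);
  }

  public static void main(String[] args) {
    System.out.println(getClientRequestId());
  }
}
```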
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/exceptions/AbfsApacheHttpExpect100Exception.java:
##########
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.contracts.exceptions;
+
+import java.io.IOException;
+
+import org.apache.http.HttpResponse;
+
+public class AbfsApacheHttpExpect100Exception extends IOException {
Review Comment:
How about a generic HttpResponseException which this then subclasses?
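A minimal sketch of that hierarchy, assuming the generic parent carries the received response. The `HttpResponse` type from Apache HttpClient is stubbed with a one-method interface so the example is self-contained; all names here are illustrative, not the actual PR code.

```java
import java.io.IOException;

// Sketch of the reviewer's idea: a generic HttpResponseException carrying the
// response, which AbfsApacheHttpExpect100Exception then subclasses.
public class ResponseExceptionDemo {

  // Stand-in for org.apache.http.HttpResponse.
  interface HttpResponse {
    int getStatusCode();
  }

  // Generic parent: any failure for which an HTTP response was still received.
  static class HttpResponseException extends IOException {
    private final HttpResponse httpResponse;
    HttpResponseException(String message, HttpResponse httpResponse) {
      super(message);
      this.httpResponse = httpResponse;
    }
    HttpResponse getHttpResponse() { return httpResponse; }
  }

  // The Expect-100 failure becomes a thin subclass.
  static class AbfsApacheHttpExpect100Exception extends HttpResponseException {
    AbfsApacheHttpExpect100Exception(HttpResponse httpResponse) {
      super("Expect 100 continue assertion failed", httpResponse);
    }
  }

  public static int demo() {
    try {
      throw new AbfsApacheHttpExpect100Exception(() -> 417);
    } catch (HttpResponseException e) {
      // Callers can catch the generic type and still reach the response.
      return e.getHttpResponse().getStatusCode();
    }
  }

  public static void main(String[] args) {
    System.out.println(demo());
  }
}
```

The benefit is that retry/diagnostic code can catch one generic type regardless of which specific HTTP failure was raised.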
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsApacheHttpClient.java:
##########
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.io.IOException;
+
+import org.apache.hadoop.security.ssl.DelegatingSSLSocketFactory;
+import org.apache.http.HttpResponse;
+import org.apache.http.client.config.RequestConfig;
+import org.apache.http.client.methods.HttpRequestBase;
+import org.apache.http.config.Registry;
+import org.apache.http.config.RegistryBuilder;
+import org.apache.http.conn.socket.ConnectionSocketFactory;
+import org.apache.http.conn.socket.PlainConnectionSocketFactory;
+import org.apache.http.conn.ssl.SSLConnectionSocketFactory;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClientBuilder;
+import org.apache.http.impl.client.HttpClients;
+
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.EMPTY_STRING;
+import static org.apache.hadoop.fs.azurebfs.constants.FileSystemUriSchemes.HTTPS_SCHEME;
+import static org.apache.hadoop.fs.azurebfs.constants.FileSystemUriSchemes.HTTP_SCHEME;
+import static org.apache.http.conn.ssl.SSLConnectionSocketFactory.getDefaultHostnameVerifier;
+
+final class AbfsApacheHttpClient {
Review Comment:
guess what I'm going to say about javadocs here?
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/AbfsHttpConstants.java:
##########
@@ -165,5 +165,12 @@ public static ApiVersion getCurrentVersion() {
*/
public static final Integer HTTP_STATUS_CATEGORY_QUOTIENT = 100;
+ public static final String HTTP_MAX_CONN_SYS_PROP = "http.maxConnections";
Review Comment:
add javadoc to explain why this value is a sysprop (I know, but others may not)
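For context: `http.maxConnections` is the standard JDK networking system property that caps the number of keep-alive connections `HttpURLConnection` keeps per host, which is why the constant mirrors a sysprop key rather than a Hadoop config key. A hedged sketch of honouring it with a fallback default (the constant name mirrors the PR; the method is illustrative):

```java
// Illustrative sketch: read the JVM-wide "http.maxConnections" system
// property if the user set one, otherwise fall back to a supplied default.
public class MaxConnDemo {

  public static final String HTTP_MAX_CONN_SYS_PROP = "http.maxConnections";

  // Integer.getInteger returns the default when the property is unset
  // or unparseable.
  public static int effectiveMaxConnections(int defaultMaxConnections) {
    return Integer.getInteger(HTTP_MAX_CONN_SYS_PROP, defaultMaxConnections);
  }

  public static void main(String[] args) {
    System.out.println(effectiveMaxConnections(5));
  }
}
```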
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsAHCHttpOperation.java:
##########
@@ -0,0 +1,374 @@
+ @VisibleForTesting
+ HttpResponse executeRequest() throws IOException {
+ AbfsManagedHttpClientContext abfsHttpClientContext
+ = setFinalAbfsClientContext();
+ HttpResponse response
Review Comment:
recommend logging at debug that this is about to be called
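In the actual PR this would be an SLF4J `LOG.debug(...)` call with `{}` placeholders just before the client executes the request; the self-contained sketch below only composes such a message, and the method/URL names are illustrative.

```java
// Standalone sketch of the debug message the reviewer asks for: record the
// HTTP method and (masked) URL before handing the request to the client.
public class DebugLogDemo {

  public static String preExecuteMessage(String method, String maskedUrl) {
    return String.format("Executing %s request over ApacheHttpClient for %s",
        method, maskedUrl);
  }

  public static void main(String[] args) {
    System.out.println(preExecuteMessage("GET", "https://acct.dfs.core.windows.net/container/file"));
  }
}
```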
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsAHCHttpOperation.java:
##########
@@ -0,0 +1,374 @@
+ public AbfsAHCHttpOperation(final URL url,
+ final String method,
+ final List<AbfsHttpHeader> requestHeaders,
+ final int connectionTimeout,
+ final int readTimeout) {
+ super(LOG, url, method, requestHeaders, connectionTimeout, readTimeout);
+ this.isPayloadRequest = isPayloadRequest(method);
Review Comment:
log at debug that this will be used, helps to track down what is going on when there are network issues
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsAHCHttpOperation.java:
##########
@@ -0,0 +1,374 @@
+ public void processResponse(final byte[] buffer,
Review Comment:
add @Override
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java:
##########
@@ -1217,7 +1226,7 @@ public AbfsRestOperation deletePath(final String path,
final boolean recursive,
this,
HTTP_METHOD_DELETE,
url,
- requestHeaders);
+ requestHeaders, abfsConfiguration);
Review Comment:
put on a new line for consistency
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsConnFactory.java:
##########
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import org.apache.http.config.ConnectionConfig;
+import org.apache.http.conn.ManagedHttpClientConnection;
+import org.apache.http.conn.routing.HttpRoute;
+import org.apache.http.impl.conn.ManagedHttpClientConnectionFactory;
+
+/**
+ * Custom implementation of {@link ManagedHttpClientConnectionFactory} and overrides
+ * {@link ManagedHttpClientConnectionFactory#create(HttpRoute, ConnectionConfig)} to return
+ * {@link AbfsManagedApacheHttpConnection}.
+ */
+public class AbfsConnFactory extends ManagedHttpClientConnectionFactory {
Review Comment:
give it a better name, e.g AbfsHttpClientConnectionFactory
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsConnectionManager.java:
##########
@@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.io.IOException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.http.HttpClientConnection;
+import org.apache.http.config.Registry;
+import org.apache.http.config.SocketConfig;
+import org.apache.http.conn.ConnectionPoolTimeoutException;
+import org.apache.http.conn.ConnectionRequest;
+import org.apache.http.conn.HttpClientConnectionManager;
+import org.apache.http.conn.HttpClientConnectionOperator;
+import org.apache.http.conn.routing.HttpRoute;
+import org.apache.http.conn.socket.ConnectionSocketFactory;
+import org.apache.http.impl.conn.DefaultHttpClientConnectionOperator;
+import org.apache.http.impl.conn.ManagedHttpClientConnectionFactory;
+import org.apache.http.protocol.HttpContext;
+
+/**
+ * AbfsConnectionManager is a custom implementation of {@link HttpClientConnectionManager}.
+ * This implementation manages connection-pooling heuristics and custom implementation
+ * of {@link ManagedHttpClientConnectionFactory}.
+ */
+class AbfsConnectionManager implements HttpClientConnectionManager {
+
+ private final KeepAliveCache kac = KeepAliveCache.getInstance();
+
+ private final AbfsConnFactory httpConnectionFactory;
+
+ private final HttpClientConnectionOperator connectionOperator;
+
+ AbfsConnectionManager(Registry<ConnectionSocketFactory> socketFactoryRegistry,
+ AbfsConnFactory connectionFactory) {
+ this.httpConnectionFactory = connectionFactory;
+ connectionOperator = new DefaultHttpClientConnectionOperator(
+ socketFactoryRegistry, null, null);
+ }
+
+ @Override
+ public ConnectionRequest requestConnection(final HttpRoute route,
+ final Object state) {
+ return new ConnectionRequest() {
+ @Override
+ public HttpClientConnection get(final long timeout,
+ final TimeUnit timeUnit)
+ throws InterruptedException, ExecutionException,
+ ConnectionPoolTimeoutException {
+ try {
+ HttpClientConnection clientConn = kac.get(route);
+ if (clientConn != null) {
+ return clientConn;
+ }
+ return httpConnectionFactory.create(route, null);
+ } catch (IOException ex) {
+ throw new ExecutionException(ex);
+ }
+ }
+
+ @Override
+ public boolean cancel() {
+ return false;
+ }
+ };
+ }
+
+ /**
+ * Releases a connection for reuse. It can be reused only if validDuration
is greater than 0.
+ * This method is called by {@link org.apache.http.impl.execchain} internal
class `ConnectionHolder`.
+ * If it wants to reuse the connection, it will send a non-zero
validDuration, else it will send 0.
+ * @param conn the connection to release
+ * @param newState the new state of the connection
+ * @param validDuration the duration for which the connection is valid
+ * @param timeUnit the time unit for the validDuration
+ */
+ @Override
+ public void releaseConnection(final HttpClientConnection conn,
+ final Object newState,
+ final long validDuration,
+ final TimeUnit timeUnit) {
+ if (validDuration == 0) {
+ return;
+ }
+ if (conn.isOpen() && conn instanceof AbfsManagedApacheHttpConnection) {
+ HttpRoute route = ((AbfsManagedApacheHttpConnection)
conn).getHttpRoute();
Review Comment:
log at debug that a connection was released
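A minimal sketch of the suggested change, assuming the class gains an SLF4J logger (the `LOG` field and the exact messages are illustrative, not from the patch):

```java
private static final Logger LOG =
    LoggerFactory.getLogger(AbfsConnectionManager.class);

@Override
public void releaseConnection(final HttpClientConnection conn,
    final Object newState,
    final long validDuration,
    final TimeUnit timeUnit) {
  if (validDuration == 0) {
    LOG.debug("Connection released without caching (not reusable): {}", conn);
    return;
  }
  if (conn.isOpen() && conn instanceof AbfsManagedApacheHttpConnection) {
    HttpRoute route = ((AbfsManagedApacheHttpConnection) conn).getHttpRoute();
    if (route != null) {
      LOG.debug("Connection released for reuse on route {}", route);
      kac.put(route, conn);
    }
  }
}
```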
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsConnectionManager.java:
##########
@@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.io.IOException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.http.HttpClientConnection;
+import org.apache.http.config.Registry;
+import org.apache.http.config.SocketConfig;
+import org.apache.http.conn.ConnectionPoolTimeoutException;
+import org.apache.http.conn.ConnectionRequest;
+import org.apache.http.conn.HttpClientConnectionManager;
+import org.apache.http.conn.HttpClientConnectionOperator;
+import org.apache.http.conn.routing.HttpRoute;
+import org.apache.http.conn.socket.ConnectionSocketFactory;
+import org.apache.http.impl.conn.DefaultHttpClientConnectionOperator;
+import org.apache.http.impl.conn.ManagedHttpClientConnectionFactory;
+import org.apache.http.protocol.HttpContext;
+
+/**
+ * AbfsConnectionManager is a custom implementation of {@link
HttpClientConnectionManager}.
+ * This implementation manages connection-pooling heuristics and custom
implementation
+ * of {@link ManagedHttpClientConnectionFactory}.
+ */
+class AbfsConnectionManager implements HttpClientConnectionManager {
Review Comment:
add a log and log every callback so that when debugging we stand a chance of
understanding what is happening
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsConnectionManager.java:
##########
@@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.io.IOException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.http.HttpClientConnection;
+import org.apache.http.config.Registry;
+import org.apache.http.config.SocketConfig;
+import org.apache.http.conn.ConnectionPoolTimeoutException;
+import org.apache.http.conn.ConnectionRequest;
+import org.apache.http.conn.HttpClientConnectionManager;
+import org.apache.http.conn.HttpClientConnectionOperator;
+import org.apache.http.conn.routing.HttpRoute;
+import org.apache.http.conn.socket.ConnectionSocketFactory;
+import org.apache.http.impl.conn.DefaultHttpClientConnectionOperator;
+import org.apache.http.impl.conn.ManagedHttpClientConnectionFactory;
+import org.apache.http.protocol.HttpContext;
+
+/**
+ * AbfsConnectionManager is a custom implementation of {@link
HttpClientConnectionManager}.
Review Comment:
this link won't resolve in javadocs, best to say httpclient {@code
HttpClientConnectionManager}.
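Applied to the class javadoc, the suggested fix might read as follows; `{@code ...}` renders the name as plain code text, so it cannot fail to resolve the way a `{@link ...}` to a class outside the javadoc classpath does:

```java
/**
 * AbfsConnectionManager is a custom implementation of the httpclient
 * {@code HttpClientConnectionManager}. It manages connection-pooling
 * heuristics and a custom {@code ManagedHttpClientConnectionFactory}.
 */
```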
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsJdkHttpOperation.java:
##########
@@ -0,0 +1,345 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.HttpURLConnection;
+import java.net.ProtocolException;
+import java.net.URL;
+import java.util.List;
+import java.util.Map;
+
+import javax.net.ssl.HttpsURLConnection;
+import javax.net.ssl.SSLSocketFactory;
+
+import org.apache.hadoop.classification.VisibleForTesting;
+import org.apache.hadoop.security.ssl.DelegatingSSLSocketFactory;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants;
+import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations;
+
+import static
org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.EXPECT_100_JDK_ERROR;
+import static
org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HUNDRED_CONTINUE;
+import static
org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.JDK_FALLBACK;
+import static
org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.JDK_IMPL;
+import static
org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.EXPECT;
+
+/**
+ * Implementation of {@link AbfsHttpOperation} for orchestrating calls using
JDK's HttpURLConnection.
+ */
+public class AbfsJdkHttpOperation extends AbfsHttpOperation {
+
+ private static final Logger LOG = LoggerFactory.getLogger(
+ AbfsJdkHttpOperation.class);
+
+ private HttpURLConnection connection;
Review Comment:
final?
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpOperation.java:
##########
@@ -254,158 +311,19 @@ public String getMaskedEncodedUrl() {
return maskedEncodedUrl;
}
- /**
- * Initializes a new HTTP request and opens the connection.
- *
- * @param url The full URL including query string parameters.
- * @param method The HTTP method (PUT, PATCH, POST, GET, HEAD, or DELETE).
- * @param requestHeaders The HTTP request headers.READ_TIMEOUT
- * @param connectionTimeout The Connection Timeout value to be used while
establishing http connection
- * @param readTimeout The Read Timeout value to be used with http connection
while making a request
- * @throws IOException if an error occurs.
- */
- public AbfsHttpOperation(final URL url, final String method, final
List<AbfsHttpHeader> requestHeaders,
Review Comment:
1. I'd prefer to split onto separate lines
2. I'd prefer timeouts to be Duration. If not, at least include unit in
their name
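The Duration suggestion can be illustrated with a small, self-contained sketch (the class and method names here are mine, not from the patch); `java.time.Duration` carries its unit, and the conversion to milliseconds happens only at the `HttpURLConnection` boundary:

```java
import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.URL;
import java.time.Duration;

// Sketch: Duration-typed timeouts, converted to int milliseconds only
// where HttpURLConnection requires them.
public final class TimeoutDemo {
  static HttpURLConnection open(URL url,
      Duration connectionTimeout,
      Duration readTimeout) throws IOException {
    HttpURLConnection conn = (HttpURLConnection) url.openConnection();
    conn.setConnectTimeout((int) connectionTimeout.toMillis());
    conn.setReadTimeout((int) readTimeout.toMillis());
    return conn;
  }

  public static void main(String[] args) throws IOException {
    // openConnection() does not perform network I/O, so no server is needed.
    HttpURLConnection c = open(new URL("http://example.invalid/"),
        Duration.ofSeconds(30), Duration.ofSeconds(30));
    assert c.getConnectTimeout() == 30_000;
    assert c.getReadTimeout() == 30_000;
  }
}
```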
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpOperation.java:
##########
@@ -20,57 +20,51 @@
import java.io.IOException;
import java.io.InputStream;
-import java.io.OutputStream;
import java.net.HttpURLConnection;
-import java.net.ProtocolException;
import java.net.URL;
+import java.util.ArrayList;
import java.util.List;
-
-import javax.net.ssl.HttpsURLConnection;
-import javax.net.ssl.SSLSocketFactory;
-
-import org.apache.hadoop.classification.VisibleForTesting;
-import org.apache.hadoop.fs.azurebfs.utils.UriUtils;
-import org.apache.hadoop.security.ssl.DelegatingSSLSocketFactory;
+import java.util.Map;
import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonToken;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants;
import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations;
import org.apache.hadoop.fs.azurebfs.contracts.services.AbfsPerfLoggable;
import org.apache.hadoop.fs.azurebfs.contracts.services.ListResultSchema;
-
-import static
org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.EXPECT_100_JDK_ERROR;
-import static
org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HUNDRED_CONTINUE;
-import static
org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.EXPECT;
+import org.apache.hadoop.fs.azurebfs.utils.UriUtils;
/**
- * Represents an HTTP operation.
+ * Base Http operation class for orchestrating server IO calls. Child classes
would
+ * define the certain orchestration implementation on the basis of network
library used.
+ * <p>
+ * For JDK netlib usage, the child class would be {@link
AbfsJdkHttpOperation}. <br>
+ * For ApacheHttpClient netlib usage, the child class would be {@link
AbfsAHCHttpOperation}.
+ * </p>
*/
-public class AbfsHttpOperation implements AbfsPerfLoggable {
- private static final Logger LOG =
LoggerFactory.getLogger(AbfsHttpOperation.class);
+public abstract class AbfsHttpOperation implements AbfsPerfLoggable {
Review Comment:
add javadocs for new fields, methods.
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpOperation.java:
##########
@@ -103,13 +111,31 @@ public static AbfsHttpOperation
getAbfsHttpOperationWithFixedResult(
protected AbfsHttpOperation(final URL url,
final String method,
final int httpStatus) {
+ this.log = null;
Review Comment:
should be set to something
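One way to address this (a sketch; the `log` field is as in the patch, but the choice of logger is an assumption):

```java
protected AbfsHttpOperation(final URL url,
    final String method,
    final int httpStatus) {
  // Hypothetical fix: use a real logger rather than null, so that any
  // later call through this.log cannot throw a NullPointerException.
  this.log = LoggerFactory.getLogger(getClass());
  // ... remainder of the existing initialisation ...
}
```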
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsConnectionManager.java:
##########
@@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.io.IOException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.http.HttpClientConnection;
+import org.apache.http.config.Registry;
+import org.apache.http.config.SocketConfig;
+import org.apache.http.conn.ConnectionPoolTimeoutException;
+import org.apache.http.conn.ConnectionRequest;
+import org.apache.http.conn.HttpClientConnectionManager;
+import org.apache.http.conn.HttpClientConnectionOperator;
+import org.apache.http.conn.routing.HttpRoute;
+import org.apache.http.conn.socket.ConnectionSocketFactory;
+import org.apache.http.impl.conn.DefaultHttpClientConnectionOperator;
+import org.apache.http.impl.conn.ManagedHttpClientConnectionFactory;
+import org.apache.http.protocol.HttpContext;
+
+/**
+ * AbfsConnectionManager is a custom implementation of {@link
HttpClientConnectionManager}.
+ * This implementation manages connection-pooling heuristics and custom
implementation
+ * of {@link ManagedHttpClientConnectionFactory}.
+ */
+class AbfsConnectionManager implements HttpClientConnectionManager {
+
+ private final KeepAliveCache kac = KeepAliveCache.getInstance();
+
+ private final AbfsConnFactory httpConnectionFactory;
+
+ private final HttpClientConnectionOperator connectionOperator;
+
+ AbfsConnectionManager(Registry<ConnectionSocketFactory>
socketFactoryRegistry,
+ AbfsConnFactory connectionFactory) {
+ this.httpConnectionFactory = connectionFactory;
+ connectionOperator = new DefaultHttpClientConnectionOperator(
Review Comment:
use a `this.` prefix for consistency
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsNoOpThrottlingIntercept.java:
##########
@@ -18,6 +18,8 @@
package org.apache.hadoop.fs.azurebfs.services;
+import org.apache.hadoop.fs.azurebfs.constants.AbfsRestOperationType;
+
final class AbfsNoOpThrottlingIntercept implements AbfsThrottlingIntercept {
Review Comment:
I know this code exists already, but let's document it
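One possible documentation sketch (the wording is mine, not from the patch):

```java
/**
 * A no-op implementation of {@code AbfsThrottlingIntercept}, used when
 * client-side throttling is disabled. All callbacks return immediately
 * without recording metrics or delaying the request.
 */
final class AbfsNoOpThrottlingIntercept implements AbfsThrottlingIntercept {
```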
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsConnectionManager.java:
##########
@@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.io.IOException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.http.HttpClientConnection;
+import org.apache.http.config.Registry;
+import org.apache.http.config.SocketConfig;
+import org.apache.http.conn.ConnectionPoolTimeoutException;
+import org.apache.http.conn.ConnectionRequest;
+import org.apache.http.conn.HttpClientConnectionManager;
+import org.apache.http.conn.HttpClientConnectionOperator;
+import org.apache.http.conn.routing.HttpRoute;
+import org.apache.http.conn.socket.ConnectionSocketFactory;
+import org.apache.http.impl.conn.DefaultHttpClientConnectionOperator;
+import org.apache.http.impl.conn.ManagedHttpClientConnectionFactory;
+import org.apache.http.protocol.HttpContext;
+
+/**
+ * AbfsConnectionManager is a custom implementation of {@link
HttpClientConnectionManager}.
+ * This implementation manages connection-pooling heuristics and custom
implementation
+ * of {@link ManagedHttpClientConnectionFactory}.
+ */
+class AbfsConnectionManager implements HttpClientConnectionManager {
+
+ private final KeepAliveCache kac = KeepAliveCache.getInstance();
+
+ private final AbfsConnFactory httpConnectionFactory;
+
+ private final HttpClientConnectionOperator connectionOperator;
+
+ AbfsConnectionManager(Registry<ConnectionSocketFactory>
socketFactoryRegistry,
+ AbfsConnFactory connectionFactory) {
+ this.httpConnectionFactory = connectionFactory;
+ connectionOperator = new DefaultHttpClientConnectionOperator(
+ socketFactoryRegistry, null, null);
+ }
+
+ @Override
+ public ConnectionRequest requestConnection(final HttpRoute route,
+ final Object state) {
+ return new ConnectionRequest() {
+ @Override
+ public HttpClientConnection get(final long timeout,
+ final TimeUnit timeUnit)
+ throws InterruptedException, ExecutionException,
+ ConnectionPoolTimeoutException {
+ try {
+ HttpClientConnection clientConn = kac.get(route);
+ if (clientConn != null) {
+ return clientConn;
+ }
+ return httpConnectionFactory.create(route, null);
+ } catch (IOException ex) {
+ throw new ExecutionException(ex);
+ }
+ }
+
+ @Override
+ public boolean cancel() {
+ return false;
+ }
+ };
+ }
+
+ /**
+ * Releases a connection for reuse. It can be reused only if validDuration
is greater than 0.
+ * This method is called by {@link org.apache.http.impl.execchain} internal
class `ConnectionHolder`.
+ * If it wants to reuse the connection, it will send a non-zero
validDuration, else it will send 0.
+ * @param conn the connection to release
+ * @param newState the new state of the connection
+ * @param validDuration the duration for which the connection is valid
+ * @param timeUnit the time unit for the validDuration
+ */
+ @Override
+ public void releaseConnection(final HttpClientConnection conn,
+ final Object newState,
+ final long validDuration,
+ final TimeUnit timeUnit) {
+ if (validDuration == 0) {
+ return;
+ }
+ if (conn.isOpen() && conn instanceof AbfsManagedApacheHttpConnection) {
+ HttpRoute route = ((AbfsManagedApacheHttpConnection)
conn).getHttpRoute();
+ if (route != null) {
+ kac.put(route, conn);
+ }
+ }
+ }
+
+ @Override
+ public void connect(final HttpClientConnection conn,
+ final HttpRoute route,
+ final int connectTimeout,
+ final HttpContext context) throws IOException {
+ long start = System.currentTimeMillis();
+ connectionOperator.connect((AbfsManagedApacheHttpConnection) conn,
+ route.getTargetHost(), route.getLocalSocketAddress(),
+ connectTimeout, SocketConfig.DEFAULT, context);
+ if (context instanceof AbfsManagedHttpClientContext) {
+ ((AbfsManagedHttpClientContext) context).setConnectTime(
+ System.currentTimeMillis() - start);
+ }
+ }
+
+ @Override
+ public void upgrade(final HttpClientConnection conn,
+ final HttpRoute route,
+ final HttpContext context) throws IOException {
+ connectionOperator.upgrade((AbfsManagedApacheHttpConnection) conn,
+ route.getTargetHost(), context);
+ }
+
+ @Override
+ public void routeComplete(final HttpClientConnection conn,
+ final HttpRoute route,
+ final HttpContext context) throws IOException {
+
+ }
+
+ @Override
+ public void closeIdleConnections(final long idletime,
+ final TimeUnit timeUnit) {
+
+ }
+
+ @Override
+ public void closeExpiredConnections() {
Review Comment:
why not do this? are you sure the java KeepAliveCache does the right thing?
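If the cache supported eviction on demand, the override could delegate instead of being a no-op (a sketch; `evictIdleConnections` is a hypothetical method, not present in the patch):

```java
@Override
public void closeExpiredConnections() {
  // Hypothetical: ask the KeepAliveCache to drop entries idle past its TTL,
  // mirroring what its background timer thread is meant to do.
  kac.evictIdleConnections(kac.getConnectionIdleTTL(), TimeUnit.MILLISECONDS);
}
```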
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/KeepAliveCache.java:
##########
@@ -0,0 +1,362 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.io.IOException;
+import java.io.NotSerializableException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.classification.VisibleForTesting;
+import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
+import org.apache.http.HttpClientConnection;
+import org.apache.http.conn.routing.HttpRoute;
+
+import static
org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.DEFAULT_MAX_CONN_SYS_PROP;
+import static
org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_MAX_CONN_SYS_PROP;
+import static
org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.KAC_DEFAULT_CONN_TTL;
+
+/**
+ * Connection-pooling heuristics adapted from JDK's connection pooling
`KeepAliveCache`
+ * <p>
+ * Why this implementation is required in comparison to {@link
org.apache.http.impl.conn.PoolingHttpClientConnectionManager}
+ * connection-pooling:
+ * <ol>
+ * <li>PoolingHttpClientConnectionManager heuristic caches all the reusable
connections it has created.
+ * JDK's implementation only caches limited number of connections. The limit
is given by JVM system
+ * property "http.maxConnections". If there is no system-property, it defaults
to 5.</li>
+ * <li>In PoolingHttpClientConnectionManager, it expects the application to
provide `setMaxPerRoute` and `setMaxTotal`,
+ * which the implementation uses as the total number of connections it can
create. For application using ABFS, it is not
+ * feasible to provide a value in the initialisation of the connectionManager.
JDK's implementation has no cap on the
+ * number of connections it can create.</li>
+ * </ol>
+ */
+public final class KeepAliveCache
+ extends HashMap<KeepAliveCache.KeepAliveKey, KeepAliveCache.ClientVector>
+ implements Runnable {
+
+ private int maxConn;
+
+ private long connectionIdleTTL = KAC_DEFAULT_CONN_TTL;
+
+ private Thread keepAliveTimer = null;
+
+ private boolean isPaused = false;
+
+ private KeepAliveCache() {
+ setMaxConn();
+ }
+
+ synchronized void pauseThread() {
+ isPaused = true;
+ }
+
+ synchronized void resumeThread() {
+ isPaused = false;
+ notify();
+ }
+
+ private void setMaxConn() {
+ String sysPropMaxConn = System.getProperty(HTTP_MAX_CONN_SYS_PROP);
+ if (sysPropMaxConn == null) {
+ maxConn = DEFAULT_MAX_CONN_SYS_PROP;
+ } else {
+ maxConn = Integer.parseInt(sysPropMaxConn);
+ }
+ }
+
+ public void setAbfsConfig(AbfsConfiguration abfsConfiguration) {
+ this.maxConn = abfsConfiguration.getMaxApacheHttpClientCacheConnections();
+ this.connectionIdleTTL =
abfsConfiguration.getMaxApacheHttpClientConnectionIdleTime();
+ }
+
+ public long getConnectionIdleTTL() {
+ return connectionIdleTTL;
+ }
+
+ private static final KeepAliveCache INSTANCE = new KeepAliveCache();
+
+ public static KeepAliveCache getInstance() {
+ return INSTANCE;
+ }
+
+ @VisibleForTesting
+ void clearThread() {
+ clear();
+ setMaxConn();
+ }
+
+ private int getKacSize() {
Review Comment:
give a better name
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/KeepAliveCache.java:
##########
@@ -0,0 +1,362 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.io.IOException;
+import java.io.NotSerializableException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.classification.VisibleForTesting;
+import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
+import org.apache.http.HttpClientConnection;
+import org.apache.http.conn.routing.HttpRoute;
+
+import static
org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.DEFAULT_MAX_CONN_SYS_PROP;
+import static
org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_MAX_CONN_SYS_PROP;
+import static
org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.KAC_DEFAULT_CONN_TTL;
+
+/**
+ * Connection-pooling heuristics adapted from JDK's connection pooling
`KeepAliveCache`
Review Comment:
this is not copied from the JDK, is it?
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/KeepAliveCache.java:
##########
@@ -0,0 +1,362 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.io.IOException;
+import java.io.NotSerializableException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.classification.VisibleForTesting;
+import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
+import org.apache.http.HttpClientConnection;
+import org.apache.http.conn.routing.HttpRoute;
+
+import static
org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.DEFAULT_MAX_CONN_SYS_PROP;
+import static
org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_MAX_CONN_SYS_PROP;
+import static
org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.KAC_DEFAULT_CONN_TTL;
+
+/**
+ * Connection-pooling heuristics adapted from JDK's connection pooling
`KeepAliveCache`
+ * <p>
+ * Why this implementation is required in comparison to {@link
org.apache.http.impl.conn.PoolingHttpClientConnectionManager}
+ * connection-pooling:
+ * <ol>
+ * <li>PoolingHttpClientConnectionManager heuristic caches all the reusable
connections it has created.
+ * JDK's implementation only caches limited number of connections. The limit
is given by JVM system
+ * property "http.maxConnections". If there is no system-property, it defaults
to 5.</li>
+ * <li>In PoolingHttpClientConnectionManager, it expects the application to
provide `setMaxPerRoute` and `setMaxTotal`,
+ * which the implementation uses as the total number of connections it can
create. For application using ABFS, it is not
+ * feasible to provide a value in the initialisation of the connectionManager.
JDK's implementation has no cap on the
+ * number of connections it can create.</li>
+ * </ol>
+ */
+public final class KeepAliveCache
Review Comment:
add javadocs all the way down
> [ABFS]: ApacheHttpClient adaptation as network library
> ------------------------------------------------------
>
> Key: HADOOP-19120
> URL: https://issues.apache.org/jira/browse/HADOOP-19120
> Project: Hadoop Common
> Issue Type: Sub-task
> Components: fs/azure
> Affects Versions: 3.5.0
> Reporter: Pranav Saxena
> Assignee: Pranav Saxena
> Priority: Major
> Labels: pull-request-available
>
> Apache HttpClient is more feature-rich and flexible and gives applications
> more granular control over networking parameters.
> ABFS currently relies on the JDK-net library. This library is managed by
> OpenJDK and has no performance problems. However, it limits the
> application's control over networking, and very few APIs and hooks are
> exposed that the application can use to get metrics or to choose which
> connections are reused and when. ApacheHttpClient gives important hooks to
> fetch metrics and to control networking parameters.
> A custom implementation of connection pooling is used. The implementation is
> adapted from the JDK8 connection pooling. Reasons for doing it:
> 1. PoolingHttpClientConnectionManager caches all the reusable connections it
> has created. JDK's implementation only caches a limited number of
> connections; the limit is given by the JVM system property
> "http.maxConnections", defaulting to 5 if the property is unset.
> Connection-establishment latency increased when all the connections were
> cached. Hence, the pooling heuristic of the JDK netlib is adapted.
> 2. PoolingHttpClientConnectionManager expects the application to provide
> `setMaxPerRoute` and `setMaxTotal`, which the implementation uses as the
> total number of connections it can create. For applications using ABFS, it
> is not feasible to provide a value at connection-manager initialisation.
> JDK's implementation has no cap on the number of connections it can have
> open at a moment. Hence, the pooling heuristic of the JDK netlib is adapted.
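The JDK sizing heuristic that point 1 refers to can be illustrated in plain Java (the class and method names are mine; the property name "http.maxConnections" and the default of 5 are from the description above):

```java
// Illustration of the JDK keep-alive cache sizing heuristic: read the
// "http.maxConnections" system property, falling back to 5 when unset.
public final class MaxConnDemo {
  static final int DEFAULT_MAX_CONNECTIONS = 5;

  static int maxConnections() {
    String prop = System.getProperty("http.maxConnections");
    if (prop == null) {
      return DEFAULT_MAX_CONNECTIONS;
    }
    return Integer.parseInt(prop);
  }

  public static void main(String[] args) {
    System.clearProperty("http.maxConnections");
    assert maxConnections() == 5;
    System.setProperty("http.maxConnections", "16");
    assert maxConnections() == 16;
  }
}
```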