rakeshadr commented on code in PR #6699:
URL: https://github.com/apache/hadoop/pull/6699#discussion_r1671604923
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/HttpHeaderConfigurations.java:
##########
@@ -34,6 +34,7 @@ public final class HttpHeaderConfigurations {
public static final String IF_MATCH = "If-Match";
public static final String IF_NONE_MATCH = "If-None-Match";
public static final String CONTENT_LENGTH = "Content-Length";
+ public static final String CONTENT_RANGE = "Content-Range";
Review Comment:
I couldn't find any usage of this constant. Please remove it if it is not required.
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java:
##########
@@ -321,5 +321,21 @@ public static String accountProperty(String property,
String account) {
* @see FileSystem#openFile(org.apache.hadoop.fs.Path)
*/
public static final String FS_AZURE_BUFFERED_PREAD_DISABLE =
"fs.azure.buffered.pread.disable";
+
+ /**
+ * Enable lazy opening of an inputStream. Lazy opening would not call HEAD call
+ * to get file metadata before creating inputStream. ReadPath API of server
+ * would give the contentLength and eTag which would be used in subsequent calls
+ * for if-match headers.
+ */
+ public static final String
+ FS_AZURE_INPUT_STREAM_LAZY_OPEN_OPTIMIZATION_ENABLED = "fs.azure.input.stream.lazy.open.optimization.enabled";
+
+ /**
+ * Enable prefetch on the first read to {@link org.apache.hadoop.fs.azurebfs.services.AbfsInputStream}.
+ * If disabled, first call would not trigger prefetch. Prefetch would be switched on
Review Comment:
These two configurations interact, so could you document each of them, and
their combined behavior, in
https://github.com/apache/hadoop/blob/trunk/hadoop-tools/hadoop-azure/src/site/markdown/index.md
or in
https://github.com/apache/hadoop/blob/trunk/hadoop-tools/hadoop-azure/src/site/markdown/abfs.md
Does the Azure documentation cover prefetching behavior and its configurations?
If so, could you please point me to it? If not, I think it would be good to
document it later in a follow-up JIRA task. FYI, there is a good doc for the
AWS side:
https://github.com/apache/hadoop/blob/trunk/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/prefetching.md
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java:
##########
@@ -890,32 +891,37 @@ public AbfsInputStream openFileForRead(Path path,
encryptionContext.getBytes(StandardCharsets.UTF_8));
}
} else {
- AbfsHttpOperation op = client.getPathStatus(relativePath, false,
- tracingContext, null).getResult();
- resourceType = op.getResponseHeader(
- HttpHeaderConfigurations.X_MS_RESOURCE_TYPE);
- contentLength = Long.parseLong(
- op.getResponseHeader(HttpHeaderConfigurations.CONTENT_LENGTH));
- eTag = op.getResponseHeader(HttpHeaderConfigurations.ETAG);
- /*
- * For file created with ENCRYPTION_CONTEXT, client shall receive
- * encryptionContext from header field: X_MS_ENCRYPTION_CONTEXT.
- */
- if (client.getEncryptionType() == EncryptionType.ENCRYPTION_CONTEXT) {
- final String fileEncryptionContext = op.getResponseHeader(
- HttpHeaderConfigurations.X_MS_ENCRYPTION_CONTEXT);
- if (fileEncryptionContext == null) {
- LOG.debug("EncryptionContext missing in GetPathStatus response");
- throw new PathIOException(path.toString(),
- "EncryptionContext not present in GetPathStatus response headers");
+ if (client.getEncryptionType() == EncryptionType.ENCRYPTION_CONTEXT
Review Comment:
Why do we need `EncryptionType.ENCRYPTION_CONTEXT` in this condition?
Should the condition be as below?
```
if (!abfsConfiguration.isInputStreamLazyOptimizationEnabled()) {
  final AbfsHttpOperation op = client.getPathStatus(relativePath, false,
      tracingContext, null).getResult();
  resourceType = op.getResponseHeader(
      HttpHeaderConfigurations.X_MS_RESOURCE_TYPE);
  contentLength = Long.parseLong(
      op.getResponseHeader(HttpHeaderConfigurations.CONTENT_LENGTH));
  eTag = op.getResponseHeader(HttpHeaderConfigurations.ETAG);
}
```
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java:
##########
@@ -1148,7 +1149,9 @@ public AbfsRestOperation read(final String path,
AbfsHttpHeader rangeHeader = new AbfsHttpHeader(RANGE,
String.format("bytes=%d-%d", position, position + bufferLength - 1));
requestHeaders.add(rangeHeader);
- requestHeaders.add(new AbfsHttpHeader(IF_MATCH, eTag));
+ if (!StringUtils.isEmpty(eTag)) {
Review Comment:
Use `StringUtils.isNotEmpty` for better readability.
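For illustration, a minimal sketch of the guarded header, assuming the intent is to skip `If-Match` when no eTag is known yet. `isNotEmpty` below is a local stand-in for the commons-lang method so the snippet compiles on its own; the class name and `ifMatchHeader` helper are hypothetical and not part of the PR.

```java
// Hypothetical sketch, not the AbfsClient code from the PR.
final class ETagHeaderSketch {
  // Local stand-in for commons-lang StringUtils.isNotEmpty:
  // true only for a non-null string with at least one character.
  static boolean isNotEmpty(final String s) {
    return s != null && !s.isEmpty();
  }

  // Returns the If-Match header line to send, or null when no eTag is
  // known yet, in which case the header is simply skipped.
  static String ifMatchHeader(final String eTag) {
    if (isNotEmpty(eTag)) {
      return "If-Match: " + eTag;
    }
    return null;
  }
}
```

The positive form reads in the same direction as the action it guards, which is the readability gain over `!StringUtils.isEmpty(eTag)`.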
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java:
##########
@@ -488,7 +601,7 @@ private int copyToUserBuffer(byte[] b, int off, int len){
private int readInternal(final long position, final byte[] b, final int offset, final int length,
final boolean bypassReadAhead) throws IOException {
- if (readAheadEnabled && !bypassReadAhead) {
+ if (readAheadEnabled && !bypassReadAhead && (prefetchTriggerOnFirstRead || sequentialReadStarted)) {
Review Comment:
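Here too, you could extract the condition into a method such as
`effectiveReadAhead` (or a better name) and call it as
`if (effectiveReadAhead(bypassReadAhead))`:
```
/**
 * <add_description>
 */
private boolean effectiveReadAhead(final boolean bypassReadAhead) {
  return readAheadEnabled && !bypassReadAhead
      && (prefetchTriggerOnFirstRead || sequentialReadStarted);
}
```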
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java:
##########
@@ -424,11 +515,20 @@ private int optimisedRead(final byte[] b, final int off, final int len,
// If the read was partial and the user requested part of data has
// not read then fallback to readoneblock. When limit is smaller than
// bCursor that means the user requested data has not been read.
- if (fCursor < contentLength && bCursor > limit) {
+ if (fCursor < getContentLength() && bCursor > limit) {
restorePointerState();
return readOneBlock(b, off, len);
}
- return copyToUserBuffer(b, off, len);
+ return copyToUserBuffer(b, off, len, isOptimizedReadWithoutContentLengthInformation);
+ }
+
+ @VisibleForTesting
+ long getContentLength() {
+ return contentLength;
+ }
+
+ boolean getFileStatusInformationPresent() {
Review Comment:
1) Please make this method `private`.
2) How about renaming it to `hasFileStatusInfo()`?
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java:
##########
@@ -467,7 +566,21 @@ private boolean validate(final byte[] b, final int off, final int len)
return true;
}
- private int copyToUserBuffer(byte[] b, int off, int len){
+ private int copyToUserBuffer(byte[] b, int off, int len,
+ final boolean isOptimizedReadWithoutContentLengthInformation){
Review Comment:
This is subjective, but we could shorten the variable name:
`isOptimizedReadWithoutContentLengthInformation` ->
`isOptimizedReadWithoutContentLengthInfo`
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java:
##########
@@ -573,11 +688,25 @@ int readRemote(long position, byte[] b, int offset, int length, TracingContext t
} catch (AzureBlobFileSystemException ex) {
if (ex instanceof AbfsRestOperationException) {
AbfsRestOperationException ere = (AbfsRestOperationException) ex;
+ abfsHttpOperation = ((AbfsRestOperationException) ex).getAbfsHttpOperation();
if (ere.getStatusCode() == HttpURLConnection.HTTP_NOT_FOUND) {
throw new FileNotFoundException(ere.getMessage());
}
+ /*
+ * Status 416 is sent when read range is out of contentLength range.
+ * This would happen only in the case if contentLength is not known before
+ * opening the inputStream.
+ */
+ if (ere.getStatusCode() == READ_PATH_REQUEST_NOT_SATISFIABLE
+ && !getFileStatusInformationPresent()) {
+ return -1;
Review Comment:
Can you add a log message for this exception case?
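Roughly what I have in mind, as a hedged sketch only: it uses `java.util.logging` as a stand-in for the logger the class already has, and `treatAsEof`, the constant name, and the message text are all hypothetical.

```java
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical sketch, not the AbfsInputStream code from the PR.
final class RangeNotSatisfiableSketch {
  private static final Logger LOG =
      Logger.getLogger(RangeNotSatisfiableSketch.class.getName());

  // HTTP 416 (Range Not Satisfiable).
  static final int HTTP_RANGE_NOT_SATISFIABLE = 416;

  // Returns true when a failed read should be reported as end-of-file
  // (-1) instead of being rethrown, and logs why.
  static boolean treatAsEof(final int statusCode,
      final boolean hasFileStatusInfo, final String path,
      final long position) {
    if (statusCode == HTTP_RANGE_NOT_SATISFIABLE && !hasFileStatusInfo) {
      LOG.log(Level.FINE,
          "Read of {0} at position {1} returned 416 before contentLength"
              + " was known; treating it as EOF",
          new Object[] {path, position});
      return true;
    }
    return false;
  }
}
```

Logging the path and position makes it possible to tell a genuine read past the end of the file from an unexpected 416.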
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java:
##########
@@ -591,6 +720,31 @@ int readRemote(long position, byte[] b, int offset, int length, TracingContext t
return (int) bytesRead;
}
+ private void initPropertiesFromReadResponseHeader(final AbfsHttpOperation op) throws IOException {
+ if (DIRECTORY.equals(
Review Comment:
This is duplicated in two places. You could extract it into a method and reuse it:
```
if (DIRECTORY.equals(
op.getResponseHeader(HttpHeaderConfigurations.X_MS_RESOURCE_TYPE))) {
throw new FileNotFoundException(
"read must be used with files and not directories. Path: " + path);
}
contentLength = parseFromRange(
op.getResponseHeader(HttpHeaderConfigurations.CONTENT_RANGE));
eTag = op.getResponseHeader(HttpHeaderConfigurations.ETAG);
if (eTag != null && contentLength >= 0) {
fileStatusInformationPresent = true;
}
}
```
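The body of `parseFromRange` is not visible in this hunk. For reference, a minimal sketch of how the total length could be taken from a standard `Content-Range` value of the form `bytes <first>-<last>/<complete-length>`. The class name and the choice to return -1 for a missing, malformed, or unknown (`*`) length are my assumptions, picked to line up with the `contentLength >= 0` check above; the PR's implementation may differ.

```java
// Hypothetical sketch, not the parseFromRange implementation from the PR.
final class ContentRangeSketch {
  // Parses the complete length out of a Content-Range value such as
  // "bytes 0-1023/4096". Returns -1 when the header is absent, malformed,
  // or reports an unknown length ("bytes 0-1023/*").
  static long parseFromRange(final String contentRange) {
    if (contentRange == null) {
      return -1;
    }
    final int slash = contentRange.lastIndexOf('/');
    if (slash < 0 || slash == contentRange.length() - 1) {
      return -1;
    }
    final String total = contentRange.substring(slash + 1).trim();
    if ("*".equals(total)) {
      return -1;
    }
    try {
      final long length = Long.parseLong(total);
      return length >= 0 ? length : -1;
    } catch (NumberFormatException e) {
      return -1;
    }
  }
}
```

Whatever the actual implementation does, the shared method should make the missing-header case explicit, since `fileStatusInformationPresent` depends on it.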
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java:
##########
@@ -385,32 +434,74 @@ private int readLastBlock(final byte[] b, final int off, final int len)
// data need to be copied to user buffer from index bCursor,
// AbfsInutStream buffer is going to contain data from last block start. In
// that case bCursor will be set to fCursor - lastBlockStart
- long lastBlockStart = max(0, contentLength - footerReadSize);
+ if (!getFileStatusInformationPresent()) {
+ long lastBlockStart = max(0, (fCursor + len) - footerReadSize);
+ bCursor = (int) (fCursor - lastBlockStart);
+ return optimisedRead(b, off, len, lastBlockStart, min(fCursor + len, footerReadSize), true);
+ }
+ long lastBlockStart = max(0, getContentLength() - footerReadSize);
bCursor = (int) (fCursor - lastBlockStart);
// 0 if contentlength is < buffersize
- long actualLenToRead = min(footerReadSize, contentLength);
- return optimisedRead(b, off, len, lastBlockStart, actualLenToRead);
+ long actualLenToRead = min(footerReadSize, getContentLength());
+ return optimisedRead(b, off, len, lastBlockStart, actualLenToRead, false);
}
private int optimisedRead(final byte[] b, final int off, final int len,
- final long readFrom, final long actualLen) throws IOException {
+ final long readFrom, final long actualLen,
+ final boolean isOptimizedReadWithoutContentLengthInformation) throws IOException {
fCursor = readFrom;
int totalBytesRead = 0;
int lastBytesRead = 0;
try {
buffer = new byte[bufferSize];
+ boolean fileStatusInformationPresentBeforeRead = getFileStatusInformationPresent();
+ /*
+ * Content length would not be available for the first optimized read in case
+ * of lazy head optimization in inputStream. In such case, read of the first optimized read
+ * would be done without the contentLength constraint. Post first call, the contentLength
+ * would be present and should be used for further reads.
+ */
for (int i = 0;
- i < MAX_OPTIMIZED_READ_ATTEMPTS && fCursor < contentLength; i++) {
+ i < MAX_OPTIMIZED_READ_ATTEMPTS && (!getFileStatusInformationPresent()
+ || fCursor < getContentLength()); i++) {
lastBytesRead = readInternal(fCursor, buffer, limit,
(int) actualLen - limit, true);
if (lastBytesRead > 0) {
totalBytesRead += lastBytesRead;
limit += lastBytesRead;
fCursor += lastBytesRead;
fCursorAfterLastRead = fCursor;
+
+ /*
+ * In non-lazily opened inputStream, the contentLength would be available before
+ * opening the inputStream. In such case, optimized read would always be done
+ * on the last part of the file.
+ *
+ * In lazily opened inputStream, the contentLength would not be available before
+ * opening the inputStream. In such case, contentLength conditioning would not be
+ * applied to execute optimizedRead. Hence, the optimized read may not be done on the
+ * last part of the file. If the optimized read is done on the non-last part of the
+ * file, inputStream should read only the amount of data requested by optimizedRead,
+ * as the buffer supplied would be only of the size of the data requested by optimizedRead.
+ */
+ boolean shouldBreak = !fileStatusInformationPresentBeforeRead
+ && totalBytesRead == (int) actualLen;
+ if (shouldBreak) {
+ break;
+ }
}
}
} catch (IOException e) {
+ if (e instanceof FileNotFoundException) {
Review Comment:
Can't we add an explicit catch block, like this?
```
} catch (FileNotFoundException fnfe) {
// <add_required_logic>
} catch (IOException e) {
```
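A self-contained sketch of the ordering, in case it helps. `failWith` and `classify` are hypothetical stand-ins, and the returned strings only mark which branch ran; the real handling in each branch is up to you.

```java
import java.io.FileNotFoundException;
import java.io.IOException;

// Hypothetical sketch, not the optimisedRead code from the PR.
final class CatchOrderSketch {
  // Stand-in for the remote read; throws whatever it is given.
  static void failWith(final IOException e) throws IOException {
    throw e;
  }

  // The more specific FileNotFoundException is caught first; the compiler
  // rejects the reverse order because IOException would already cover it.
  static String classify(final IOException toThrow) {
    try {
      failWith(toThrow);
      return "ok";
    } catch (FileNotFoundException fnfe) {
      return "not-found";
    } catch (IOException e) {
      return "io-error";
    }
  }
}
```

This removes the `instanceof` check from the generic handler and keeps each failure path in its own block.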
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java:
##########
@@ -145,6 +158,10 @@ public AbfsInputStream(
this.readAheadQueueDepth = abfsInputStreamContext.getReadAheadQueueDepth();
this.tolerateOobAppends = abfsInputStreamContext.isTolerateOobAppends();
this.eTag = eTag;
+ this.fileStatusInformationPresent = StringUtils.isNotEmpty(eTag);
+ this.prefetchTriggerOnFirstRead =
+ abfsInputStreamContext.isPrefetchTriggerOnFirstRead()
+ && getFileStatusInformationPresent();
Review Comment:
Please move this into a method with a meaningful name and add Javadoc describing the case.
Something like the below, or with a better method name:
```
/**
 * <describe the combinational behavior>
 */
public boolean getEffectivePrefetchTriggerOnFirstRead(....<add_required_args>) {
  return abfsInputStreamContext.isPrefetchTriggerOnFirstRead()
      && getFileStatusInformationPresent();
}
```
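To spell out the combination the Javadoc should describe, here is a minimal sketch as I read the constructor hunk: the configured flag only takes effect when the eTag is already known at open time, so for a lazily opened stream prefetch on the first read would be off regardless of the setting. The class and parameter names are hypothetical.

```java
// Hypothetical sketch of the flag combination, not the PR's code.
final class PrefetchOnFirstReadSketch {
  // Mirrors the expression in the constructor hunk: the configured value
  // is honoured only when file status information is already present.
  static boolean effectivePrefetchTriggerOnFirstRead(
      final boolean configuredPrefetchOnFirstRead,
      final boolean fileStatusInformationPresent) {
    return configuredPrefetchOnFirstRead && fileStatusInformationPresent;
  }
}
```

If that reading is right, the "configured on, lazily opened" row is the one worth calling out explicitly in the Javadoc.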
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java:
##########
@@ -890,32 +891,37 @@ public AbfsInputStream openFileForRead(Path path,
encryptionContext.getBytes(StandardCharsets.UTF_8));
}
} else {
- AbfsHttpOperation op = client.getPathStatus(relativePath, false,
- tracingContext, null).getResult();
- resourceType = op.getResponseHeader(
- HttpHeaderConfigurations.X_MS_RESOURCE_TYPE);
- contentLength = Long.parseLong(
- op.getResponseHeader(HttpHeaderConfigurations.CONTENT_LENGTH));
- eTag = op.getResponseHeader(HttpHeaderConfigurations.ETAG);
- /*
- * For file created with ENCRYPTION_CONTEXT, client shall receive
- * encryptionContext from header field: X_MS_ENCRYPTION_CONTEXT.
- */
- if (client.getEncryptionType() == EncryptionType.ENCRYPTION_CONTEXT) {
- final String fileEncryptionContext = op.getResponseHeader(
- HttpHeaderConfigurations.X_MS_ENCRYPTION_CONTEXT);
- if (fileEncryptionContext == null) {
- LOG.debug("EncryptionContext missing in GetPathStatus response");
- throw new PathIOException(path.toString(),
- "EncryptionContext not present in GetPathStatus response headers");
+ if (client.getEncryptionType() == EncryptionType.ENCRYPTION_CONTEXT
+ || !abfsConfiguration.isInputStreamLazyOptimizationEnabled()) {
+ final AbfsHttpOperation op = client.getPathStatus(relativePath, false,
+ tracingContext, null).getResult();
+ resourceType = op.getResponseHeader(
+ HttpHeaderConfigurations.X_MS_RESOURCE_TYPE);
+ contentLength = Long.parseLong(
+ op.getResponseHeader(HttpHeaderConfigurations.CONTENT_LENGTH));
+ eTag = op.getResponseHeader(HttpHeaderConfigurations.ETAG);
+
+ /*
+ * For file created with ENCRYPTION_CONTEXT, client shall receive
+ * encryptionContext from header field: X_MS_ENCRYPTION_CONTEXT.
+ */
+ if (client.getEncryptionType() == EncryptionType.ENCRYPTION_CONTEXT) {
+ final String fileEncryptionContext = op.getResponseHeader(
+ HttpHeaderConfigurations.X_MS_ENCRYPTION_CONTEXT);
+ if (fileEncryptionContext == null) {
+ LOG.debug("EncryptionContext missing in GetPathStatus response");
+ throw new PathIOException(path.toString(),
+ "EncryptionContext not present in GetPathStatus response headers");
+ }
+ contextEncryptionAdapter = new ContextProviderEncryptionAdapter(
+ client.getEncryptionContextProvider(), getRelativePath(path),
+ fileEncryptionContext.getBytes(StandardCharsets.UTF_8));
}
- contextEncryptionAdapter = new ContextProviderEncryptionAdapter(
- client.getEncryptionContextProvider(), getRelativePath(path),
- fileEncryptionContext.getBytes(StandardCharsets.UTF_8));
}
}
- if (parseIsDirectory(resourceType)) {
+ if ((fileStatus != null || !abfsConfiguration.isInputStreamLazyOptimizationEnabled())
Review Comment:
It is good to keep conditions simple for code maintenance. Did you add
`fileStatus != null` as defensive coding?
If the `fileStatus != null` check is really required, then please add a code
comment describing the case in which it is null.
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java:
##########
@@ -385,32 +434,74 @@ private int readLastBlock(final byte[] b, final int off, final int len)
// data need to be copied to user buffer from index bCursor,
// AbfsInutStream buffer is going to contain data from last block start. In
// that case bCursor will be set to fCursor - lastBlockStart
- long lastBlockStart = max(0, contentLength - footerReadSize);
+ if (!getFileStatusInformationPresent()) {
+ long lastBlockStart = max(0, (fCursor + len) - footerReadSize);
+ bCursor = (int) (fCursor - lastBlockStart);
+ return optimisedRead(b, off, len, lastBlockStart, min(fCursor + len, footerReadSize), true);
+ }
+ long lastBlockStart = max(0, getContentLength() - footerReadSize);
bCursor = (int) (fCursor - lastBlockStart);
// 0 if contentlength is < buffersize
- long actualLenToRead = min(footerReadSize, contentLength);
- return optimisedRead(b, off, len, lastBlockStart, actualLenToRead);
+ long actualLenToRead = min(footerReadSize, getContentLength());
+ return optimisedRead(b, off, len, lastBlockStart, actualLenToRead, false);
}
private int optimisedRead(final byte[] b, final int off, final int len,
- final long readFrom, final long actualLen) throws IOException {
+ final long readFrom, final long actualLen,
+ final boolean isOptimizedReadWithoutContentLengthInformation) throws IOException {
fCursor = readFrom;
int totalBytesRead = 0;
int lastBytesRead = 0;
try {
buffer = new byte[bufferSize];
+ boolean fileStatusInformationPresentBeforeRead = getFileStatusInformationPresent();
+ /*
+ * Content length would not be available for the first optimized read in case
+ * of lazy head optimization in inputStream. In such case, read of the first optimized read
+ * would be done without the contentLength constraint. Post first call, the contentLength
+ * would be present and should be used for further reads.
+ */
for (int i = 0;
- i < MAX_OPTIMIZED_READ_ATTEMPTS && fCursor < contentLength; i++) {
+ i < MAX_OPTIMIZED_READ_ATTEMPTS && (!getFileStatusInformationPresent()
+ || fCursor < getContentLength()); i++) {
lastBytesRead = readInternal(fCursor, buffer, limit,
(int) actualLen - limit, true);
if (lastBytesRead > 0) {
totalBytesRead += lastBytesRead;
limit += lastBytesRead;
fCursor += lastBytesRead;
fCursorAfterLastRead = fCursor;
+
+ /*
+ * In non-lazily opened inputStream, the contentLength would be available before
+ * opening the inputStream. In such case, optimized read would always be done
+ * on the last part of the file.
+ *
+ * In lazily opened inputStream, the contentLength would not be available before
+ * opening the inputStream. In such case, contentLength conditioning would not be
+ * applied to execute optimizedRead. Hence, the optimized read may not be done on the
+ * last part of the file. If the optimized read is done on the non-last part of the
+ * file, inputStream should read only the amount of data requested by optimizedRead,
+ * as the buffer supplied would be only of the size of the data requested by optimizedRead.
+ */
+ boolean shouldBreak = !fileStatusInformationPresentBeforeRead
+ && totalBytesRead == (int) actualLen;
Review Comment:
Here too, please move this condition into a method and use that method in the `if` condition.
```
/*
 * In non-lazily opened inputStream, the contentLength would be available before
 * opening the inputStream. In such case, optimized read would always be done
 * on the last part of the file.
 *
 * In lazily opened inputStream, the contentLength would not be available before
 * opening the inputStream. In such case, contentLength conditioning would not be
 * applied to execute optimizedRead. Hence, the optimized read may not be done on the
 * last part of the file. If the optimized read is done on the non-last part of the
 * file, inputStream should read only the amount of data requested by optimizedRead,
 * as the buffer supplied would be only of the size of the data requested by optimizedRead.
 */
public boolean shouldBreakxxxxx(....<add_required_args>) {
  return !fileStatusInformationPresentBeforeRead
      && totalBytesRead == (int) actualLen;
}
```
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java:
##########
@@ -573,11 +688,25 @@ int readRemote(long position, byte[] b, int offset, int length, TracingContext t
} catch (AzureBlobFileSystemException ex) {
if (ex instanceof AbfsRestOperationException) {
AbfsRestOperationException ere = (AbfsRestOperationException) ex;
+ abfsHttpOperation = ((AbfsRestOperationException) ex).getAbfsHttpOperation();
if (ere.getStatusCode() == HttpURLConnection.HTTP_NOT_FOUND) {
throw new FileNotFoundException(ere.getMessage());
}
+ /*
+ * Status 416 is sent when read range is out of contentLength range.
+ * This would happen only in the case if contentLength is not known before
+ * opening the inputStream.
+ */
+ if (ere.getStatusCode() == READ_PATH_REQUEST_NOT_SATISFIABLE
+ && !getFileStatusInformationPresent()) {
+ return -1;
+ }
}
throw new IOException(ex);
+ } finally {
+ if (!getFileStatusInformationPresent() && abfsHttpOperation != null) {
Review Comment:
Can you move the `!getFileStatusInformationPresent()` condition inside
`initPropertiesFromReadResponseHeader`?
```
void initPropertiesFromReadResponseHeader(abfsHttpOperation) {
  // Nothing to initialise once the file status information is known.
  if (getFileStatusInformationPresent()) {
    return;
  }
  //...
}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]