[ https://issues.apache.org/jira/browse/HADOOP-19139?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17864491#comment-17864491 ]
ASF GitHub Bot commented on HADOOP-19139:
-----------------------------------------
rakeshadr commented on code in PR #6699:
URL: https://github.com/apache/hadoop/pull/6699#discussion_r1671604923
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/HttpHeaderConfigurations.java:
##########
@@ -34,6 +34,7 @@ public final class HttpHeaderConfigurations {
public static final String IF_MATCH = "If-Match";
public static final String IF_NONE_MATCH = "If-None-Match";
public static final String CONTENT_LENGTH = "Content-Length";
+ public static final String CONTENT_RANGE = "Content-Range";
Review Comment:
I couldn't see its usage. Please remove it if not required.
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java:
##########
@@ -321,5 +321,21 @@ public static String accountProperty(String property, String account) {
* @see FileSystem#openFile(org.apache.hadoop.fs.Path)
*/
public static final String FS_AZURE_BUFFERED_PREAD_DISABLE =
"fs.azure.buffered.pread.disable";
+
+ /**
+ * Enable lazy opening of an inputStream. Lazy opening skips the HEAD call
+ * used to get file metadata before creating the inputStream. The ReadPath API
+ * of the server returns the contentLength and eTag, which are used in
+ * subsequent calls for If-Match headers.
+ */
+ public static final String FS_AZURE_INPUT_STREAM_LAZY_OPEN_OPTIMIZATION_ENABLED =
+ "fs.azure.input.stream.lazy.open.optimization.enabled";
+
+ /**
+ * Enable prefetch on the first read to {@link org.apache.hadoop.fs.azurebfs.services.AbfsInputStream}.
+ * If disabled, first call would not trigger prefetch. Prefetch would be switched on
Review Comment:
Since these two configurations work in combination, can you document each
configuration and their combined behavior in
https://github.com/apache/hadoop/blob/trunk/hadoop-tools/hadoop-azure/src/site/markdown/index.md
or in
https://github.com/apache/hadoop/blob/trunk/hadoop-tools/hadoop-azure/src/site/markdown/abfs.md?
Does the Azure documentation cover prefetching behavior and its configurations?
If so, could you please point me to it. If not, I think it's fine to document
it in a follow-up JIRA task later. For reference, there is a good doc for the
AWS side:
https://github.com/apache/hadoop/blob/trunk/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/prefetching.md
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java:
##########
@@ -890,32 +891,37 @@ public AbfsInputStream openFileForRead(Path path,
encryptionContext.getBytes(StandardCharsets.UTF_8));
}
} else {
- AbfsHttpOperation op = client.getPathStatus(relativePath, false,
- tracingContext, null).getResult();
- resourceType = op.getResponseHeader(
- HttpHeaderConfigurations.X_MS_RESOURCE_TYPE);
- contentLength = Long.parseLong(
- op.getResponseHeader(HttpHeaderConfigurations.CONTENT_LENGTH));
- eTag = op.getResponseHeader(HttpHeaderConfigurations.ETAG);
- /*
- * For file created with ENCRYPTION_CONTEXT, client shall receive
- * encryptionContext from header field: X_MS_ENCRYPTION_CONTEXT.
- */
- if (client.getEncryptionType() == EncryptionType.ENCRYPTION_CONTEXT) {
- final String fileEncryptionContext = op.getResponseHeader(
- HttpHeaderConfigurations.X_MS_ENCRYPTION_CONTEXT);
- if (fileEncryptionContext == null) {
- LOG.debug("EncryptionContext missing in GetPathStatus response");
- throw new PathIOException(path.toString(),
- "EncryptionContext not present in GetPathStatus response
headers");
+ if (client.getEncryptionType() == EncryptionType.ENCRYPTION_CONTEXT
Review Comment:
Here, why do we need `EncryptionType.ENCRYPTION_CONTEXT` in the condition?
Should the condition be like the one below?
```
if(!abfsConfiguration.isInputStreamLazyOptimizationEnabled()){
final AbfsHttpOperation op =
client.getPathStatus(relativePath, false,
tracingContext, null).getResult();
resourceType = op.getResponseHeader(
HttpHeaderConfigurations.X_MS_RESOURCE_TYPE);
contentLength = Long.parseLong(
op.getResponseHeader(HttpHeaderConfigurations.CONTENT_LENGTH));
eTag = op.getResponseHeader(HttpHeaderConfigurations.ETAG);
}
```
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java:
##########
@@ -1148,7 +1149,9 @@ public AbfsRestOperation read(final String path,
AbfsHttpHeader rangeHeader = new AbfsHttpHeader(RANGE,
String.format("bytes=%d-%d", position, position + bufferLength - 1));
requestHeaders.add(rangeHeader);
- requestHeaders.add(new AbfsHttpHeader(IF_MATCH, eTag));
+ if (!StringUtils.isEmpty(eTag)) {
Review Comment:
Use `StringUtils.isNotEmpty` for better readability.
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java:
##########
@@ -488,7 +601,7 @@ private int copyToUserBuffer(byte[] b, int off, int len){
private int readInternal(final long position, final byte[] b, final int offset, final int length,
final boolean bypassReadAhead) throws IOException {
- if (readAheadEnabled && !bypassReadAhead) {
+ if (readAheadEnabled && !bypassReadAhead && (prefetchTriggerOnFirstRead || sequentialReadStarted)) {
Review Comment:
Here too, you could extract a method, `effectiveReadAhead` or a better name:
`if (effectiveReadAhead())`
```
/**
* <add_description>
*/
private boolean effectiveReadAhead() {
}
```
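If it helps, here is a minimal, self-contained sketch of such a helper. The class, constructor, and setter below are stand-ins I made up for illustration; in the PR the method would simply read `AbfsInputStream`'s own fields, and the `bypassReadAhead` check would stay at the call site.

```java
// Simplified stand-in for the suggested helper; the real method would live in
// AbfsInputStream and consult the stream's own state.
class ReadAheadGate {
    private final boolean readAheadEnabled;
    private final boolean prefetchTriggerOnFirstRead;
    private boolean sequentialReadStarted;

    ReadAheadGate(boolean readAheadEnabled, boolean prefetchTriggerOnFirstRead) {
        this.readAheadEnabled = readAheadEnabled;
        this.prefetchTriggerOnFirstRead = prefetchTriggerOnFirstRead;
    }

    void markSequentialReadStarted() {
        sequentialReadStarted = true;
    }

    /**
     * Read-ahead is effective when it is enabled and either prefetch should
     * be triggered on the first read or a sequential read pattern has started.
     */
    boolean effectiveReadAhead() {
        return readAheadEnabled && (prefetchTriggerOnFirstRead || sequentialReadStarted);
    }
}
```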
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java:
##########
@@ -424,11 +515,20 @@ private int optimisedRead(final byte[] b, final int off, final int len,
// If the read was partial and the user requested part of data has
// not read then fallback to readoneblock. When limit is smaller than
// bCursor that means the user requested data has not been read.
- if (fCursor < contentLength && bCursor > limit) {
+ if (fCursor < getContentLength() && bCursor > limit) {
restorePointerState();
return readOneBlock(b, off, len);
}
- return copyToUserBuffer(b, off, len);
+ return copyToUserBuffer(b, off, len, isOptimizedReadWithoutContentLengthInformation);
+ }
+
+ @VisibleForTesting
+ long getContentLength() {
+ return contentLength;
+ }
+
+ boolean getFileStatusInformationPresent() {
Review Comment:
1) Please make its visibility `private boolean`.
2) How about renaming the method to `hasFileStatusInfo()`?
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java:
##########
@@ -467,7 +566,21 @@ private boolean validate(final byte[] b, final int off, final int len)
return true;
}
- private int copyToUserBuffer(byte[] b, int off, int len){
+ private int copyToUserBuffer(byte[] b, int off, int len,
+ final boolean isOptimizedReadWithoutContentLengthInformation){
Review Comment:
It's a relative point, but we can shorten the variable name
`isOptimizedReadWithoutContentLengthInformation` to
`isOptimizedReadWithoutContentLengthInfo`.
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java:
##########
@@ -573,11 +688,25 @@ int readRemote(long position, byte[] b, int offset, int length, TracingContext t
} catch (AzureBlobFileSystemException ex) {
if (ex instanceof AbfsRestOperationException) {
AbfsRestOperationException ere = (AbfsRestOperationException) ex;
+ abfsHttpOperation = ((AbfsRestOperationException) ex).getAbfsHttpOperation();
if (ere.getStatusCode() == HttpURLConnection.HTTP_NOT_FOUND) {
throw new FileNotFoundException(ere.getMessage());
}
+ /*
+ * Status 416 is sent when read range is out of contentLength range.
+ * This would happen only in the case if contentLength is not known before
+ * opening the inputStream.
+ */
+ if (ere.getStatusCode() == READ_PATH_REQUEST_NOT_SATISFIABLE
+ && !getFileStatusInformationPresent()) {
+ return -1;
Review Comment:
Can you add a log message about this exception case?
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java:
##########
@@ -591,6 +720,31 @@ int readRemote(long position, byte[] b, int offset, int
length, TracingContext t
return (int) bytesRead;
}
+ private void initPropertiesFromReadResponseHeader(final AbfsHttpOperation op) throws IOException {
+ if (DIRECTORY.equals(
Review Comment:
Its duplicated in two places. Probably you can create a method and use it
```
if (DIRECTORY.equals(
op.getResponseHeader(HttpHeaderConfigurations.X_MS_RESOURCE_TYPE))) {
throw new FileNotFoundException(
"read must be used with files and not directories. Path: " + path);
}
contentLength = parseFromRange(
op.getResponseHeader(HttpHeaderConfigurations.CONTENT_RANGE));
eTag = op.getResponseHeader(HttpHeaderConfigurations.ETAG);
if (eTag != null && contentLength >= 0) {
fileStatusInformationPresent = true;
}
}
```
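For context, `parseFromRange` is not shown in this hunk, so the following is a hypothetical sketch of such a parser, assuming a standard Content-Range value like `bytes 0-1023/4096` where the total length follows the `/`. The class name and behavior here are my assumptions, not the PR's code.

```java
// Hypothetical Content-Range parser: extracts the total entity length after
// the '/' of a value like "bytes 0-1023/4096"; returns -1 when unknown.
class ContentRangeParser {
    static long parseTotalLength(String contentRange) {
        if (contentRange == null) {
            return -1;
        }
        int slash = contentRange.lastIndexOf('/');
        if (slash < 0 || slash == contentRange.length() - 1) {
            return -1;  // no total-length component present
        }
        String total = contentRange.substring(slash + 1).trim();
        if ("*".equals(total)) {
            return -1;  // server did not report a total length
        }
        try {
            return Long.parseLong(total);
        } catch (NumberFormatException e) {
            return -1;  // malformed header value
        }
    }
}
```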
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java:
##########
@@ -385,32 +434,74 @@ private int readLastBlock(final byte[] b, final int off, final int len)
// data need to be copied to user buffer from index bCursor,
// AbfsInutStream buffer is going to contain data from last block start. In
// that case bCursor will be set to fCursor - lastBlockStart
- long lastBlockStart = max(0, contentLength - footerReadSize);
+ if (!getFileStatusInformationPresent()) {
+ long lastBlockStart = max(0, (fCursor + len) - footerReadSize);
+ bCursor = (int) (fCursor - lastBlockStart);
+ return optimisedRead(b, off, len, lastBlockStart, min(fCursor + len, footerReadSize), true);
+ }
+ long lastBlockStart = max(0, getContentLength() - footerReadSize);
bCursor = (int) (fCursor - lastBlockStart);
// 0 if contentlength is < buffersize
- long actualLenToRead = min(footerReadSize, contentLength);
- return optimisedRead(b, off, len, lastBlockStart, actualLenToRead);
+ long actualLenToRead = min(footerReadSize, getContentLength());
+ return optimisedRead(b, off, len, lastBlockStart, actualLenToRead, false);
}
private int optimisedRead(final byte[] b, final int off, final int len,
- final long readFrom, final long actualLen) throws IOException {
+ final long readFrom, final long actualLen,
+ final boolean isOptimizedReadWithoutContentLengthInformation) throws IOException {
fCursor = readFrom;
int totalBytesRead = 0;
int lastBytesRead = 0;
try {
buffer = new byte[bufferSize];
+ boolean fileStatusInformationPresentBeforeRead = getFileStatusInformationPresent();
+ /*
+ * Content length would not be available for the first optimized read in case
+ * of lazy head optimization in inputStream. In such case, read of the first optimized read
+ * would be done without the contentLength constraint. Post first call, the contentLength
+ * would be present and should be used for further reads.
+ */
for (int i = 0;
- i < MAX_OPTIMIZED_READ_ATTEMPTS && fCursor < contentLength; i++) {
+ i < MAX_OPTIMIZED_READ_ATTEMPTS && (!getFileStatusInformationPresent()
+ || fCursor < getContentLength()); i++) {
lastBytesRead = readInternal(fCursor, buffer, limit,
(int) actualLen - limit, true);
if (lastBytesRead > 0) {
totalBytesRead += lastBytesRead;
limit += lastBytesRead;
fCursor += lastBytesRead;
fCursorAfterLastRead = fCursor;
+
+ /*
+ * In non-lazily opened inputStream, the contentLength would be available before
+ * opening the inputStream. In such case, optimized read would always be done
+ * on the last part of the file.
+ *
+ * In lazily opened inputStream, the contentLength would not be available before
+ * opening the inputStream. In such case, contentLength conditioning would not be
+ * applied to execute optimizedRead. Hence, the optimized read may not be done on the
+ * last part of the file. If the optimized read is done on the non-last part of the
+ * file, inputStream should read only the amount of data requested by optimizedRead,
+ * as the buffer supplied would be only of the size of the data requested by optimizedRead.
+ */
+ boolean shouldBreak = !fileStatusInformationPresentBeforeRead
+ && totalBytesRead == (int) actualLen;
+ if (shouldBreak) {
+ break;
+ }
}
}
} catch (IOException e) {
+ if (e instanceof FileNotFoundException) {
Review Comment:
Can't we add an explicit catch block, like:
```
} catch (FileNotFoundException fnfe) {
// <add_required_logic>
} catch (IOException e) {
```
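A minimal self-contained illustration of that catch ordering (the class and return strings are placeholders, not the PR's logic): Java requires the more specific `FileNotFoundException` handler to come before the broader `IOException` one.

```java
import java.io.FileNotFoundException;
import java.io.IOException;

class CatchOrderDemo {
    // Rethrows the given exception and reports which handler caught it.
    static String handle(IOException toThrow) {
        try {
            throw toThrow;
        } catch (FileNotFoundException fnfe) {
            return "file-not-found";  // specific subtype handled first
        } catch (IOException e) {
            return "io-error";        // all other IOExceptions
        }
    }
}
```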
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java:
##########
@@ -145,6 +158,10 @@ public AbfsInputStream(
this.readAheadQueueDepth = abfsInputStreamContext.getReadAheadQueueDepth();
this.tolerateOobAppends = abfsInputStreamContext.isTolerateOobAppends();
this.eTag = eTag;
+ this.fileStatusInformationPresent = StringUtils.isNotEmpty(eTag);
+ this.prefetchTriggerOnFirstRead = abfsInputStreamContext.isPrefetchTriggerOnFirstRead()
+ && getFileStatusInformationPresent();
Review Comment:
Move this into a method with a meaningful name and add javadoc describing the
case. Something like below, or a better method name:
```
/**
* <describe the combinational behavior>
*/
public boolean getEffectivePrefetchTriggerOnFirstRead(....<add_required_args>) {
return abfsInputStreamContext.isPrefetchTriggerOnFirstRead()
&& getFileStatusInformationPresent();
}
```
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java:
##########
@@ -890,32 +891,37 @@ public AbfsInputStream openFileForRead(Path path,
encryptionContext.getBytes(StandardCharsets.UTF_8));
}
} else {
- AbfsHttpOperation op = client.getPathStatus(relativePath, false,
- tracingContext, null).getResult();
- resourceType = op.getResponseHeader(
- HttpHeaderConfigurations.X_MS_RESOURCE_TYPE);
- contentLength = Long.parseLong(
- op.getResponseHeader(HttpHeaderConfigurations.CONTENT_LENGTH));
- eTag = op.getResponseHeader(HttpHeaderConfigurations.ETAG);
- /*
- * For file created with ENCRYPTION_CONTEXT, client shall receive
- * encryptionContext from header field: X_MS_ENCRYPTION_CONTEXT.
- */
- if (client.getEncryptionType() == EncryptionType.ENCRYPTION_CONTEXT) {
- final String fileEncryptionContext = op.getResponseHeader(
- HttpHeaderConfigurations.X_MS_ENCRYPTION_CONTEXT);
- if (fileEncryptionContext == null) {
- LOG.debug("EncryptionContext missing in GetPathStatus response");
- throw new PathIOException(path.toString(),
- "EncryptionContext not present in GetPathStatus response
headers");
+ if (client.getEncryptionType() == EncryptionType.ENCRYPTION_CONTEXT
+ || !abfsConfiguration.isInputStreamLazyOptimizationEnabled()) {
+ final AbfsHttpOperation op = client.getPathStatus(relativePath, false,
+ tracingContext, null).getResult();
+ resourceType = op.getResponseHeader(
+ HttpHeaderConfigurations.X_MS_RESOURCE_TYPE);
+ contentLength = Long.parseLong(
+ op.getResponseHeader(HttpHeaderConfigurations.CONTENT_LENGTH));
+ eTag = op.getResponseHeader(HttpHeaderConfigurations.ETAG);
+
+ /*
+ * For file created with ENCRYPTION_CONTEXT, client shall receive
+ * encryptionContext from header field: X_MS_ENCRYPTION_CONTEXT.
+ */
+ if (client.getEncryptionType() == EncryptionType.ENCRYPTION_CONTEXT) {
+ final String fileEncryptionContext = op.getResponseHeader(
+ HttpHeaderConfigurations.X_MS_ENCRYPTION_CONTEXT);
+ if (fileEncryptionContext == null) {
+ LOG.debug("EncryptionContext missing in GetPathStatus response");
+ throw new PathIOException(path.toString(),
+ "EncryptionContext not present in GetPathStatus response
headers");
+ }
+ contextEncryptionAdapter = new ContextProviderEncryptionAdapter(
+ client.getEncryptionContextProvider(), getRelativePath(path),
+ fileEncryptionContext.getBytes(StandardCharsets.UTF_8));
}
- contextEncryptionAdapter = new ContextProviderEncryptionAdapter(
- client.getEncryptionContextProvider(), getRelativePath(path),
- fileEncryptionContext.getBytes(StandardCharsets.UTF_8));
}
}
- if (parseIsDirectory(resourceType)) {
+ if ((fileStatus != null || !abfsConfiguration.isInputStreamLazyOptimizationEnabled())
Review Comment:
It's good to keep the conditions simple for code maintenance. Have you added
`fileStatus != null` for defensive coding?
If the `fileStatus != null` check is really required, please add a Java
comment describing the case where it becomes null.
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java:
##########
@@ -385,32 +434,74 @@ private int readLastBlock(final byte[] b, final int off, final int len)
// data need to be copied to user buffer from index bCursor,
// AbfsInutStream buffer is going to contain data from last block start. In
// that case bCursor will be set to fCursor - lastBlockStart
- long lastBlockStart = max(0, contentLength - footerReadSize);
+ if (!getFileStatusInformationPresent()) {
+ long lastBlockStart = max(0, (fCursor + len) - footerReadSize);
+ bCursor = (int) (fCursor - lastBlockStart);
+ return optimisedRead(b, off, len, lastBlockStart, min(fCursor + len, footerReadSize), true);
+ }
+ long lastBlockStart = max(0, getContentLength() - footerReadSize);
bCursor = (int) (fCursor - lastBlockStart);
// 0 if contentlength is < buffersize
- long actualLenToRead = min(footerReadSize, contentLength);
- return optimisedRead(b, off, len, lastBlockStart, actualLenToRead);
+ long actualLenToRead = min(footerReadSize, getContentLength());
+ return optimisedRead(b, off, len, lastBlockStart, actualLenToRead, false);
}
private int optimisedRead(final byte[] b, final int off, final int len,
- final long readFrom, final long actualLen) throws IOException {
+ final long readFrom, final long actualLen,
+ final boolean isOptimizedReadWithoutContentLengthInformation) throws IOException {
fCursor = readFrom;
int totalBytesRead = 0;
int lastBytesRead = 0;
try {
buffer = new byte[bufferSize];
+ boolean fileStatusInformationPresentBeforeRead = getFileStatusInformationPresent();
+ /*
+ * Content length would not be available for the first optimized read in case
+ * of lazy head optimization in inputStream. In such case, read of the first optimized read
+ * would be done without the contentLength constraint. Post first call, the contentLength
+ * would be present and should be used for further reads.
+ */
for (int i = 0;
- i < MAX_OPTIMIZED_READ_ATTEMPTS && fCursor < contentLength; i++) {
+ i < MAX_OPTIMIZED_READ_ATTEMPTS && (!getFileStatusInformationPresent()
+ || fCursor < getContentLength()); i++) {
lastBytesRead = readInternal(fCursor, buffer, limit,
(int) actualLen - limit, true);
if (lastBytesRead > 0) {
totalBytesRead += lastBytesRead;
limit += lastBytesRead;
fCursor += lastBytesRead;
fCursorAfterLastRead = fCursor;
+
+ /*
+ * In non-lazily opened inputStream, the contentLength would be available before
+ * opening the inputStream. In such case, optimized read would always be done
+ * on the last part of the file.
+ *
+ * In lazily opened inputStream, the contentLength would not be available before
+ * opening the inputStream. In such case, contentLength conditioning would not be
+ * applied to execute optimizedRead. Hence, the optimized read may not be done on the
+ * last part of the file. If the optimized read is done on the non-last part of the
+ * file, inputStream should read only the amount of data requested by optimizedRead,
+ * as the buffer supplied would be only of the size of the data requested by optimizedRead.
+ */
+ boolean shouldBreak = !fileStatusInformationPresentBeforeRead
+ && totalBytesRead == (int) actualLen;
Review Comment:
Here too, move this condition into a method and use it in the if condition.
```
/*
* In non-lazily opened inputStream, the contentLength would be available before
* opening the inputStream. In such case, optimized read would always be done
* on the last part of the file.
*
* In lazily opened inputStream, the contentLength would not be available before
* opening the inputStream. In such case, contentLength conditioning would not be
* applied to execute optimizedRead. Hence, the optimized read may not be done on the
* last part of the file. If the optimized read is done on the non-last part of the
* file, inputStream should read only the amount of data requested by optimizedRead,
* as the buffer supplied would be only of the size of the data requested by optimizedRead.
*/
public boolean shouldBreakxxxxx(....<add_required_args>) {
return !fileStatusInformationPresentBeforeRead
&& totalBytesRead == (int) actualLen;
}
```
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java:
##########
@@ -573,11 +688,25 @@ int readRemote(long position, byte[] b, int offset, int length, TracingContext t
} catch (AzureBlobFileSystemException ex) {
if (ex instanceof AbfsRestOperationException) {
AbfsRestOperationException ere = (AbfsRestOperationException) ex;
+ abfsHttpOperation = ((AbfsRestOperationException) ex).getAbfsHttpOperation();
if (ere.getStatusCode() == HttpURLConnection.HTTP_NOT_FOUND) {
throw new FileNotFoundException(ere.getMessage());
}
+ /*
+ * Status 416 is sent when read range is out of contentLength range.
+ * This would happen only in the case if contentLength is not known before
+ * opening the inputStream.
+ */
+ if (ere.getStatusCode() == READ_PATH_REQUEST_NOT_SATISFIABLE
+ && !getFileStatusInformationPresent()) {
+ return -1;
+ }
}
throw new IOException(ex);
+ } finally {
+ if (!getFileStatusInformationPresent() && abfsHttpOperation != null) {
Review Comment:
Can you move the `!getFileStatusInformationPresent()` condition inside
`initPropertiesFromReadResponseHeader`?
```
void initPropertiesFromReadResponseHeader(abfsHttpOperation) {
if (getFileStatusInformationPresent()) {
return;
}
//...
}
```
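A minimal stand-alone sketch of that guard-clause direction (class and field names are simplified stand-ins for `AbfsInputStream`, not the PR's code): once the file status information is present, re-initialization becomes a no-op.

```java
// Simplified stand-in: properties are initialized at most once, from the
// first read response, unless they were already known at open time.
class LazyStreamProps {
    private boolean fileStatusInformationPresent;
    private long contentLength = -1;

    void initFromReadResponse(long lengthFromResponse) {
        if (fileStatusInformationPresent) {
            return;  // already initialized; later responses must not overwrite
        }
        contentLength = lengthFromResponse;
        fileStatusInformationPresent = true;
    }

    long getContentLength() {
        return contentLength;
    }
}
```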
> [ABFS]: No GetPathStatus call for opening AbfsInputStream
> ---------------------------------------------------------
>
> Key: HADOOP-19139
> URL: https://issues.apache.org/jira/browse/HADOOP-19139
> Project: Hadoop Common
> Issue Type: Sub-task
> Components: fs/azure
> Reporter: Pranav Saxena
> Assignee: Pranav Saxena
> Priority: Major
> Labels: pull-request-available
>
> Read API gives the contentLen and eTag of the path. This information would
> be used in future calls on that inputStream. Prior knowledge of the eTag is
> of little importance.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)