This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 97eb2b9172 [Fix](multi-catalog) Fix broker load reader and hdfs reader issue. (#23529)
97eb2b9172 is described below

commit 97eb2b91726ca24622c20c20fb1ab5b9e1c85559
Author: Qi Chen <kaka11.c...@gmail.com>
AuthorDate: Tue Aug 29 13:45:48 2023 +0800

    [Fix](multi-catalog) Fix broker load reader and hdfs reader issue. (#23529)
    
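    Broker load through a broker sometimes throws 'Invalid orc post script length'.
    HDFS queries sometimes throw 'Invalid orc post script length' as well.
    
    The underlying problem is that both read paths could return fewer bytes
    than the caller requested:
    - HdfsFileReader::read_at_impl issued a single hdfsPread() call and then
      reported bytes_req as read, even when hdfsPread() returned a short read.
    - The broker's pread capped each response buffer at readBufferSize bytes,
      so requests larger than that came back truncated.
    Both paths now loop until the full requested length has been read. The HDFS
    reader stops early only when hdfsPread() returns 0, and in that case it
    reports the number of bytes actually read.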
---
 be/src/io/fs/hdfs_file_reader.cpp                  | 20 +++++---
 .../doris/broker/hdfs/FileSystemManager.java       | 54 +++++++++-------------
 2 files changed, 37 insertions(+), 37 deletions(-)

diff --git a/be/src/io/fs/hdfs_file_reader.cpp 
b/be/src/io/fs/hdfs_file_reader.cpp
index 383958802f..6c4f456e37 100644
--- a/be/src/io/fs/hdfs_file_reader.cpp
+++ b/be/src/io/fs/hdfs_file_reader.cpp
@@ -133,13 +133,21 @@ Status HdfsFileReader::read_at_impl(size_t offset, Slice 
result, size_t* bytes_r
         return Status::OK();
     }
 
-    tSize r = hdfsPread(_handle->fs(), _handle->file(), offset, to, bytes_req);
-    if (r == -1) {
-        return Status::InternalError(
-                "Read hdfs file failed. (BE: {}) namenode:{}, path:{}, err: 
{}",
-                BackendOptions::get_localhost(), _name_node, _path.string(), 
hdfs_error());
+    size_t has_read = 0;
+    while (has_read < bytes_req) {
+        tSize loop_read = hdfsPread(_handle->fs(), _handle->file(), offset + 
has_read,
+                                    to + has_read, bytes_req - has_read);
+        if (loop_read < 0) {
+            return Status::InternalError(
+                    "Read hdfs file failed. (BE: {}) namenode:{}, path:{}, 
err: {}",
+                    BackendOptions::get_localhost(), _name_node, 
_path.string(), hdfs_error());
+        }
+        if (loop_read == 0) {
+            break;
+        }
+        has_read += loop_read;
     }
-    *bytes_read = bytes_req;
+    *bytes_read = has_read;
     return Status::OK();
 }
 
diff --git 
a/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/FileSystemManager.java
 
b/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/FileSystemManager.java
index d86df86fe0..d25947e33b 100644
--- 
a/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/FileSystemManager.java
+++ 
b/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/FileSystemManager.java
@@ -1204,27 +1204,32 @@ public class FileSystemManager {
             // Avoid using the ByteBuffer based read for Hadoop because some 
FSDataInputStream
             // implementations are not ByteBufferReadable,
             // See https://issues.apache.org/jira/browse/HADOOP-14603
-            byte[] buf;
-            if (length > readBufferSize) {
-                buf = new byte[readBufferSize];
-            } else {
-                buf = new byte[(int) length];
-            }
-            try {
-                int readLength = readBytesFully(fsDataInputStream, buf);
-                if (readLength < 0) {
-                    throw new 
BrokerException(TBrokerOperationStatusCode.END_OF_FILE,
-                            "end of file reached");
-                }
-                if (logger.isDebugEnabled()) {
-                    logger.debug("read buffer from input stream, buffer size:" 
+ buf.length + ", read length:" + readLength);
+            int hasRead = 0;
+            byte[] buf = new byte[(int)length];
+            while (hasRead < length) {
+                int bufSize = Math.min((int) length - hasRead, readBufferSize);
+                try {
+                    int readLength = fsDataInputStream.read(buf, hasRead, 
bufSize);
+                    if (readLength < 0) {
+                        throw new 
BrokerException(TBrokerOperationStatusCode.END_OF_FILE,
+                                "end of file reached");
+                    }
+                    if (logger.isDebugEnabled()) {
+                        logger.debug("read buffer from input stream, buffer 
size:" + buf.length + ", read length:"
+                                + readLength);
+                    }
+                    hasRead += readLength;
+                } catch (IOException e) {
+                    logger.error("errors while read data from stream", e);
+                    throw new 
BrokerException(TBrokerOperationStatusCode.TARGET_STORAGE_SERVICE_ERROR,
+                            e, "errors while read data from stream");
                 }
-                return ByteBuffer.wrap(buf, 0, readLength);
-            } catch (IOException e) {
-                logger.error("errors while read data from stream", e);
+            }
+            if (hasRead != length) {
                 throw new 
BrokerException(TBrokerOperationStatusCode.TARGET_STORAGE_SERVICE_ERROR,
-                        e, "errors while write data to output stream");
+                        String.format("errors while read data from stream: 
hasRead(%d) != length(%d)", hasRead, length));
             }
+            return ByteBuffer.wrap(buf, 0, hasRead);
         }
     }
 
@@ -1325,19 +1330,6 @@ public class FileSystemManager {
         return new TBrokerFD(uuid.getMostSignificantBits(), 
uuid.getLeastSignificantBits());
     }
 
-    private int readBytesFully(FSDataInputStream is, byte[] dest) throws 
IOException {
-        int readLength = 0;
-        while (readLength < dest.length) {
-            int availableReadLength = dest.length - readLength;
-            int n = is.read(dest, readLength, availableReadLength);
-            if (n <= 0) {
-                break;
-            }
-            readLength += n;
-        }
-        return readLength;
-    }
-
     /**
      *   In view of the different expiration mechanisms of different 
authentication modes,
      *   there are two ways to determine whether BrokerFileSystem has expired:


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to