This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch orc
in repository https://gitbox.apache.org/repos/asf/doris-thirdparty.git


The following commit(s) were added to refs/heads/orc by this push:
     new f10bb3d9113 [Feature] Add input stream of stripe streams in stripe reader. (#270)
f10bb3d9113 is described below

commit f10bb3d91136149845f69fa51ad3a0663941c43d
Author: Qi Chen <che...@selectdb.com>
AuthorDate: Fri Jan 10 09:00:56 2025 +0800

    [Feature] Add input stream of stripe streams in stripe reader. (#270)
---
 c++/include/orc/Common.hh  | 49 +++++++++++++++++++++++++++++++++++++++
 c++/include/orc/OrcFile.hh |  6 +++--
 c++/src/Reader.cc          | 58 ++++++++++++++++++++++++++--------------------
 c++/src/Reader.hh          | 12 +++++-----
 c++/src/StripeStream.cc    | 17 ++++++++------
 c++/src/StripeStream.hh    | 10 ++++----
 6 files changed, 108 insertions(+), 44 deletions(-)

diff --git a/c++/include/orc/Common.hh b/c++/include/orc/Common.hh
index beae9dd6f31..9df0b0ed6a8 100644
--- a/c++/include/orc/Common.hh
+++ b/c++/include/orc/Common.hh
@@ -156,6 +156,46 @@ namespace orc {
 
   std::string columnEncodingKindToString(ColumnEncodingKind kind);
 
+  /**
+   * Identifies one stream by the id of the column it belongs to and the kind
+   * of the stream. Provides operator==, operator< and hash(), so it can be
+   * used as a key in both ordered and unordered containers.
+   */
+  class StreamId {
+   public:
+    StreamId(uint64_t columnId, StreamKind streamKind)
+        : _columnId(columnId), _streamKind(streamKind) {}
+
+    /**
+     * Combined hash of the column id and the stream kind.
+     *
+     * The two hashes are mixed with the usual hash_combine recipe instead of
+     * a plain `h1 ^ (h2 << 1)`: std::hash of an integer is commonly the
+     * identity, so the plain xor made small values collide systematically
+     * (e.g. columnId 2 / kind value 0 and columnId 0 / kind value 1 both
+     * hashed to 2).
+     */
+    size_t hash() const {
+      size_t seed = std::hash<uint64_t>{}(_columnId);
+      const size_t kindHash = std::hash<int>{}(static_cast<int>(_streamKind));
+      seed ^= kindHash + static_cast<size_t>(0x9e3779b9) + (seed << 6) + (seed >> 2);
+      return seed;
+    }
+
+    /// Two ids are equal when both the column id and the stream kind match.
+    bool operator==(const StreamId& other) const {
+      return _columnId == other._columnId && _streamKind == other._streamKind;
+    }
+
+    /// Orders by column id first, then by the numeric value of the stream kind.
+    bool operator<(const StreamId& other) const {
+      if (_columnId != other._columnId) {
+        return _columnId < other._columnId;
+      }
+      return static_cast<int>(_streamKind) < static_cast<int>(other._streamKind);
+    }
+
+    /// Renders the id as "[columnId=<id>, streamKind=<numeric kind>]".
+    std::string toString() const {
+      return "[columnId=" + std::to_string(_columnId) +
+             ", streamKind=" + std::to_string(static_cast<int>(_streamKind)) + "]";
+    }
+
+    uint64_t columnId() const {
+      return _columnId;
+    }
+    StreamKind streamKind() const {
+      return _streamKind;
+    }
+
+   private:
+    uint64_t _columnId;
+    StreamKind _streamKind;
+  };
+
   class StripeInformation {
    public:
     virtual ~StripeInformation();
@@ -306,4 +346,13 @@ namespace orc {
 
 }  // namespace orc
 
+namespace std {
+  /// Hash support for orc::StreamId so it can be the key type of unordered
+  /// containers; simply forwards to StreamId::hash().
+  template <>
+  struct hash<orc::StreamId> {
+    size_t operator()(const orc::StreamId& streamId) const {
+      return streamId.hash();
+    }
+  };
+}  // namespace std
+
 #endif
diff --git a/c++/include/orc/OrcFile.hh b/c++/include/orc/OrcFile.hh
index c52b66b7210..c7826e13add 100644
--- a/c++/include/orc/OrcFile.hh
+++ b/c++/include/orc/OrcFile.hh
@@ -63,8 +63,10 @@ namespace orc {
      */
     virtual const std::string& getName() const = 0;
 
-    virtual void beforeReadStripe(std::unique_ptr<StripeInformation> 
currentStripeInformation,
-                                  std::vector<bool> selectedColumns);
+    virtual void beforeReadStripe(
+        std::unique_ptr<StripeInformation> currentStripeInformation,
+        std::vector<bool> selectedColumns,
+        std::unordered_map<orc::StreamId, std::shared_ptr<InputStream>>& 
streams);
   };
 
   /**
diff --git a/c++/src/Reader.cc b/c++/src/Reader.cc
index b8e1a914215..564c1f2aa6d 100644
--- a/c++/src/Reader.cc
+++ b/c++/src/Reader.cc
@@ -549,11 +549,14 @@ namespace orc {
       if (selectedColumns[colId] && pbStream.has_kind() &&
           (pbStream.kind() == proto::Stream_Kind_ROW_INDEX ||
            pbStream.kind() == proto::Stream_Kind_BLOOM_FILTER_UTF8)) {
-        std::unique_ptr<SeekableInputStream> inStream = createDecompressor(
-            getCompression(),
-            std::unique_ptr<SeekableInputStream>(new SeekableFileInputStream(
-                contents->stream.get(), offset, pbStream.length(), 
*contents->pool)),
-            getCompressionSize(), *contents->pool, contents->readerMetrics);
+        auto iter = streams.find({colId, 
static_cast<StreamKind>(pbStream.kind())});
+        InputStream* inputStream =
+            (iter != streams.end()) ? iter->second.get() : 
contents->stream.get();
+        std::unique_ptr<SeekableInputStream> inStream =
+            createDecompressor(getCompression(),
+                               std::unique_ptr<SeekableInputStream>(new 
SeekableFileInputStream(
+                                   inputStream, offset, pbStream.length(), 
*contents->pool)),
+                               getCompressionSize(), *contents->pool, 
contents->readerMetrics);
 
         if (pbStream.kind() == proto::Stream_Kind_ROW_INDEX) {
           proto::RowIndex rowIndex;
@@ -951,7 +954,7 @@ namespace orc {
       readMetadata();
     }
 
-    std::vector<int> allStripesNeeded(numberOfStripes,1);
+    std::vector<int> allStripesNeeded(numberOfStripes, 1);
 
     if (opts.getSearchArgument() && footer->rowindexstride() > 0) {
       auto sargs = opts.getSearchArgument();
@@ -963,19 +966,20 @@ namespace orc {
         return allStripesNeeded;
       }
 
-      for ( uint64_t currentStripeIndex = 0;currentStripeIndex < 
numberOfStripes ; currentStripeIndex ++) {
+      for (uint64_t currentStripeIndex = 0; currentStripeIndex < 
numberOfStripes;
+           currentStripeIndex++) {
         const auto& currentStripeStats =
-              
contents->metadata->stripestats(static_cast<int>(currentStripeIndex));
-        //Not need add mMetrics,so use 0.
-        allStripesNeeded[currentStripeIndex] = 
sargsApplier->evaluateStripeStatistics(currentStripeStats, 0);;
+            
contents->metadata->stripestats(static_cast<int>(currentStripeIndex));
+        // No need to add to mMetrics here, so pass 0.
+        allStripesNeeded[currentStripeIndex] =
+            sargsApplier->evaluateStripeStatistics(currentStripeStats, 0);
+        ;
       }
       contents->sargsApplier = std::move(sargsApplier);
     }
     return allStripesNeeded;
   }
 
-
-
   uint64_t maxStreamsForType(const proto::Type& type) {
     switch (static_cast<int64_t>(type.kind())) {
       case proto::Type_Kind_STRUCT:
@@ -1161,6 +1165,7 @@ namespace orc {
     reader.reset();  // ColumnReaders use lots of memory; free old memory first
     rowIndexes.clear();
     bloomFilterIndex.clear();
+    streams.clear();
     followRowInStripe = 0;
 
     // evaluate file statistics if it exists
@@ -1189,13 +1194,13 @@ namespace orc {
       if (sargsApplier) {
         bool isStripeNeeded = true;
         if (contents->metadata) {
-          const auto &currentStripeStats =
-                  
contents->metadata->stripestats(static_cast<int>(currentStripe));
+          const auto& currentStripeStats =
+              contents->metadata->stripestats(static_cast<int>(currentStripe));
           // skip this stripe after stats fail to satisfy sargs
           uint64_t stripeRowGroupCount =
-                  (rowsInCurrentStripe + footer->rowindexstride() - 1) / 
footer->rowindexstride();
+              (rowsInCurrentStripe + footer->rowindexstride() - 1) / 
footer->rowindexstride();
           isStripeNeeded =
-                  sargsApplier->evaluateStripeStatistics(currentStripeStats, 
stripeRowGroupCount);
+              sargsApplier->evaluateStripeStatistics(currentStripeStats, 
stripeRowGroupCount);
         }
         if (!isStripeNeeded) {
           // advance to next stripe when current stripe has no matching rows
@@ -1209,11 +1214,12 @@ namespace orc {
       processingStripe = currentStripe;
 
       std::unique_ptr<StripeInformation> currentStripeInformation(new 
StripeInformationImpl(
-            currentStripeInfo.offset(), currentStripeInfo.indexlength(),
-            currentStripeInfo.datalength(), currentStripeInfo.footerlength(),
-            currentStripeInfo.numberofrows(), contents->stream.get(), 
*contents->pool,
-            contents->compression, contents->blockSize, 
contents->readerMetrics));
-      contents->stream->beforeReadStripe(std::move(currentStripeInformation), 
selectedColumns);
+          currentStripeInfo.offset(), currentStripeInfo.indexlength(),
+          currentStripeInfo.datalength(), currentStripeInfo.footerlength(),
+          currentStripeInfo.numberofrows(), contents->stream.get(), 
*contents->pool,
+          contents->compression, contents->blockSize, 
contents->readerMetrics));
+      contents->stream->beforeReadStripe(std::move(currentStripeInformation), 
selectedColumns,
+                                         streams);
 
       if (sargsApplier) {
         bool isStripeNeeded = true;
@@ -1237,8 +1243,8 @@ namespace orc {
                                            ? 
getTimezoneByName(currentStripeFooter.writertimezone())
                                            : localTimezone;
       StripeStreamsImpl stripeStreams(*this, currentStripe, currentStripeInfo, 
currentStripeFooter,
-                                      currentStripeInfo.offset(), 
*contents->stream, writerTimezone,
-                                      readerTimezone);
+                                      currentStripeInfo.offset(), 
*contents->stream, streams,
+                                      writerTimezone, readerTimezone);
       reader = buildReader(*contents->schema, stripeStreams, 
useTightNumericVector);
 
       if (stringDictFilter != nullptr) {
@@ -1760,7 +1766,9 @@ namespace orc {
       // PASS
   };
 
-  void InputStream::beforeReadStripe(std::unique_ptr<StripeInformation> 
currentStripeInformation,
-                                     std::vector<bool> selectedColumns) {}
+  void InputStream::beforeReadStripe(
+      std::unique_ptr<StripeInformation> currentStripeInformation,
+      std::vector<bool> selectedColumns,
+      std::unordered_map<orc::StreamId, std::shared_ptr<InputStream>>& 
streams) {}  // Base-class default is a no-op: the empty body ignores all arguments and leaves `streams` unmodified.
 
 }  // namespace orc
diff --git a/c++/src/Reader.hh b/c++/src/Reader.hh
index 9505022c558..7ec049ad963 100644
--- a/c++/src/Reader.hh
+++ b/c++/src/Reader.hh
@@ -162,6 +162,7 @@ namespace orc {
 
     // contents
     std::shared_ptr<FileContents> contents;
+    std::unordered_map<orc::StreamId, std::shared_ptr<InputStream>> streams;
     const bool throwOnHive11DecimalOverflow;
     const int32_t forcedScaleOnHive11Decimal;
 
@@ -322,10 +323,9 @@ namespace orc {
     // internal methods
     void readMetadata() const;
     void checkOrcVersion();
-    void getRowIndexStatistics(
-        const proto::StripeInformation& stripeInfo, uint64_t stripeIndex,
-        const proto::StripeFooter& currentStripeFooter,
-        std::vector<std::vector<proto::ColumnStatistics> >* indexStats) const;
+    void getRowIndexStatistics(const proto::StripeInformation& stripeInfo, 
uint64_t stripeIndex,
+                               const proto::StripeFooter& currentStripeFooter,
+                               
std::vector<std::vector<proto::ColumnStatistics>>* indexStats) const;
 
     // metadata
     mutable bool isMetadataLoaded;
@@ -425,8 +425,8 @@ namespace orc {
       return contents->stream.get();
     }
 
-    void setStream(std::unique_ptr<InputStream> inputStreamUPtr) override{
-      contents->stream =  std::move(inputStreamUPtr);
+    void setStream(std::unique_ptr<InputStream> inputStreamUPtr) override {
+      contents->stream = std::move(inputStreamUPtr);
     }
 
     uint64_t getMemoryUse(int stripeIx = -1) override;
diff --git a/c++/src/StripeStream.cc b/c++/src/StripeStream.cc
index 1f43da4f243..8efa23efa86 100644
--- a/c++/src/StripeStream.cc
+++ b/c++/src/StripeStream.cc
@@ -25,17 +25,18 @@
 
 namespace orc {
 
-  StripeStreamsImpl::StripeStreamsImpl(const RowReaderImpl& _reader, uint64_t 
_index,
-                                       const proto::StripeInformation& 
_stripeInfo,
-                                       const proto::StripeFooter& _footer, 
uint64_t _stripeStart,
-                                       InputStream& _input, const Timezone& 
_writerTimezone,
-                                       const Timezone& _readerTimezone)
+  StripeStreamsImpl::StripeStreamsImpl(
+      const RowReaderImpl& _reader, uint64_t _index, const 
proto::StripeInformation& _stripeInfo,
+      const proto::StripeFooter& _footer, uint64_t _stripeStart, InputStream& 
_input,
+      const std::unordered_map<orc::StreamId, std::shared_ptr<InputStream>>& 
_streams,
+      const Timezone& _writerTimezone, const Timezone& _readerTimezone)
       : reader(_reader),
         stripeInfo(_stripeInfo),
         footer(_footer),
         stripeIndex(_index),
         stripeStart(_stripeStart),
         input(_input),
+        streams(_streams),
         writerTimezone(_writerTimezone),
         readerTimezone(_readerTimezone) {
     // PASS
@@ -87,8 +88,10 @@ namespace orc {
       const proto::Stream& stream = footer.streams(i);
       if (stream.has_kind() && stream.kind() == kind &&
           stream.column() == static_cast<uint64_t>(columnId)) {
+        auto iter = streams.find({columnId, static_cast<StreamKind>(kind)});
+        InputStream* inputStream = (iter != streams.end()) ? 
iter->second.get() : &input;
         uint64_t streamLength = stream.length();
-        uint64_t myBlock = shouldStream ? input.getNaturalReadSize() : 
streamLength;
+        uint64_t myBlock = shouldStream ? inputStream->getNaturalReadSize() : 
streamLength;
         if (offset + streamLength > dataEnd) {
           std::stringstream msg;
           msg << "Malformed stream meta at stream index " << i << " in stripe 
" << stripeIndex
@@ -100,7 +103,7 @@ namespace orc {
         }
         return createDecompressor(reader.getCompression(),
                                   std::make_unique<SeekableFileInputStream>(
-                                      &input, offset, stream.length(), *pool, 
myBlock),
+                                      inputStream, offset, stream.length(), 
*pool, myBlock),
                                   reader.getCompressionSize(), *pool,
                                   reader.getFileContents().readerMetrics);
       }
diff --git a/c++/src/StripeStream.hh b/c++/src/StripeStream.hh
index 74bebda6f25..57e51ef76f0 100644
--- a/c++/src/StripeStream.hh
+++ b/c++/src/StripeStream.hh
@@ -43,14 +43,16 @@ namespace orc {
     const uint64_t stripeIndex;
     const uint64_t stripeStart;
     InputStream& input;
+    const std::unordered_map<orc::StreamId, std::shared_ptr<InputStream>>& 
streams;
     const Timezone& writerTimezone;
     const Timezone& readerTimezone;
 
    public:
-    StripeStreamsImpl(const RowReaderImpl& reader, uint64_t index,
-                      const proto::StripeInformation& stripeInfo, const 
proto::StripeFooter& footer,
-                      uint64_t stripeStart, InputStream& input, const 
Timezone& writerTimezone,
-                      const Timezone& readerTimezone);
+    StripeStreamsImpl(
+        const RowReaderImpl& reader, uint64_t index, const 
proto::StripeInformation& stripeInfo,
+        const proto::StripeFooter& footer, uint64_t stripeStart, InputStream& 
input,
+        const std::unordered_map<orc::StreamId, std::shared_ptr<InputStream>>& 
streams,
+        const Timezone& writerTimezone, const Timezone& readerTimezone);
 
     virtual ~StripeStreamsImpl() override;
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to