This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch orc in repository https://gitbox.apache.org/repos/asf/doris-thirdparty.git
The following commit(s) were added to refs/heads/orc by this push: new f10bb3d9113 [Feature] Add input stream of stripe streams in stripe reader. (#270) f10bb3d9113 is described below commit f10bb3d91136149845f69fa51ad3a0663941c43d Author: Qi Chen <che...@selectdb.com> AuthorDate: Fri Jan 10 09:00:56 2025 +0800 [Feature] Add input stream of stripe streams in stripe reader. (#270) --- c++/include/orc/Common.hh | 49 +++++++++++++++++++++++++++++++++++++++ c++/include/orc/OrcFile.hh | 6 +++-- c++/src/Reader.cc | 58 ++++++++++++++++++++++++++-------------------- c++/src/Reader.hh | 12 +++++----- c++/src/StripeStream.cc | 17 ++++++++------ c++/src/StripeStream.hh | 10 ++++---- 6 files changed, 108 insertions(+), 44 deletions(-) diff --git a/c++/include/orc/Common.hh b/c++/include/orc/Common.hh index beae9dd6f31..9df0b0ed6a8 100644 --- a/c++/include/orc/Common.hh +++ b/c++/include/orc/Common.hh @@ -156,6 +156,46 @@ namespace orc { std::string columnEncodingKindToString(ColumnEncodingKind kind); + class StreamId { + public: + StreamId(uint64_t columnId, StreamKind streamKind) + : _columnId(columnId), _streamKind(streamKind) {} + + size_t hash() const { + size_t h1 = std::hash<uint64_t>{}(_columnId); + size_t h2 = std::hash<int>{}(static_cast<int>(_streamKind)); + return h1 ^ (h2 << 1); + } + + bool operator==(const StreamId& other) const { + return _columnId == other._columnId && _streamKind == other._streamKind; + } + + bool operator<(const StreamId& other) const { + if (_columnId != other._columnId) { + return _columnId < other._columnId; + } + return static_cast<int>(_streamKind) < static_cast<int>(other._streamKind); + } + + std::string toString() const { + std::ostringstream oss; + oss << "[columnId=" << _columnId << ", streamKind=" << static_cast<int>(_streamKind) << "]"; + return oss.str(); + } + + uint64_t columnId() const { + return _columnId; + } + StreamKind streamKind() const { + return _streamKind; + } + + private: + uint64_t _columnId; + StreamKind 
_streamKind; + }; + class StripeInformation { public: virtual ~StripeInformation(); @@ -306,4 +346,13 @@ namespace orc { } // namespace orc +namespace std { + template <> + struct hash<orc::StreamId> { + size_t operator()(const orc::StreamId& id) const { + return id.hash(); + } + }; +} // namespace std + #endif diff --git a/c++/include/orc/OrcFile.hh b/c++/include/orc/OrcFile.hh index c52b66b7210..c7826e13add 100644 --- a/c++/include/orc/OrcFile.hh +++ b/c++/include/orc/OrcFile.hh @@ -63,8 +63,10 @@ namespace orc { */ virtual const std::string& getName() const = 0; - virtual void beforeReadStripe(std::unique_ptr<StripeInformation> currentStripeInformation, - std::vector<bool> selectedColumns); + virtual void beforeReadStripe( + std::unique_ptr<StripeInformation> currentStripeInformation, + std::vector<bool> selectedColumns, + std::unordered_map<orc::StreamId, std::shared_ptr<InputStream>>& streams); }; /** diff --git a/c++/src/Reader.cc b/c++/src/Reader.cc index b8e1a914215..564c1f2aa6d 100644 --- a/c++/src/Reader.cc +++ b/c++/src/Reader.cc @@ -549,11 +549,14 @@ namespace orc { if (selectedColumns[colId] && pbStream.has_kind() && (pbStream.kind() == proto::Stream_Kind_ROW_INDEX || pbStream.kind() == proto::Stream_Kind_BLOOM_FILTER_UTF8)) { - std::unique_ptr<SeekableInputStream> inStream = createDecompressor( - getCompression(), - std::unique_ptr<SeekableInputStream>(new SeekableFileInputStream( - contents->stream.get(), offset, pbStream.length(), *contents->pool)), - getCompressionSize(), *contents->pool, contents->readerMetrics); + auto iter = streams.find({colId, static_cast<StreamKind>(pbStream.kind())}); + InputStream* inputStream = + (iter != streams.end()) ? 
iter->second.get() : contents->stream.get(); + std::unique_ptr<SeekableInputStream> inStream = + createDecompressor(getCompression(), + std::unique_ptr<SeekableInputStream>(new SeekableFileInputStream( + inputStream, offset, pbStream.length(), *contents->pool)), + getCompressionSize(), *contents->pool, contents->readerMetrics); if (pbStream.kind() == proto::Stream_Kind_ROW_INDEX) { proto::RowIndex rowIndex; @@ -951,7 +954,7 @@ namespace orc { readMetadata(); } - std::vector<int> allStripesNeeded(numberOfStripes,1); + std::vector<int> allStripesNeeded(numberOfStripes, 1); if (opts.getSearchArgument() && footer->rowindexstride() > 0) { auto sargs = opts.getSearchArgument(); @@ -963,19 +966,20 @@ namespace orc { return allStripesNeeded; } - for ( uint64_t currentStripeIndex = 0;currentStripeIndex < numberOfStripes ; currentStripeIndex ++) { + for (uint64_t currentStripeIndex = 0; currentStripeIndex < numberOfStripes; + currentStripeIndex++) { const auto& currentStripeStats = - contents->metadata->stripestats(static_cast<int>(currentStripeIndex)); - //Not need add mMetrics,so use 0. - allStripesNeeded[currentStripeIndex] = sargsApplier->evaluateStripeStatistics(currentStripeStats, 0);; + contents->metadata->stripestats(static_cast<int>(currentStripeIndex)); + // Not need add mMetrics,so use 0. 
+ allStripesNeeded[currentStripeIndex] = + sargsApplier->evaluateStripeStatistics(currentStripeStats, 0); + ; } contents->sargsApplier = std::move(sargsApplier); } return allStripesNeeded; } - - uint64_t maxStreamsForType(const proto::Type& type) { switch (static_cast<int64_t>(type.kind())) { case proto::Type_Kind_STRUCT: @@ -1161,6 +1165,7 @@ namespace orc { reader.reset(); // ColumnReaders use lots of memory; free old memory first rowIndexes.clear(); bloomFilterIndex.clear(); + streams.clear(); followRowInStripe = 0; // evaluate file statistics if it exists @@ -1189,13 +1194,13 @@ namespace orc { if (sargsApplier) { bool isStripeNeeded = true; if (contents->metadata) { - const auto &currentStripeStats = - contents->metadata->stripestats(static_cast<int>(currentStripe)); + const auto& currentStripeStats = + contents->metadata->stripestats(static_cast<int>(currentStripe)); // skip this stripe after stats fail to satisfy sargs uint64_t stripeRowGroupCount = - (rowsInCurrentStripe + footer->rowindexstride() - 1) / footer->rowindexstride(); + (rowsInCurrentStripe + footer->rowindexstride() - 1) / footer->rowindexstride(); isStripeNeeded = - sargsApplier->evaluateStripeStatistics(currentStripeStats, stripeRowGroupCount); + sargsApplier->evaluateStripeStatistics(currentStripeStats, stripeRowGroupCount); } if (!isStripeNeeded) { // advance to next stripe when current stripe has no matching rows @@ -1209,11 +1214,12 @@ namespace orc { processingStripe = currentStripe; std::unique_ptr<StripeInformation> currentStripeInformation(new StripeInformationImpl( - currentStripeInfo.offset(), currentStripeInfo.indexlength(), - currentStripeInfo.datalength(), currentStripeInfo.footerlength(), - currentStripeInfo.numberofrows(), contents->stream.get(), *contents->pool, - contents->compression, contents->blockSize, contents->readerMetrics)); - contents->stream->beforeReadStripe(std::move(currentStripeInformation), selectedColumns); + currentStripeInfo.offset(), 
currentStripeInfo.indexlength(), + currentStripeInfo.datalength(), currentStripeInfo.footerlength(), + currentStripeInfo.numberofrows(), contents->stream.get(), *contents->pool, + contents->compression, contents->blockSize, contents->readerMetrics)); + contents->stream->beforeReadStripe(std::move(currentStripeInformation), selectedColumns, + streams); if (sargsApplier) { bool isStripeNeeded = true; @@ -1237,8 +1243,8 @@ namespace orc { ? getTimezoneByName(currentStripeFooter.writertimezone()) : localTimezone; StripeStreamsImpl stripeStreams(*this, currentStripe, currentStripeInfo, currentStripeFooter, - currentStripeInfo.offset(), *contents->stream, writerTimezone, - readerTimezone); + currentStripeInfo.offset(), *contents->stream, streams, + writerTimezone, readerTimezone); reader = buildReader(*contents->schema, stripeStreams, useTightNumericVector); if (stringDictFilter != nullptr) { @@ -1760,7 +1766,9 @@ namespace orc { // PASS }; - void InputStream::beforeReadStripe(std::unique_ptr<StripeInformation> currentStripeInformation, - std::vector<bool> selectedColumns) {} + void InputStream::beforeReadStripe( + std::unique_ptr<StripeInformation> currentStripeInformation, + std::vector<bool> selectedColumns, + std::unordered_map<orc::StreamId, std::shared_ptr<InputStream>>& streams) {} } // namespace orc diff --git a/c++/src/Reader.hh b/c++/src/Reader.hh index 9505022c558..7ec049ad963 100644 --- a/c++/src/Reader.hh +++ b/c++/src/Reader.hh @@ -162,6 +162,7 @@ namespace orc { // contents std::shared_ptr<FileContents> contents; + std::unordered_map<orc::StreamId, std::shared_ptr<InputStream>> streams; const bool throwOnHive11DecimalOverflow; const int32_t forcedScaleOnHive11Decimal; @@ -322,10 +323,9 @@ namespace orc { // internal methods void readMetadata() const; void checkOrcVersion(); - void getRowIndexStatistics( - const proto::StripeInformation& stripeInfo, uint64_t stripeIndex, - const proto::StripeFooter& currentStripeFooter, - 
std::vector<std::vector<proto::ColumnStatistics> >* indexStats) const; + void getRowIndexStatistics(const proto::StripeInformation& stripeInfo, uint64_t stripeIndex, + const proto::StripeFooter& currentStripeFooter, + std::vector<std::vector<proto::ColumnStatistics>>* indexStats) const; // metadata mutable bool isMetadataLoaded; @@ -425,8 +425,8 @@ namespace orc { return contents->stream.get(); } - void setStream(std::unique_ptr<InputStream> inputStreamUPtr) override{ - contents->stream = std::move(inputStreamUPtr); + void setStream(std::unique_ptr<InputStream> inputStreamUPtr) override { + contents->stream = std::move(inputStreamUPtr); } uint64_t getMemoryUse(int stripeIx = -1) override; diff --git a/c++/src/StripeStream.cc b/c++/src/StripeStream.cc index 1f43da4f243..8efa23efa86 100644 --- a/c++/src/StripeStream.cc +++ b/c++/src/StripeStream.cc @@ -25,17 +25,18 @@ namespace orc { - StripeStreamsImpl::StripeStreamsImpl(const RowReaderImpl& _reader, uint64_t _index, - const proto::StripeInformation& _stripeInfo, - const proto::StripeFooter& _footer, uint64_t _stripeStart, - InputStream& _input, const Timezone& _writerTimezone, - const Timezone& _readerTimezone) + StripeStreamsImpl::StripeStreamsImpl( + const RowReaderImpl& _reader, uint64_t _index, const proto::StripeInformation& _stripeInfo, + const proto::StripeFooter& _footer, uint64_t _stripeStart, InputStream& _input, + const std::unordered_map<orc::StreamId, std::shared_ptr<InputStream>>& _streams, + const Timezone& _writerTimezone, const Timezone& _readerTimezone) : reader(_reader), stripeInfo(_stripeInfo), footer(_footer), stripeIndex(_index), stripeStart(_stripeStart), input(_input), + streams(_streams), writerTimezone(_writerTimezone), readerTimezone(_readerTimezone) { // PASS @@ -87,8 +88,10 @@ namespace orc { const proto::Stream& stream = footer.streams(i); if (stream.has_kind() && stream.kind() == kind && stream.column() == static_cast<uint64_t>(columnId)) { + auto iter = streams.find({columnId, 
static_cast<StreamKind>(kind)}); + InputStream* inputStream = (iter != streams.end()) ? iter->second.get() : &input; uint64_t streamLength = stream.length(); - uint64_t myBlock = shouldStream ? input.getNaturalReadSize() : streamLength; + uint64_t myBlock = shouldStream ? inputStream->getNaturalReadSize() : streamLength; if (offset + streamLength > dataEnd) { std::stringstream msg; msg << "Malformed stream meta at stream index " << i << " in stripe " << stripeIndex @@ -100,7 +103,7 @@ namespace orc { } return createDecompressor(reader.getCompression(), std::make_unique<SeekableFileInputStream>( - &input, offset, stream.length(), *pool, myBlock), + inputStream, offset, stream.length(), *pool, myBlock), reader.getCompressionSize(), *pool, reader.getFileContents().readerMetrics); } diff --git a/c++/src/StripeStream.hh b/c++/src/StripeStream.hh index 74bebda6f25..57e51ef76f0 100644 --- a/c++/src/StripeStream.hh +++ b/c++/src/StripeStream.hh @@ -43,14 +43,16 @@ namespace orc { const uint64_t stripeIndex; const uint64_t stripeStart; InputStream& input; + const std::unordered_map<orc::StreamId, std::shared_ptr<InputStream>>& streams; const Timezone& writerTimezone; const Timezone& readerTimezone; public: - StripeStreamsImpl(const RowReaderImpl& reader, uint64_t index, - const proto::StripeInformation& stripeInfo, const proto::StripeFooter& footer, - uint64_t stripeStart, InputStream& input, const Timezone& writerTimezone, - const Timezone& readerTimezone); + StripeStreamsImpl( + const RowReaderImpl& reader, uint64_t index, const proto::StripeInformation& stripeInfo, + const proto::StripeFooter& footer, uint64_t stripeStart, InputStream& input, + const std::unordered_map<orc::StreamId, std::shared_ptr<InputStream>>& streams, + const Timezone& writerTimezone, const Timezone& readerTimezone); virtual ~StripeStreamsImpl() override; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional 
commands, e-mail: commits-h...@doris.apache.org