This is an automated email from the ASF dual-hosted git repository.

wesm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 3ae46e3  ARROW-9321: [C++][Dataset] Populate statistics 
opportunistically
3ae46e3 is described below

commit 3ae46e33aa94c8f357abb8c6debe361b53d7907d
Author: Benjamin Kietzman <[email protected]>
AuthorDate: Sun Jul 12 17:53:16 2020 -0500

    ARROW-9321: [C++][Dataset] Populate statistics opportunistically
    
    Populate ParquetFileFragment statistics whenever a reader is opened anyway. 
Also provides an explicit method for forcing load of statistics. (I exposed 
this as a public method, but maybe we'd prefer to hide it inside the 
`statistics` property the way we do physical schema?)
    
    Closes #7692 from bkietz/9321-populate-statistics-on-read
    
    Lead-authored-by: Benjamin Kietzman <[email protected]>
    Co-authored-by: Joris Van den Bossche <[email protected]>
    Signed-off-by: Wes McKinney <[email protected]>
---
 cpp/src/arrow/dataset/dataset.cc             |  12 +-
 cpp/src/arrow/dataset/file_parquet.cc        | 230 ++++++++++++++-------------
 cpp/src/arrow/dataset/file_parquet.h         |  24 +--
 python/pyarrow/_dataset.pyx                  |  13 +-
 python/pyarrow/includes/libarrow_dataset.pxd |   1 +
 python/pyarrow/tests/test_dataset.py         |  54 ++++++-
 6 files changed, 207 insertions(+), 127 deletions(-)

diff --git a/cpp/src/arrow/dataset/dataset.cc b/cpp/src/arrow/dataset/dataset.cc
index ed936db..71755aa 100644
--- a/cpp/src/arrow/dataset/dataset.cc
+++ b/cpp/src/arrow/dataset/dataset.cc
@@ -40,9 +40,17 @@ Fragment::Fragment(std::shared_ptr<Expression> 
partition_expression,
 }
 
 Result<std::shared_ptr<Schema>> Fragment::ReadPhysicalSchema() {
+  {
+    auto lock = physical_schema_mutex_.Lock();
+    if (physical_schema_ != nullptr) return physical_schema_;
+  }
+
+  // allow ReadPhysicalSchemaImpl to lock mutex_, if necessary
+  ARROW_ASSIGN_OR_RAISE(auto physical_schema, ReadPhysicalSchemaImpl());
+
   auto lock = physical_schema_mutex_.Lock();
-  if (physical_schema_ == NULLPTR) {
-    ARROW_ASSIGN_OR_RAISE(physical_schema_, ReadPhysicalSchemaImpl());
+  if (physical_schema_ == nullptr) {
+    physical_schema_ = std::move(physical_schema);
   }
   return physical_schema_;
 }
diff --git a/cpp/src/arrow/dataset/file_parquet.cc 
b/cpp/src/arrow/dataset/file_parquet.cc
index d5e05ed..4581faa 100644
--- a/cpp/src/arrow/dataset/file_parquet.cc
+++ b/cpp/src/arrow/dataset/file_parquet.cc
@@ -286,10 +286,9 @@ ParquetFileFormat::ParquetFileFormat(const 
parquet::ReaderProperties& reader_pro
 Result<bool> ParquetFileFormat::IsSupported(const FileSource& source) const {
   try {
     ARROW_ASSIGN_OR_RAISE(auto input, source.Open());
-    auto properties = MakeReaderProperties(*this);
     auto reader =
-        parquet::ParquetFileReader::Open(std::move(input), 
std::move(properties));
-    auto metadata = reader->metadata();
+        parquet::ParquetFileReader::Open(std::move(input), 
MakeReaderProperties(*this));
+    std::shared_ptr<parquet::FileMetaData> metadata = reader->metadata();
     return metadata != nullptr && metadata->can_decompress();
   } catch (const ::parquet::ParquetInvalidOrCorruptedFileException& e) {
     ARROW_UNUSED(e);
@@ -316,7 +315,7 @@ Result<std::unique_ptr<parquet::arrow::FileReader>> 
ParquetFileFormat::GetReader
   auto properties = MakeReaderProperties(*this, pool);
   ARROW_ASSIGN_OR_RAISE(auto reader, OpenReader(source, 
std::move(properties)));
 
-  auto metadata = reader->metadata();
+  std::shared_ptr<parquet::FileMetaData> metadata = reader->metadata();
   auto arrow_properties = MakeArrowReaderProperties(*this, *metadata);
 
   if (options) {
@@ -335,91 +334,41 @@ static inline bool RowGroupInfosAreComplete(const 
std::vector<RowGroupInfo>& inf
                      [](const RowGroupInfo& i) { return i.HasStatistics(); });
 }
 
-static inline std::vector<RowGroupInfo> FilterRowGroups(
-    std::vector<RowGroupInfo> row_groups, const Expression& predicate) {
-  auto filter = [&predicate](const RowGroupInfo& info) {
-    return !info.Satisfy(predicate);
-  };
-  auto end = std::remove_if(row_groups.begin(), row_groups.end(), filter);
-  row_groups.erase(end, row_groups.end());
-  return row_groups;
-}
-
-static inline Result<std::vector<RowGroupInfo>> AugmentRowGroups(
-    std::vector<RowGroupInfo> row_groups, parquet::arrow::FileReader* reader) {
-  auto metadata = reader->parquet_reader()->metadata();
-  auto manifest = reader->manifest();
-  auto num_row_groups = metadata->num_row_groups();
-
-  if (row_groups.empty()) {
-    row_groups = RowGroupInfo::FromCount(num_row_groups);
-  }
-
-  // Augment a RowGroup with statistics if missing.
-  auto augment = [&](RowGroupInfo& info) {
-    if (!info.HasStatistics() && info.id() < num_row_groups) {
-      auto row_group = metadata->RowGroup(info.id());
-      info.set_num_rows(row_group->num_rows());
-      info.set_total_byte_size(row_group->total_byte_size());
-      info.set_statistics(RowGroupStatisticsAsStructScalar(*row_group, 
manifest));
-    }
-  };
-  std::for_each(row_groups.begin(), row_groups.end(), augment);
-
-  return row_groups;
-}
-
 Result<ScanTaskIterator> 
ParquetFileFormat::ScanFile(std::shared_ptr<ScanOptions> options,
                                                      
std::shared_ptr<ScanContext> context,
                                                      FileFragment* fragment) 
const {
-  const auto& source = fragment->source();
-  auto row_groups = checked_cast<const 
ParquetFileFragment*>(fragment)->row_groups();
-
-  bool row_groups_are_complete = RowGroupInfosAreComplete(row_groups);
-  // The following block is required to avoid any IO if all RowGroups are
-  // excluded due to prior statistics knowledge.
-  if (row_groups_are_complete) {
-    // physical_schema should be cached at this point
-    ARROW_ASSIGN_OR_RAISE(auto physical_schema, 
fragment->ReadPhysicalSchema());
-    RETURN_NOT_OK(options->filter->Validate(*physical_schema));
-
-    // Apply a pre-filtering if the user requested an explicit sub-set of
-    // row-groups. In the case where a RowGroup doesn't have statistics
-    // metdata, it will not be excluded.
-    row_groups = FilterRowGroups(std::move(row_groups), *options->filter);
+  auto* parquet_fragment = checked_cast<ParquetFileFragment*>(fragment);
+  std::vector<RowGroupInfo> row_groups;
+
+  // If RowGroup metadata is cached completely we can pre-filter RowGroups 
before opening
+  // a FileReader, potentially avoiding IO altogether if all RowGroups are 
excluded due to
+  // prior statistics knowledge. In the case where a RowGroup doesn't have 
statistics
+  // metdata, it will not be excluded.
+  if (parquet_fragment->HasCompleteMetadata()) {
+    ARROW_ASSIGN_OR_RAISE(row_groups,
+                          parquet_fragment->FilterRowGroups(*options->filter));
     if (row_groups.empty()) {
       return MakeEmptyIterator<std::shared_ptr<ScanTask>>();
     }
   }
 
   // Open the reader and pay the real IO cost.
-  ARROW_ASSIGN_OR_RAISE(auto reader, GetReader(source, options.get(), 
context.get()));
-
-  // Ensure RowGroups are indexing valid RowGroups before augmenting.
-  auto num_row_groups = reader->num_row_groups();
-  for (const auto& row_group : row_groups) {
-    if (row_group.id() >= num_row_groups) {
-      return Status::IndexError("Trying to scan row group ", row_group.id(), " 
but ",
-                                source.path(), " only has ", num_row_groups,
-                                " row groups");
-    }
-  }
+  ARROW_ASSIGN_OR_RAISE(auto reader,
+                        GetReader(fragment->source(), options.get(), 
context.get()));
 
-  if (!row_groups_are_complete) {
+  if (!parquet_fragment->HasCompleteMetadata()) {
+    // row groups were not already filtered; do this now
+    RETURN_NOT_OK(parquet_fragment->EnsureCompleteMetadata(reader.get()));
     ARROW_ASSIGN_OR_RAISE(row_groups,
-                          AugmentRowGroups(std::move(row_groups), 
reader.get()));
-    std::shared_ptr<Schema> physical_schema;
-    RETURN_NOT_OK(reader->GetSchema(&physical_schema));
-    RETURN_NOT_OK(options->filter->Validate(*physical_schema));
-    row_groups = FilterRowGroups(std::move(row_groups), *options->filter);
-  }
-
-  if (row_groups.empty()) {
-    return MakeEmptyIterator<std::shared_ptr<ScanTask>>();
+                          parquet_fragment->FilterRowGroups(*options->filter));
+    if (row_groups.empty()) {
+      return MakeEmptyIterator<std::shared_ptr<ScanTask>>();
+    }
   }
 
-  return ParquetScanTaskIterator::Make(std::move(options), std::move(context), 
source,
-                                       std::move(reader), 
std::move(row_groups));
+  return ParquetScanTaskIterator::Make(std::move(options), std::move(context),
+                                       fragment->source(), std::move(reader),
+                                       std::move(row_groups));
 }
 
 Result<std::shared_ptr<FileFragment>> ParquetFileFormat::MakeFragment(
@@ -508,36 +457,98 @@ ParquetFileFragment::ParquetFileFragment(FileSource 
source,
                    std::move(physical_schema)),
       row_groups_(std::move(row_groups)),
       parquet_format_(checked_cast<ParquetFileFormat&>(*format_)),
-      has_complete_metadata_(RowGroupInfosAreComplete(row_groups_)) {}
+      has_complete_metadata_(RowGroupInfosAreComplete(row_groups_) &&
+                             physical_schema_ != nullptr) {}
 
-Result<FragmentVector> ParquetFileFragment::SplitByRowGroup(
-    const std::shared_ptr<Expression>& predicate) {
-  auto simplified_predicate = predicate->Assume(partition_expression());
-  if (!simplified_predicate->IsSatisfiable()) {
-    return FragmentVector{};
+Status ParquetFileFragment::EnsureCompleteMetadata(parquet::arrow::FileReader* 
reader) {
+  if (HasCompleteMetadata()) {
+    return Status::OK();
   }
 
-  std::vector<RowGroupInfo> row_groups;
-  if (HasCompleteMetadata()) {
-    row_groups = FilterRowGroups(row_groups_, *simplified_predicate);
-  } else {
+  if (reader == nullptr) {
     ARROW_ASSIGN_OR_RAISE(auto reader, parquet_format_.GetReader(source_));
-    ARROW_ASSIGN_OR_RAISE(row_groups, AugmentRowGroups(row_groups_, 
reader.get()));
-    row_groups = FilterRowGroups(std::move(row_groups), *simplified_predicate);
+    return EnsureCompleteMetadata(reader.get());
   }
 
-  FragmentVector fragments;
-  fragments.reserve(row_groups.size());
+  auto lock = physical_schema_mutex_.Lock();
+  if (HasCompleteMetadata()) {
+    return Status::OK();
+  }
+
+  std::shared_ptr<Schema> schema;
+  RETURN_NOT_OK(reader->GetSchema(&schema));
+  if (physical_schema_ && !physical_schema_->Equals(*schema)) {
+    return Status::Invalid("Fragment initialized with physical schema ",
+                           *physical_schema_, " but ", source_.path(), " has 
schema ",
+                           *schema);
+  }
+  physical_schema_ = std::move(schema);
+
+  std::shared_ptr<parquet::FileMetaData> metadata = 
reader->parquet_reader()->metadata();
+  int num_row_groups = metadata->num_row_groups();
+
+  if (row_groups_.empty()) {
+    row_groups_ = RowGroupInfo::FromCount(num_row_groups);
+  }
+
+  for (const RowGroupInfo& info : row_groups_) {
+    // Ensure RowGroups are indexing valid RowGroups before augmenting.
+    if (info.id() >= num_row_groups) {
+      return Status::IndexError("Trying to scan row group ", info.id(), " but 
",
+                                source_.path(), " only has ", num_row_groups,
+                                " row groups");
+    }
+  }
+
+  for (RowGroupInfo& info : row_groups_) {
+    // Augment a RowGroup with statistics if missing.
+    if (info.HasStatistics()) continue;
+
+    auto row_group = metadata->RowGroup(info.id());
+    auto statistics = RowGroupStatisticsAsStructScalar(*row_group, 
reader->manifest());
+    info = RowGroupInfo(info.id(), row_group->num_rows(), 
row_group->total_byte_size(),
+                        std::move(statistics));
+  }
+
+  has_complete_metadata_ = true;
+  return Status::OK();
+}
+
+Result<FragmentVector> ParquetFileFragment::SplitByRowGroup(
+    const std::shared_ptr<Expression>& predicate) {
+  RETURN_NOT_OK(EnsureCompleteMetadata());
+  ARROW_ASSIGN_OR_RAISE(auto row_groups, FilterRowGroups(*predicate));
+
+  FragmentVector fragments(row_groups.size());
+  auto fragment = fragments.begin();
   for (auto&& row_group : row_groups) {
-    ARROW_ASSIGN_OR_RAISE(auto fragment,
+    ARROW_ASSIGN_OR_RAISE(*fragment++,
                           parquet_format_.MakeFragment(source_, 
partition_expression(),
                                                        
{std::move(row_group)}));
-    fragments.push_back(std::move(fragment));
   }
 
   return fragments;
 }
 
+Result<std::vector<RowGroupInfo>> ParquetFileFragment::FilterRowGroups(
+    const Expression& predicate) {
+  DCHECK(has_complete_metadata_);
+  RETURN_NOT_OK(predicate.Validate(*physical_schema_));
+
+  auto simplified_predicate = predicate.Assume(partition_expression_);
+  if (!simplified_predicate->IsSatisfiable()) {
+    return std::vector<RowGroupInfo>{};
+  }
+
+  auto row_groups = row_groups_;
+  auto end = std::remove_if(row_groups.begin(), row_groups.end(),
+                            [&simplified_predicate](const RowGroupInfo& info) {
+                              return !info.Satisfy(*simplified_predicate);
+                            });
+  row_groups.erase(end, row_groups.end());
+  return row_groups;
+}
+
 ///
 /// ParquetDatasetFactory
 ///
@@ -577,7 +588,7 @@ Result<std::shared_ptr<DatasetFactory>> 
ParquetDatasetFactory::Make(
   }
 
   ARROW_ASSIGN_OR_RAISE(auto reader, format->GetReader(metadata_source));
-  auto metadata = reader->parquet_reader()->metadata();
+  std::shared_ptr<parquet::FileMetaData> metadata = 
reader->parquet_reader()->metadata();
 
   return std::shared_ptr<DatasetFactory>(
       new ParquetDatasetFactory(std::move(filesystem), std::move(format),
@@ -633,7 +644,7 @@ Result<std::vector<std::string>> 
ParquetDatasetFactory::CollectPaths(
     ARROW_ASSIGN_OR_RAISE(auto manifest, GetSchemaManifest(metadata, 
properties));
 
     for (int i = 0; i < metadata.num_row_groups(); i++) {
-      auto row_group = metadata.RowGroup(i);
+      std::shared_ptr<parquet::RowGroupMetaData> row_group = 
metadata.RowGroup(i);
       ARROW_ASSIGN_OR_RAISE(auto path,
                             FileFromRowGroup(filesystem_.get(), base_path_, 
*row_group));
       unique_paths.emplace(std::move(path));
@@ -673,25 +684,22 @@ ParquetDatasetFactory::CollectParquetFragments(
     ARROW_ASSIGN_OR_RAISE(auto manifest, GetSchemaManifest(metadata, 
properties));
 
     for (int i = 0; i < metadata.num_row_groups(); i++) {
-      auto row_group = metadata.RowGroup(i);
+      std::shared_ptr<parquet::RowGroupMetaData> row_group = 
metadata.RowGroup(i);
       ARROW_ASSIGN_OR_RAISE(auto path,
                             FileFromRowGroup(filesystem_.get(), base_path_, 
*row_group));
-      auto stats = RowGroupStatisticsAsStructScalar(*row_group, manifest);
-      auto num_rows = row_group->num_rows();
-      auto total_byte_size = row_group->total_byte_size();
-
-      // Insert the path, or increase the count of row groups. It will be
-      // assumed that the RowGroup of a file are ordered exactly like in
-      // the metadata file.
-      auto elem_and_inserted =
-          path_to_row_group_infos.insert({path, {{0, num_rows, 
total_byte_size, stats}}});
-      if (!elem_and_inserted.second) {
-        auto& path_and_count = *elem_and_inserted.first;
-        auto& row_groups = path_and_count.second;
-        auto row_group_id = static_cast<int>(row_groups.size());
-        path_and_count.second.emplace_back(row_group_id, num_rows, 
total_byte_size,
-                                           stats);
-      }
+      std::shared_ptr<StructScalar> stats =
+          RowGroupStatisticsAsStructScalar(*row_group, manifest);
+
+      int64_t num_rows = row_group->num_rows();
+      int64_t total_byte_size = row_group->total_byte_size();
+
+      // Insert the path, or increase the count of row groups. It will be 
assumed that the
+      // RowGroup of a file are ordered exactly as in the metadata file.
+      auto path_and_row_groups =
+          path_to_row_group_infos.emplace(path, 
std::vector<RowGroupInfo>{}).first;
+      auto row_group_id = static_cast<int>(path_and_row_groups->second.size());
+      path_and_row_groups->second.emplace_back(row_group_id, num_rows, 
total_byte_size,
+                                               stats);
     }
 
     ARROW_ASSIGN_OR_RAISE(auto physical_schema, GetSchema(metadata, 
properties));
diff --git a/cpp/src/arrow/dataset/file_parquet.h 
b/cpp/src/arrow/dataset/file_parquet.h
index 5888eb1..c156bfb 100644
--- a/cpp/src/arrow/dataset/file_parquet.h
+++ b/cpp/src/arrow/dataset/file_parquet.h
@@ -146,24 +146,16 @@ class ARROW_DS_EXPORT RowGroupInfo : public 
util::EqualityComparable<RowGroupInf
   ///
   /// If statistics are not provided, return -1.
   int64_t num_rows() const { return num_rows_; }
-  void set_num_rows(int64_t num_rows) { num_rows_ = num_rows; }
 
   /// \brief Return the RowGroup's total size in bytes.
   ///
   /// If statistics are not provided, return -1.
   int64_t total_byte_size() const { return total_byte_size_; }
-  void set_total_byte_size(int64_t total_byte_size) {
-    total_byte_size_ = total_byte_size;
-  }
 
   /// \brief Return the RowGroup's statistics as a StructScalar with a field 
for
   /// each column with statistics.
   /// Each field will also be a StructScalar with "min" and "max" fields.
   const std::shared_ptr<StructScalar>& statistics() const { return 
statistics_; }
-  void set_statistics(std::shared_ptr<StructScalar> statistics) {
-    statistics_ = std::move(statistics);
-    SetStatisticsExpression();
-  }
 
   /// \brief Indicate if statistics are set.
   bool HasStatistics() const { return statistics_ != NULLPTR; }
@@ -207,20 +199,30 @@ class ARROW_DS_EXPORT ParquetFileFragment : public 
FileFragment {
   /// represents all RowGroups in the parquet file.
   const std::vector<RowGroupInfo>& row_groups() const { return row_groups_; }
 
-  /// \brief Indicate if the attached statistics are complete.
+  /// \brief Indicate if the attached statistics are complete and the physical 
schema
+  /// is cached.
   ///
   /// The statistics are complete if the provided RowGroups (see 
`row_groups()`)
   /// is not empty / and all RowGroup return true on 
`RowGroup::HasStatistics()`.
   bool HasCompleteMetadata() const { return has_complete_metadata_; }
 
+  /// \brief Ensure attached statistics are complete and the physical schema 
is cached.
+  Status EnsureCompleteMetadata(parquet::arrow::FileReader* reader = NULLPTR);
+
  private:
   ParquetFileFragment(FileSource source, std::shared_ptr<FileFormat> format,
                       std::shared_ptr<Expression> partition_expression,
                       std::shared_ptr<Schema> physical_schema,
                       std::vector<RowGroupInfo> row_groups);
 
-  // TODO(bkietz) override ReadPhysicalSchemaImpl to augment row_groups_
-  // while a reader is opened anyway
+  // Overridden to opportunistically set metadata since a reader must be 
opened anyway.
+  Result<std::shared_ptr<Schema>> ReadPhysicalSchemaImpl() override {
+    ARROW_RETURN_NOT_OK(EnsureCompleteMetadata());
+    return physical_schema_;
+  }
+
+  // Return a filtered subset of RowGroupInfos.
+  Result<std::vector<RowGroupInfo>> FilterRowGroups(const Expression& 
predicate);
 
   std::vector<RowGroupInfo> row_groups_;
   ParquetFileFormat& parquet_format_;
diff --git a/python/pyarrow/_dataset.pyx b/python/pyarrow/_dataset.pyx
index 42a5302..111b797 100644
--- a/python/pyarrow/_dataset.pyx
+++ b/python/pyarrow/_dataset.pyx
@@ -913,13 +913,24 @@ cdef class ParquetFileFragment(FileFragment):
 
     def __reduce__(self):
         buffer = self.buffer
+        if self.row_groups is not None:
+            row_groups = [row_group.id for row_group in self.row_groups]
+        else:
+            row_groups = None
         return self.format.make_fragment, (
             self.path if buffer is None else buffer,
             self.filesystem,
             self.partition_expression,
-            self.row_groups
+            row_groups
         )
 
+    def ensure_complete_metadata(self):
+        """
+        Ensure that all metadata (statistics, physical schema, ...) have
+        been read and cached in this fragment.
+        """
+        check_status(self.parquet_file_fragment.EnsureCompleteMetadata())
+
     @property
     def row_groups(self):
         cdef:
diff --git a/python/pyarrow/includes/libarrow_dataset.pxd 
b/python/pyarrow/includes/libarrow_dataset.pxd
index 3462b32..ab18300 100644
--- a/python/pyarrow/includes/libarrow_dataset.pxd
+++ b/python/pyarrow/includes/libarrow_dataset.pxd
@@ -236,6 +236,7 @@ cdef extern from "arrow/dataset/api.h" namespace 
"arrow::dataset" nogil:
         const vector[CRowGroupInfo]& row_groups() const
         CResult[vector[shared_ptr[CFragment]]] SplitByRowGroup(
             shared_ptr[CExpression] predicate)
+        CStatus EnsureCompleteMetadata()
 
     cdef cppclass CFileSystemDataset \
             "arrow::dataset::FileSystemDataset"(CDataset):
diff --git a/python/pyarrow/tests/test_dataset.py 
b/python/pyarrow/tests/test_dataset.py
index a16b6dc..1c348f4 100644
--- a/python/pyarrow/tests/test_dataset.py
+++ b/python/pyarrow/tests/test_dataset.py
@@ -677,7 +677,7 @@ def test_make_parquet_fragment_from_buffer():
         assert pickled.to_table().equals(table)
 
 
-def _create_dataset_for_fragments(tempdir, chunk_size=None):
+def _create_dataset_for_fragments(tempdir, chunk_size=None, filesystem=None):
     import pyarrow.parquet as pq
 
     table = pa.table(
@@ -690,8 +690,11 @@ def _create_dataset_for_fragments(tempdir, 
chunk_size=None):
     # write_to_dataset currently requires pandas
     pq.write_to_dataset(table, path,
                         partition_cols=["part"], chunk_size=chunk_size)
+    dataset = ds.dataset(
+        path, format="parquet", partitioning="hive", filesystem=filesystem
+    )
 
-    return table, ds.dataset(path, format="parquet", partitioning="hive")
+    return table, dataset
 
 
 @pytest.mark.pandas
@@ -829,6 +832,53 @@ def test_fragments_parquet_row_groups(tempdir):
     assert len(result) == 1
 
 
[email protected]
[email protected]
+def test_fragments_parquet_ensure_metadata(tempdir, open_logging_fs):
+    fs, assert_opens = open_logging_fs
+    _, dataset = _create_dataset_for_fragments(
+        tempdir, chunk_size=2, filesystem=fs
+    )
+    fragment = list(dataset.get_fragments())[0]
+
+    # with default discovery, no metadata loaded
+    assert fragment.row_groups is None
+    with assert_opens([fragment.path]):
+        fragment.ensure_complete_metadata()
+    assert fragment.row_groups is not None
+
+    # second time -> use cached / no file IO
+    with assert_opens([]):
+        fragment.ensure_complete_metadata()
+
+    # recreate fragment with row group ids
+    new_fragment = fragment.format.make_fragment(
+        fragment.path, fragment.filesystem, row_groups=[0, 1]
+    )
+    assert new_fragment.row_groups is not None
+    assert len(new_fragment.row_groups) == 2
+    row_group = new_fragment.row_groups[0]
+    assert row_group.id == 0
+    # no initialized statistics
+    assert row_group.num_rows == -1
+    assert row_group.statistics is None
+
+    # collect metadata
+    new_fragment.ensure_complete_metadata()
+    row_group = new_fragment.row_groups[0]
+    assert row_group.id == 0
+    assert row_group.num_rows == 2
+    assert row_group.statistics is not None
+
+    # pickling preserves row group ids but not statistics
+    pickled_fragment = pickle.loads(pickle.dumps(new_fragment))
+    assert pickled_fragment.row_groups is not None
+    row_group = pickled_fragment.row_groups[0]
+    assert row_group.id == 0
+    assert row_group.num_rows == -1
+    assert row_group.statistics is None
+
+
 def _create_dataset_all_types(tempdir, chunk_size=None):
     import pyarrow.parquet as pq
 

Reply via email to