This is an automated email from the ASF dual-hosted git repository.
wesm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new 3ae46e3 ARROW-9321: [C++][Dataset] Populate statistics
opportunistically
3ae46e3 is described below
commit 3ae46e33aa94c8f357abb8c6debe361b53d7907d
Author: Benjamin Kietzman <[email protected]>
AuthorDate: Sun Jul 12 17:53:16 2020 -0500
ARROW-9321: [C++][Dataset] Populate statistics opportunistically
Populate ParquetFileFragment statistics whenever a reader is opened anyway.
Also provides an explicit method for forcing load of statistics. (I exposed
this as a public method, but maybe we'd prefer to hide it inside the
`statistics` property the way we do physical schema?)
Closes #7692 from bkietz/9321-populate-statistics-on-read
Lead-authored-by: Benjamin Kietzman <[email protected]>
Co-authored-by: Joris Van den Bossche <[email protected]>
Signed-off-by: Wes McKinney <[email protected]>
---
cpp/src/arrow/dataset/dataset.cc | 12 +-
cpp/src/arrow/dataset/file_parquet.cc | 230 ++++++++++++++-------------
cpp/src/arrow/dataset/file_parquet.h | 24 +--
python/pyarrow/_dataset.pyx | 13 +-
python/pyarrow/includes/libarrow_dataset.pxd | 1 +
python/pyarrow/tests/test_dataset.py | 54 ++++++-
6 files changed, 207 insertions(+), 127 deletions(-)
diff --git a/cpp/src/arrow/dataset/dataset.cc b/cpp/src/arrow/dataset/dataset.cc
index ed936db..71755aa 100644
--- a/cpp/src/arrow/dataset/dataset.cc
+++ b/cpp/src/arrow/dataset/dataset.cc
@@ -40,9 +40,17 @@ Fragment::Fragment(std::shared_ptr<Expression>
partition_expression,
}
Result<std::shared_ptr<Schema>> Fragment::ReadPhysicalSchema() {
+ {
+ auto lock = physical_schema_mutex_.Lock();
+ if (physical_schema_ != nullptr) return physical_schema_;
+ }
+
+ // allow ReadPhysicalSchemaImpl to lock mutex_, if necessary
+ ARROW_ASSIGN_OR_RAISE(auto physical_schema, ReadPhysicalSchemaImpl());
+
auto lock = physical_schema_mutex_.Lock();
- if (physical_schema_ == NULLPTR) {
- ARROW_ASSIGN_OR_RAISE(physical_schema_, ReadPhysicalSchemaImpl());
+ if (physical_schema_ == nullptr) {
+ physical_schema_ = std::move(physical_schema);
}
return physical_schema_;
}
diff --git a/cpp/src/arrow/dataset/file_parquet.cc
b/cpp/src/arrow/dataset/file_parquet.cc
index d5e05ed..4581faa 100644
--- a/cpp/src/arrow/dataset/file_parquet.cc
+++ b/cpp/src/arrow/dataset/file_parquet.cc
@@ -286,10 +286,9 @@ ParquetFileFormat::ParquetFileFormat(const
parquet::ReaderProperties& reader_pro
Result<bool> ParquetFileFormat::IsSupported(const FileSource& source) const {
try {
ARROW_ASSIGN_OR_RAISE(auto input, source.Open());
- auto properties = MakeReaderProperties(*this);
auto reader =
- parquet::ParquetFileReader::Open(std::move(input),
std::move(properties));
- auto metadata = reader->metadata();
+ parquet::ParquetFileReader::Open(std::move(input),
MakeReaderProperties(*this));
+ std::shared_ptr<parquet::FileMetaData> metadata = reader->metadata();
return metadata != nullptr && metadata->can_decompress();
} catch (const ::parquet::ParquetInvalidOrCorruptedFileException& e) {
ARROW_UNUSED(e);
@@ -316,7 +315,7 @@ Result<std::unique_ptr<parquet::arrow::FileReader>>
ParquetFileFormat::GetReader
auto properties = MakeReaderProperties(*this, pool);
ARROW_ASSIGN_OR_RAISE(auto reader, OpenReader(source,
std::move(properties)));
- auto metadata = reader->metadata();
+ std::shared_ptr<parquet::FileMetaData> metadata = reader->metadata();
auto arrow_properties = MakeArrowReaderProperties(*this, *metadata);
if (options) {
@@ -335,91 +334,41 @@ static inline bool RowGroupInfosAreComplete(const
std::vector<RowGroupInfo>& inf
[](const RowGroupInfo& i) { return i.HasStatistics(); });
}
-static inline std::vector<RowGroupInfo> FilterRowGroups(
- std::vector<RowGroupInfo> row_groups, const Expression& predicate) {
- auto filter = [&predicate](const RowGroupInfo& info) {
- return !info.Satisfy(predicate);
- };
- auto end = std::remove_if(row_groups.begin(), row_groups.end(), filter);
- row_groups.erase(end, row_groups.end());
- return row_groups;
-}
-
-static inline Result<std::vector<RowGroupInfo>> AugmentRowGroups(
- std::vector<RowGroupInfo> row_groups, parquet::arrow::FileReader* reader) {
- auto metadata = reader->parquet_reader()->metadata();
- auto manifest = reader->manifest();
- auto num_row_groups = metadata->num_row_groups();
-
- if (row_groups.empty()) {
- row_groups = RowGroupInfo::FromCount(num_row_groups);
- }
-
- // Augment a RowGroup with statistics if missing.
- auto augment = [&](RowGroupInfo& info) {
- if (!info.HasStatistics() && info.id() < num_row_groups) {
- auto row_group = metadata->RowGroup(info.id());
- info.set_num_rows(row_group->num_rows());
- info.set_total_byte_size(row_group->total_byte_size());
- info.set_statistics(RowGroupStatisticsAsStructScalar(*row_group,
manifest));
- }
- };
- std::for_each(row_groups.begin(), row_groups.end(), augment);
-
- return row_groups;
-}
-
Result<ScanTaskIterator>
ParquetFileFormat::ScanFile(std::shared_ptr<ScanOptions> options,
std::shared_ptr<ScanContext> context,
FileFragment* fragment)
const {
- const auto& source = fragment->source();
- auto row_groups = checked_cast<const
ParquetFileFragment*>(fragment)->row_groups();
-
- bool row_groups_are_complete = RowGroupInfosAreComplete(row_groups);
- // The following block is required to avoid any IO if all RowGroups are
- // excluded due to prior statistics knowledge.
- if (row_groups_are_complete) {
- // physical_schema should be cached at this point
- ARROW_ASSIGN_OR_RAISE(auto physical_schema,
fragment->ReadPhysicalSchema());
- RETURN_NOT_OK(options->filter->Validate(*physical_schema));
-
- // Apply a pre-filtering if the user requested an explicit sub-set of
- // row-groups. In the case where a RowGroup doesn't have statistics
- // metdata, it will not be excluded.
- row_groups = FilterRowGroups(std::move(row_groups), *options->filter);
+ auto* parquet_fragment = checked_cast<ParquetFileFragment*>(fragment);
+ std::vector<RowGroupInfo> row_groups;
+
+ // If RowGroup metadata is cached completely we can pre-filter RowGroups
before opening
+ // a FileReader, potentially avoiding IO altogether if all RowGroups are
excluded due to
+ // prior statistics knowledge. In the case where a RowGroup doesn't have
statistics
+  // metadata, it will not be excluded.
+ if (parquet_fragment->HasCompleteMetadata()) {
+ ARROW_ASSIGN_OR_RAISE(row_groups,
+ parquet_fragment->FilterRowGroups(*options->filter));
if (row_groups.empty()) {
return MakeEmptyIterator<std::shared_ptr<ScanTask>>();
}
}
// Open the reader and pay the real IO cost.
- ARROW_ASSIGN_OR_RAISE(auto reader, GetReader(source, options.get(),
context.get()));
-
- // Ensure RowGroups are indexing valid RowGroups before augmenting.
- auto num_row_groups = reader->num_row_groups();
- for (const auto& row_group : row_groups) {
- if (row_group.id() >= num_row_groups) {
- return Status::IndexError("Trying to scan row group ", row_group.id(), "
but ",
- source.path(), " only has ", num_row_groups,
- " row groups");
- }
- }
+ ARROW_ASSIGN_OR_RAISE(auto reader,
+ GetReader(fragment->source(), options.get(),
context.get()));
- if (!row_groups_are_complete) {
+ if (!parquet_fragment->HasCompleteMetadata()) {
+ // row groups were not already filtered; do this now
+ RETURN_NOT_OK(parquet_fragment->EnsureCompleteMetadata(reader.get()));
ARROW_ASSIGN_OR_RAISE(row_groups,
- AugmentRowGroups(std::move(row_groups),
reader.get()));
- std::shared_ptr<Schema> physical_schema;
- RETURN_NOT_OK(reader->GetSchema(&physical_schema));
- RETURN_NOT_OK(options->filter->Validate(*physical_schema));
- row_groups = FilterRowGroups(std::move(row_groups), *options->filter);
- }
-
- if (row_groups.empty()) {
- return MakeEmptyIterator<std::shared_ptr<ScanTask>>();
+ parquet_fragment->FilterRowGroups(*options->filter));
+ if (row_groups.empty()) {
+ return MakeEmptyIterator<std::shared_ptr<ScanTask>>();
+ }
}
- return ParquetScanTaskIterator::Make(std::move(options), std::move(context),
source,
- std::move(reader),
std::move(row_groups));
+ return ParquetScanTaskIterator::Make(std::move(options), std::move(context),
+ fragment->source(), std::move(reader),
+ std::move(row_groups));
}
Result<std::shared_ptr<FileFragment>> ParquetFileFormat::MakeFragment(
@@ -508,36 +457,98 @@ ParquetFileFragment::ParquetFileFragment(FileSource
source,
std::move(physical_schema)),
row_groups_(std::move(row_groups)),
parquet_format_(checked_cast<ParquetFileFormat&>(*format_)),
- has_complete_metadata_(RowGroupInfosAreComplete(row_groups_)) {}
+ has_complete_metadata_(RowGroupInfosAreComplete(row_groups_) &&
+ physical_schema_ != nullptr) {}
-Result<FragmentVector> ParquetFileFragment::SplitByRowGroup(
- const std::shared_ptr<Expression>& predicate) {
- auto simplified_predicate = predicate->Assume(partition_expression());
- if (!simplified_predicate->IsSatisfiable()) {
- return FragmentVector{};
+Status ParquetFileFragment::EnsureCompleteMetadata(parquet::arrow::FileReader*
reader) {
+ if (HasCompleteMetadata()) {
+ return Status::OK();
}
- std::vector<RowGroupInfo> row_groups;
- if (HasCompleteMetadata()) {
- row_groups = FilterRowGroups(row_groups_, *simplified_predicate);
- } else {
+ if (reader == nullptr) {
ARROW_ASSIGN_OR_RAISE(auto reader, parquet_format_.GetReader(source_));
- ARROW_ASSIGN_OR_RAISE(row_groups, AugmentRowGroups(row_groups_,
reader.get()));
- row_groups = FilterRowGroups(std::move(row_groups), *simplified_predicate);
+ return EnsureCompleteMetadata(reader.get());
}
- FragmentVector fragments;
- fragments.reserve(row_groups.size());
+ auto lock = physical_schema_mutex_.Lock();
+ if (HasCompleteMetadata()) {
+ return Status::OK();
+ }
+
+ std::shared_ptr<Schema> schema;
+ RETURN_NOT_OK(reader->GetSchema(&schema));
+ if (physical_schema_ && !physical_schema_->Equals(*schema)) {
+ return Status::Invalid("Fragment initialized with physical schema ",
+ *physical_schema_, " but ", source_.path(), " has
schema ",
+ *schema);
+ }
+ physical_schema_ = std::move(schema);
+
+ std::shared_ptr<parquet::FileMetaData> metadata =
reader->parquet_reader()->metadata();
+ int num_row_groups = metadata->num_row_groups();
+
+ if (row_groups_.empty()) {
+ row_groups_ = RowGroupInfo::FromCount(num_row_groups);
+ }
+
+ for (const RowGroupInfo& info : row_groups_) {
+ // Ensure RowGroups are indexing valid RowGroups before augmenting.
+ if (info.id() >= num_row_groups) {
+ return Status::IndexError("Trying to scan row group ", info.id(), " but
",
+ source_.path(), " only has ", num_row_groups,
+ " row groups");
+ }
+ }
+
+ for (RowGroupInfo& info : row_groups_) {
+ // Augment a RowGroup with statistics if missing.
+ if (info.HasStatistics()) continue;
+
+ auto row_group = metadata->RowGroup(info.id());
+ auto statistics = RowGroupStatisticsAsStructScalar(*row_group,
reader->manifest());
+ info = RowGroupInfo(info.id(), row_group->num_rows(),
row_group->total_byte_size(),
+ std::move(statistics));
+ }
+
+ has_complete_metadata_ = true;
+ return Status::OK();
+}
+
+Result<FragmentVector> ParquetFileFragment::SplitByRowGroup(
+ const std::shared_ptr<Expression>& predicate) {
+ RETURN_NOT_OK(EnsureCompleteMetadata());
+ ARROW_ASSIGN_OR_RAISE(auto row_groups, FilterRowGroups(*predicate));
+
+ FragmentVector fragments(row_groups.size());
+ auto fragment = fragments.begin();
for (auto&& row_group : row_groups) {
- ARROW_ASSIGN_OR_RAISE(auto fragment,
+ ARROW_ASSIGN_OR_RAISE(*fragment++,
parquet_format_.MakeFragment(source_,
partition_expression(),
{std::move(row_group)}));
- fragments.push_back(std::move(fragment));
}
return fragments;
}
+Result<std::vector<RowGroupInfo>> ParquetFileFragment::FilterRowGroups(
+ const Expression& predicate) {
+ DCHECK(has_complete_metadata_);
+ RETURN_NOT_OK(predicate.Validate(*physical_schema_));
+
+ auto simplified_predicate = predicate.Assume(partition_expression_);
+ if (!simplified_predicate->IsSatisfiable()) {
+ return std::vector<RowGroupInfo>{};
+ }
+
+ auto row_groups = row_groups_;
+ auto end = std::remove_if(row_groups.begin(), row_groups.end(),
+ [&simplified_predicate](const RowGroupInfo& info) {
+ return !info.Satisfy(*simplified_predicate);
+ });
+ row_groups.erase(end, row_groups.end());
+ return row_groups;
+}
+
///
/// ParquetDatasetFactory
///
@@ -577,7 +588,7 @@ Result<std::shared_ptr<DatasetFactory>>
ParquetDatasetFactory::Make(
}
ARROW_ASSIGN_OR_RAISE(auto reader, format->GetReader(metadata_source));
- auto metadata = reader->parquet_reader()->metadata();
+ std::shared_ptr<parquet::FileMetaData> metadata =
reader->parquet_reader()->metadata();
return std::shared_ptr<DatasetFactory>(
new ParquetDatasetFactory(std::move(filesystem), std::move(format),
@@ -633,7 +644,7 @@ Result<std::vector<std::string>>
ParquetDatasetFactory::CollectPaths(
ARROW_ASSIGN_OR_RAISE(auto manifest, GetSchemaManifest(metadata,
properties));
for (int i = 0; i < metadata.num_row_groups(); i++) {
- auto row_group = metadata.RowGroup(i);
+ std::shared_ptr<parquet::RowGroupMetaData> row_group =
metadata.RowGroup(i);
ARROW_ASSIGN_OR_RAISE(auto path,
FileFromRowGroup(filesystem_.get(), base_path_,
*row_group));
unique_paths.emplace(std::move(path));
@@ -673,25 +684,22 @@ ParquetDatasetFactory::CollectParquetFragments(
ARROW_ASSIGN_OR_RAISE(auto manifest, GetSchemaManifest(metadata,
properties));
for (int i = 0; i < metadata.num_row_groups(); i++) {
- auto row_group = metadata.RowGroup(i);
+ std::shared_ptr<parquet::RowGroupMetaData> row_group =
metadata.RowGroup(i);
ARROW_ASSIGN_OR_RAISE(auto path,
FileFromRowGroup(filesystem_.get(), base_path_,
*row_group));
- auto stats = RowGroupStatisticsAsStructScalar(*row_group, manifest);
- auto num_rows = row_group->num_rows();
- auto total_byte_size = row_group->total_byte_size();
-
- // Insert the path, or increase the count of row groups. It will be
- // assumed that the RowGroup of a file are ordered exactly like in
- // the metadata file.
- auto elem_and_inserted =
- path_to_row_group_infos.insert({path, {{0, num_rows,
total_byte_size, stats}}});
- if (!elem_and_inserted.second) {
- auto& path_and_count = *elem_and_inserted.first;
- auto& row_groups = path_and_count.second;
- auto row_group_id = static_cast<int>(row_groups.size());
- path_and_count.second.emplace_back(row_group_id, num_rows,
total_byte_size,
- stats);
- }
+ std::shared_ptr<StructScalar> stats =
+ RowGroupStatisticsAsStructScalar(*row_group, manifest);
+
+ int64_t num_rows = row_group->num_rows();
+ int64_t total_byte_size = row_group->total_byte_size();
+
+ // Insert the path, or increase the count of row groups. It will be
assumed that the
+    // RowGroups of a file are ordered exactly as in the metadata file.
+ auto path_and_row_groups =
+ path_to_row_group_infos.emplace(path,
std::vector<RowGroupInfo>{}).first;
+ auto row_group_id = static_cast<int>(path_and_row_groups->second.size());
+ path_and_row_groups->second.emplace_back(row_group_id, num_rows,
total_byte_size,
+ stats);
}
ARROW_ASSIGN_OR_RAISE(auto physical_schema, GetSchema(metadata,
properties));
diff --git a/cpp/src/arrow/dataset/file_parquet.h
b/cpp/src/arrow/dataset/file_parquet.h
index 5888eb1..c156bfb 100644
--- a/cpp/src/arrow/dataset/file_parquet.h
+++ b/cpp/src/arrow/dataset/file_parquet.h
@@ -146,24 +146,16 @@ class ARROW_DS_EXPORT RowGroupInfo : public
util::EqualityComparable<RowGroupInf
///
/// If statistics are not provided, return -1.
int64_t num_rows() const { return num_rows_; }
- void set_num_rows(int64_t num_rows) { num_rows_ = num_rows; }
/// \brief Return the RowGroup's total size in bytes.
///
/// If statistics are not provided, return -1.
int64_t total_byte_size() const { return total_byte_size_; }
- void set_total_byte_size(int64_t total_byte_size) {
- total_byte_size_ = total_byte_size;
- }
/// \brief Return the RowGroup's statistics as a StructScalar with a field
for
/// each column with statistics.
/// Each field will also be a StructScalar with "min" and "max" fields.
const std::shared_ptr<StructScalar>& statistics() const { return
statistics_; }
- void set_statistics(std::shared_ptr<StructScalar> statistics) {
- statistics_ = std::move(statistics);
- SetStatisticsExpression();
- }
/// \brief Indicate if statistics are set.
bool HasStatistics() const { return statistics_ != NULLPTR; }
@@ -207,20 +199,30 @@ class ARROW_DS_EXPORT ParquetFileFragment : public
FileFragment {
/// represents all RowGroups in the parquet file.
const std::vector<RowGroupInfo>& row_groups() const { return row_groups_; }
- /// \brief Indicate if the attached statistics are complete.
+ /// \brief Indicate if the attached statistics are complete and the physical
schema
+ /// is cached.
///
/// The statistics are complete if the provided RowGroups (see
`row_groups()`)
/// is not empty / and all RowGroup return true on
`RowGroup::HasStatistics()`.
bool HasCompleteMetadata() const { return has_complete_metadata_; }
+ /// \brief Ensure attached statistics are complete and the physical schema
is cached.
+ Status EnsureCompleteMetadata(parquet::arrow::FileReader* reader = NULLPTR);
+
private:
ParquetFileFragment(FileSource source, std::shared_ptr<FileFormat> format,
std::shared_ptr<Expression> partition_expression,
std::shared_ptr<Schema> physical_schema,
std::vector<RowGroupInfo> row_groups);
- // TODO(bkietz) override ReadPhysicalSchemaImpl to augment row_groups_
- // while a reader is opened anyway
+ // Overridden to opportunistically set metadata since a reader must be
opened anyway.
+ Result<std::shared_ptr<Schema>> ReadPhysicalSchemaImpl() override {
+ ARROW_RETURN_NOT_OK(EnsureCompleteMetadata());
+ return physical_schema_;
+ }
+
+ // Return a filtered subset of RowGroupInfos.
+ Result<std::vector<RowGroupInfo>> FilterRowGroups(const Expression&
predicate);
std::vector<RowGroupInfo> row_groups_;
ParquetFileFormat& parquet_format_;
diff --git a/python/pyarrow/_dataset.pyx b/python/pyarrow/_dataset.pyx
index 42a5302..111b797 100644
--- a/python/pyarrow/_dataset.pyx
+++ b/python/pyarrow/_dataset.pyx
@@ -913,13 +913,24 @@ cdef class ParquetFileFragment(FileFragment):
def __reduce__(self):
buffer = self.buffer
+ if self.row_groups is not None:
+ row_groups = [row_group.id for row_group in self.row_groups]
+ else:
+ row_groups = None
return self.format.make_fragment, (
self.path if buffer is None else buffer,
self.filesystem,
self.partition_expression,
- self.row_groups
+ row_groups
)
+ def ensure_complete_metadata(self):
+ """
+ Ensure that all metadata (statistics, physical schema, ...) have
+ been read and cached in this fragment.
+ """
+ check_status(self.parquet_file_fragment.EnsureCompleteMetadata())
+
@property
def row_groups(self):
cdef:
diff --git a/python/pyarrow/includes/libarrow_dataset.pxd
b/python/pyarrow/includes/libarrow_dataset.pxd
index 3462b32..ab18300 100644
--- a/python/pyarrow/includes/libarrow_dataset.pxd
+++ b/python/pyarrow/includes/libarrow_dataset.pxd
@@ -236,6 +236,7 @@ cdef extern from "arrow/dataset/api.h" namespace
"arrow::dataset" nogil:
const vector[CRowGroupInfo]& row_groups() const
CResult[vector[shared_ptr[CFragment]]] SplitByRowGroup(
shared_ptr[CExpression] predicate)
+ CStatus EnsureCompleteMetadata()
cdef cppclass CFileSystemDataset \
"arrow::dataset::FileSystemDataset"(CDataset):
diff --git a/python/pyarrow/tests/test_dataset.py
b/python/pyarrow/tests/test_dataset.py
index a16b6dc..1c348f4 100644
--- a/python/pyarrow/tests/test_dataset.py
+++ b/python/pyarrow/tests/test_dataset.py
@@ -677,7 +677,7 @@ def test_make_parquet_fragment_from_buffer():
assert pickled.to_table().equals(table)
-def _create_dataset_for_fragments(tempdir, chunk_size=None):
+def _create_dataset_for_fragments(tempdir, chunk_size=None, filesystem=None):
import pyarrow.parquet as pq
table = pa.table(
@@ -690,8 +690,11 @@ def _create_dataset_for_fragments(tempdir,
chunk_size=None):
# write_to_dataset currently requires pandas
pq.write_to_dataset(table, path,
partition_cols=["part"], chunk_size=chunk_size)
+ dataset = ds.dataset(
+ path, format="parquet", partitioning="hive", filesystem=filesystem
+ )
- return table, ds.dataset(path, format="parquet", partitioning="hive")
+ return table, dataset
@pytest.mark.pandas
@@ -829,6 +832,53 @@ def test_fragments_parquet_row_groups(tempdir):
assert len(result) == 1
[email protected]
[email protected]
+def test_fragments_parquet_ensure_metadata(tempdir, open_logging_fs):
+ fs, assert_opens = open_logging_fs
+ _, dataset = _create_dataset_for_fragments(
+ tempdir, chunk_size=2, filesystem=fs
+ )
+ fragment = list(dataset.get_fragments())[0]
+
+ # with default discovery, no metadata loaded
+ assert fragment.row_groups is None
+ with assert_opens([fragment.path]):
+ fragment.ensure_complete_metadata()
+ assert fragment.row_groups is not None
+
+ # second time -> use cached / no file IO
+ with assert_opens([]):
+ fragment.ensure_complete_metadata()
+
+ # recreate fragment with row group ids
+ new_fragment = fragment.format.make_fragment(
+ fragment.path, fragment.filesystem, row_groups=[0, 1]
+ )
+ assert new_fragment.row_groups is not None
+ assert len(new_fragment.row_groups) == 2
+ row_group = new_fragment.row_groups[0]
+ assert row_group.id == 0
+ # no initialized statistics
+ assert row_group.num_rows == -1
+ assert row_group.statistics is None
+
+ # collect metadata
+ new_fragment.ensure_complete_metadata()
+ row_group = new_fragment.row_groups[0]
+ assert row_group.id == 0
+ assert row_group.num_rows == 2
+ assert row_group.statistics is not None
+
+ # pickling preserves row group ids but not statistics
+ pickled_fragment = pickle.loads(pickle.dumps(new_fragment))
+ assert pickled_fragment.row_groups is not None
+ row_group = pickled_fragment.row_groups[0]
+ assert row_group.id == 0
+ assert row_group.num_rows == -1
+ assert row_group.statistics is None
+
+
def _create_dataset_all_types(tempdir, chunk_size=None):
import pyarrow.parquet as pq