This is an automated email from the ASF dual-hosted git repository.
wesm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new 44aa829 ARROW-9288: [C++][Dataset] Fix PartitioningFactory with
dictionary encoding for HivePartitioning
44aa829 is described below
commit 44aa8292605bf7484ae73b289055482e399e90d0
Author: Joris Van den Bossche <[email protected]>
AuthorDate: Sun Jul 12 17:58:10 2020 -0500
ARROW-9288: [C++][Dataset] Fix PartitioningFactory with dictionary encoding
for HivePartitioning
Closes #7608 from jorisvandenbossche/ARROW-9288
Authored-by: Joris Van den Bossche <[email protected]>
Signed-off-by: Wes McKinney <[email protected]>
---
cpp/src/arrow/dataset/partition.cc | 26 +++++++++++++++++++++++++-
python/pyarrow/tests/test_dataset.py | 29 +++++++++++++++++++++++++++++
2 files changed, 54 insertions(+), 1 deletion(-)
diff --git a/cpp/src/arrow/dataset/partition.cc
b/cpp/src/arrow/dataset/partition.cc
index 744e9dd..2a2ecdf 100644
--- a/cpp/src/arrow/dataset/partition.cc
+++ b/cpp/src/arrow/dataset/partition.cc
@@ -317,6 +317,16 @@ class KeyValuePartitioningInspectImpl {
return ::arrow::schema(std::move(fields));
}
+ std::vector<std::string> FieldNames() {
+ std::vector<std::string> names;
+ names.reserve(name_to_index_.size());
+
+ for (auto kv : name_to_index_) {
+ names.push_back(kv.first);
+ }
+ return names;
+ }
+
private:
std::unordered_map<std::string, int> name_to_index_;
std::vector<std::set<std::string>> values_;
@@ -657,15 +667,29 @@ class HivePartitioningFactory : public
PartitioningFactory {
}
}
+ field_names_ = impl.FieldNames();
return impl.Finish(&dictionaries_);
}
Result<std::shared_ptr<Partitioning>> Finish(
const std::shared_ptr<Schema>& schema) const override {
- return std::shared_ptr<Partitioning>(new HivePartitioning(schema,
dictionaries_));
+ if (dictionaries_.empty()) {
+ return std::make_shared<HivePartitioning>(schema, dictionaries_);
+ } else {
+ for (FieldRef ref : field_names_) {
+ // ensure all of field_names_ are present in schema
+ RETURN_NOT_OK(ref.FindOne(*schema).status());
+ }
+
+ // drop fields which aren't in field_names_
+ auto out_schema = SchemaFromColumnNames(schema, field_names_);
+
+ return std::make_shared<HivePartitioning>(std::move(out_schema),
dictionaries_);
+ }
}
private:
+ std::vector<std::string> field_names_;
ArrayVector dictionaries_;
PartitioningFactoryOptions options_;
};
diff --git a/python/pyarrow/tests/test_dataset.py
b/python/pyarrow/tests/test_dataset.py
index 1c348f4..428547c 100644
--- a/python/pyarrow/tests/test_dataset.py
+++ b/python/pyarrow/tests/test_dataset.py
@@ -1484,6 +1484,35 @@ def test_open_dataset_non_existing_file():
ds.dataset('file:i-am-not-existing.parquet', format='parquet')
[email protected]
[email protected]('partitioning', ["directory", "hive"])
+def test_open_dataset_partitioned_dictionary_type(tempdir, partitioning):
+ # ARROW-9288
+ import pyarrow.parquet as pq
+ table = pa.table({'a': range(9), 'b': [0.] * 4 + [1.] * 5})
+
+ path = tempdir / "dataset"
+ path.mkdir()
+
+ for part in ["A", "B", "C"]:
+ fmt = "{}" if partitioning == "directory" else "part={}"
+ part = path / fmt.format(part)
+ part.mkdir()
+ pq.write_table(table, part / "test.parquet")
+
+ if partitioning == "directory":
+ part = ds.DirectoryPartitioning.discover(
+ ["part"], max_partition_dictionary_size=-1)
+ else:
+ part = ds.HivePartitioning.discover(max_partition_dictionary_size=-1)
+
+ dataset = ds.dataset(str(path), partitioning=part)
+ expected_schema = table.schema.append(
+ pa.field("part", pa.dictionary(pa.int32(), pa.string()))
+ )
+ assert dataset.schema.equals(expected_schema)
+
+
@pytest.fixture
def s3_example_simple(s3_connection, s3_server):
from pyarrow.fs import FileSystem