This is an automated email from the ASF dual-hosted git repository.
kszucs pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new 6e3daf0 ARROW-9068: [C++][Dataset] Simplify partitioning interface
6e3daf0 is described below
commit 6e3daf0defcd7ef9a375aed006b5b75bc230ff3e
Author: Benjamin Kietzman <[email protected]>
AuthorDate: Wed Jul 29 14:59:41 2020 +0200
ARROW-9068: [C++][Dataset] Simplify partitioning interface
Format and Parse now operate on a whole path rather than on individual
segments.
This will simplify building WritePlans but I've only made minimal changes
there (to keep tests passing); follow up in
https://issues.apache.org/jira/browse/ARROW-8382
Closes #7820 from bkietz/9068-simplify-partitioning-interface
Authored-by: Benjamin Kietzman <[email protected]>
Signed-off-by: Krisztián Szűcs <[email protected]>
---
cpp/src/arrow/dataset/discovery_test.cc | 13 --
cpp/src/arrow/dataset/partition.cc | 225 ++++++++++++++++-----------
cpp/src/arrow/dataset/partition.h | 125 ++++++---------
cpp/src/arrow/dataset/partition_test.cc | 231 ++++++++++++++++------------
cpp/src/arrow/filesystem/filesystem_test.cc | 2 +-
cpp/src/arrow/filesystem/path_util.h | 2 +
cpp/src/arrow/type.cc | 18 ++-
cpp/src/arrow/type.h | 7 +-
8 files changed, 331 insertions(+), 292 deletions(-)
diff --git a/cpp/src/arrow/dataset/discovery_test.cc
b/cpp/src/arrow/dataset/discovery_test.cc
index 2fc21de..a3f1379 100644
--- a/cpp/src/arrow/dataset/discovery_test.cc
+++ b/cpp/src/arrow/dataset/discovery_test.cc
@@ -80,19 +80,6 @@ class MockDatasetFactory : public DatasetFactory {
std::vector<std::shared_ptr<Schema>> schemas_;
};
-class MockPartitioning : public Partitioning {
- public:
- explicit MockPartitioning(std::shared_ptr<Schema> schema)
- : Partitioning(std::move(schema)) {}
-
- Result<std::shared_ptr<Expression>> Parse(const std::string& segment,
- int i) const override {
- return nullptr;
- }
-
- std::string type_name() const override { return "mock_partitioning"; }
-};
-
class MockDatasetFactoryTest : public DatasetFactoryTest {
public:
void MakeFactory(std::vector<std::shared_ptr<Schema>> schemas) {
diff --git a/cpp/src/arrow/dataset/partition.cc
b/cpp/src/arrow/dataset/partition.cc
index 9f497a1..5844f89 100644
--- a/cpp/src/arrow/dataset/partition.cc
+++ b/cpp/src/arrow/dataset/partition.cc
@@ -47,23 +47,23 @@ using util::string_view;
using arrow::internal::checked_cast;
-Result<std::shared_ptr<Expression>> Partitioning::Parse(const std::string&
path) const {
- ExpressionVector expressions;
- int i = 0;
+std::shared_ptr<Partitioning> Partitioning::Default() {
+ class DefaultPartitioning : public Partitioning {
+ public:
+ DefaultPartitioning() : Partitioning(::arrow::schema({})) {}
- for (auto segment : fs::internal::SplitAbstractPath(path)) {
- ARROW_ASSIGN_OR_RAISE(auto expr, Parse(segment, i++));
- if (expr->Equals(true)) {
- continue;
- }
+ std::string type_name() const override { return "default"; }
- expressions.push_back(std::move(expr));
- }
+ Result<std::shared_ptr<Expression>> Parse(const std::string& path) const
override {
+ return scalar(true);
+ }
- return and_(std::move(expressions));
-}
+ Result<std::string> Format(const Expression& expr) const override {
+ return Status::NotImplemented("formatting paths from ", type_name(),
+ " Partitioning");
+ }
+ };
-std::shared_ptr<Partitioning> Partitioning::Default() {
return std::make_shared<DefaultPartitioning>();
}
@@ -80,18 +80,6 @@ Result<WritePlan> PartitioningFactory::MakeWritePlan(
type_name());
}
-Result<std::shared_ptr<Expression>> SegmentDictionaryPartitioning::Parse(
- const std::string& segment, int i) const {
- if (static_cast<size_t>(i) < dictionaries_.size()) {
- auto it = dictionaries_[i].find(segment);
- if (it != dictionaries_[i].end()) {
- return it->second;
- }
- }
-
- return scalar(true);
-}
-
Status KeyValuePartitioning::VisitKeys(
const Expression& expr,
const std::function<Status(const std::string& name,
@@ -141,7 +129,7 @@ Status KeyValuePartitioning::SetDefaultValuesFromKeys(const
Expression& expr,
expr, [projector](const std::string& name, const
std::shared_ptr<Scalar>& value) {
ARROW_ASSIGN_OR_RAISE(auto match,
FieldRef(name).FindOneOrNone(*projector->schema()));
- if (match.indices().empty()) {
+ if (!match) {
return Status::OK();
}
return projector->SetDefaultValue(match, value);
@@ -149,25 +137,25 @@ Status
KeyValuePartitioning::SetDefaultValuesFromKeys(const Expression& expr,
}
Result<std::shared_ptr<Expression>> KeyValuePartitioning::ConvertKey(
- const Key& key, const Schema& schema, const ArrayVector& dictionaries) {
- ARROW_ASSIGN_OR_RAISE(auto match, FieldRef(key.name).FindOneOrNone(schema));
- if (match.indices().empty()) {
+ const Key& key) const {
+ ARROW_ASSIGN_OR_RAISE(auto match,
FieldRef(key.name).FindOneOrNone(*schema_));
+ if (!match) {
return scalar(true);
}
- auto field_index = match.indices()[0];
- auto field = schema.field(field_index);
+ auto field_index = match[0];
+ auto field = schema_->field(field_index);
std::shared_ptr<Scalar> converted;
if (field->type()->id() == Type::DICTIONARY) {
- if (dictionaries.empty() || dictionaries[field_index] == nullptr) {
+ if (dictionaries_.empty() || dictionaries_[field_index] == nullptr) {
return Status::Invalid("No dictionary provided for dictionary field ",
field->ToString());
}
DictionaryScalar::ValueType value;
- value.dictionary = dictionaries[field_index];
+ value.dictionary = dictionaries_[field_index];
if (!value.dictionary->type()->Equals(
checked_cast<const DictionaryType&>(*field->type()).value_type()))
{
@@ -189,62 +177,92 @@ Result<std::shared_ptr<Expression>>
KeyValuePartitioning::ConvertKey(
ARROW_ASSIGN_OR_RAISE(converted, Scalar::Parse(field->type(), key.value));
}
- return equal(field_ref(field->name()), scalar(converted));
+ return equal(field_ref(field->name()), scalar(std::move(converted)));
}
Result<std::shared_ptr<Expression>> KeyValuePartitioning::Parse(
- const std::string& segment, int i) const {
- if (auto key = ParseKey(segment, i)) {
- return ConvertKey(*key, *schema_, dictionaries_);
+ const std::string& path) const {
+ ExpressionVector expressions;
+
+ for (const Key& key : ParseKeys(path)) {
+ ARROW_ASSIGN_OR_RAISE(auto expr, ConvertKey(key));
+
+ if (expr->Equals(true)) continue;
+
+ expressions.push_back(std::move(expr));
}
- return scalar(true);
+ return and_(std::move(expressions));
}
-Result<std::string> KeyValuePartitioning::Format(const Expression& expr, int
i) const {
- if (expr.type() != ExpressionType::COMPARISON) {
- return Status::Invalid(expr.ToString(), " is not a comparison expression");
- }
+Result<std::string> KeyValuePartitioning::Format(const Expression& expr) const
{
+ std::vector<Scalar*> values{static_cast<size_t>(schema_->num_fields()),
nullptr};
+
+ RETURN_NOT_OK(VisitKeys(expr, [&](const std::string& name,
+ const std::shared_ptr<Scalar>& value) {
+ ARROW_ASSIGN_OR_RAISE(auto match, FieldRef(name).FindOneOrNone(*schema_));
+ if (match) {
+ const auto& field = schema_->field(match[0]);
+ if (!value->type->Equals(field->type())) {
+ return Status::TypeError("scalar ", value->ToString(), " (of type ",
*value->type,
+ ") is invalid for ", field->ToString());
+ }
- const auto& cmp = checked_cast<const ComparisonExpression&>(expr);
- if (cmp.op() != compute::CompareOperator::EQUAL) {
- return Status::Invalid(expr.ToString(), " is not an equality comparison
expression");
- }
+ values[match[0]] = value.get();
+ }
+ return Status::OK();
+ }));
- if (cmp.left_operand()->type() != ExpressionType::FIELD) {
- return Status::Invalid(expr.ToString(), " LHS is not a field");
- }
- const auto& lhs = checked_cast<const FieldExpression&>(*cmp.left_operand());
+ return FormatValues(values);
+}
- if (cmp.right_operand()->type() != ExpressionType::SCALAR) {
- return Status::Invalid(expr.ToString(), " RHS is not a scalar");
- }
- const auto& rhs = checked_cast<const
ScalarExpression&>(*cmp.right_operand());
+std::vector<KeyValuePartitioning::Key> DirectoryPartitioning::ParseKeys(
+ const std::string& path) const {
+ std::vector<Key> keys;
+
+ int i = 0;
+ for (auto&& segment : fs::internal::SplitAbstractPath(path)) {
+ if (i >= schema_->num_fields()) break;
- auto expected_type = schema_->GetFieldByName(lhs.name())->type();
- if (!rhs.value()->type->Equals(expected_type)) {
- return Status::TypeError(expr.ToString(), " expected RHS to have type ",
- *expected_type);
+ keys.push_back({schema_->field(i++)->name(), std::move(segment)});
}
- return FormatKey({lhs.name(), rhs.value()->ToString()}, i);
+ return keys;
}
-util::optional<KeyValuePartitioning::Key> DirectoryPartitioning::ParseKey(
- const std::string& segment, int i) const {
- if (i >= schema_->num_fields()) {
+inline util::optional<int> NextValid(const std::vector<Scalar*>& values, int
first_null) {
+ auto it = std::find_if(values.begin() + first_null + 1, values.end(),
+ [](Scalar* v) { return v != nullptr; });
+
+ if (it == values.end()) {
return util::nullopt;
}
- return Key{schema_->field(i)->name(), segment};
+ return static_cast<int>(it - values.begin());
}
-Result<std::string> DirectoryPartitioning::FormatKey(const Key& key, int i)
const {
- if (schema_->GetFieldIndex(key.name) != i) {
- return Status::Invalid("field ", key.name, " in unexpected position ", i,
- " for schema ", *schema_);
+Result<std::string> DirectoryPartitioning::FormatValues(
+ const std::vector<Scalar*>& values) const {
+ std::vector<std::string>
segments(static_cast<size_t>(schema_->num_fields()));
+
+ for (int i = 0; i < schema_->num_fields(); ++i) {
+ if (values[i] != nullptr) {
+ segments[i] = values[i]->ToString();
+ continue;
+ }
+
+ if (auto illegal_index = NextValid(values, i)) {
+ // XXX maybe we should just ignore keys provided after the first absent
one?
+ return Status::Invalid("No partition key for ",
schema_->field(i)->name(),
+ " but a key was subsequently provided
for ",
+ schema_->field(*illegal_index)->name(), ".");
+ }
+
+ // if all subsequent keys are absent we'll just print the available keys
+ break;
}
- return key.value;
+
+ return fs::internal::JoinAbstractPath(std::move(segments));
}
class KeyValuePartitioningInspectImpl {
@@ -522,10 +540,6 @@ struct DirectoryPartitioningFactory::MakeWritePlanImpl {
arrow::internal::Permute(permutation, &source_fragments_);
arrow::internal::Permute(permutation, &right_hand_sides_);
- // the basename of out.paths[i] is stored in segments[i] (full paths will
be assembled
- // after segments is complete)
- std::vector<std::string> segments;
-
// out.paths[parents[i]] is the parent directory of out.paths[i]
std::vector<int> parents;
@@ -533,6 +547,10 @@ struct DirectoryPartitioningFactory::MakeWritePlanImpl {
// partition expression corresponding to field_i
Indices current_right_hand_sides(num_fields(), -1);
+ // current_partition_expressions[field_i] is the current partition
expression
+ // corresponding to field_i
+ ExpressionVector current_partition_expressions(num_fields());
+
// out.paths[current_parents[field_i]] is the current ancestor directory
corresponding
// to field_i
Indices current_parents(num_fields() + 1, -1);
@@ -552,40 +570,33 @@ struct DirectoryPartitioningFactory::MakeWritePlanImpl {
current_parents[field_i + 1] = static_cast<int>(parents.size());
parents.push_back(current_parents[field_i]);
- auto partition_expression = PartitionExpression(fragment_i, field_i);
+ current_partition_expressions.resize(field_i + 1);
+ current_partition_expressions[field_i] =
PartitionExpression(fragment_i, field_i);
+ auto partition_expression = and_(current_partition_expressions);
// format segment for partition_expression
- ARROW_ASSIGN_OR_RAISE(auto segment,
- out.partitioning->Format(*partition_expression,
field_i));
- segment.push_back(fs::internal::kSep);
- segments.push_back(std::move(segment));
+ ARROW_ASSIGN_OR_RAISE(auto path,
out.partitioning->Format(*partition_expression));
+ out.paths.push_back(std::move(path));
// store partition_expression for use in the written Dataset
out.fragment_or_partition_expressions.emplace_back(
- std::move(partition_expression));
+ current_partition_expressions[field_i]);
current_right_hand_sides[field_i] =
right_hand_sides_[fragment_i][field_i];
}
// push a fragment (not attempting to give files meaningful names)
- parents.push_back(current_parents[field_i]);
- segments.emplace_back(Guid() + "_" + std::to_string(fragment_i));
+ std::string basename = Guid() + "_" + std::to_string(fragment_i);
+ int parent_i = current_parents[field_i];
+ parents.push_back(parent_i);
+ out.paths.push_back(fs::internal::JoinAbstractPath(
+ std::vector<std::string>{out.paths[parent_i], std::move(basename)}));
// store a fragment for writing to disk
out.fragment_or_partition_expressions.emplace_back(
std::move(source_fragments_[fragment_i]));
}
- // render paths from segments
- for (size_t i = 0; i < segments.size(); ++i) {
- if (parents[i] == -1) {
- out.paths.push_back(segments[i]);
- continue;
- }
-
- out.paths.push_back(out.paths[parents[i]] + segments[i]);
- }
-
return out;
}
@@ -643,8 +654,38 @@ util::optional<KeyValuePartitioning::Key>
HivePartitioning::ParseKey(
return Key{segment.substr(0, name_end), segment.substr(name_end + 1)};
}
-Result<std::string> HivePartitioning::FormatKey(const Key& key, int i) const {
- return key.name + "=" + key.value;
+std::vector<KeyValuePartitioning::Key> HivePartitioning::ParseKeys(
+ const std::string& path) const {
+ std::vector<Key> keys;
+
+ for (const auto& segment : fs::internal::SplitAbstractPath(path)) {
+ if (auto key = ParseKey(segment)) {
+ keys.push_back(std::move(*key));
+ }
+ }
+
+ return keys;
+}
+
+Result<std::string> HivePartitioning::FormatValues(
+ const std::vector<Scalar*>& values) const {
+ std::vector<std::string>
segments(static_cast<size_t>(schema_->num_fields()));
+
+ for (int i = 0; i < schema_->num_fields(); ++i) {
+ const std::string& name = schema_->field(i)->name();
+
+ if (values[i] == nullptr) {
+ if (!NextValid(values, i)) break;
+
+ // If no key is available just provide a placeholder segment to maintain
the
+ // field_index <-> path nesting relation
+ segments[i] = name;
+ } else {
+ segments[i] = name + "=" + values[i]->ToString();
+ }
+ }
+
+ return fs::internal::JoinAbstractPath(std::move(segments));
}
class HivePartitioningFactory : public PartitioningFactory {
diff --git a/cpp/src/arrow/dataset/partition.h
b/cpp/src/arrow/dataset/partition.h
index c93c19b..ea5828ec 100644
--- a/cpp/src/arrow/dataset/partition.h
+++ b/cpp/src/arrow/dataset/partition.h
@@ -59,21 +59,10 @@ class ARROW_DS_EXPORT Partitioning {
/// \brief The name identifying the kind of partitioning
virtual std::string type_name() const = 0;
- /// \brief Parse a path segment into a partition expression
- ///
- /// \param[in] segment the path segment to parse
- /// \param[in] i the index of segment within a path
- /// \return the parsed expression
- virtual Result<std::shared_ptr<Expression>> Parse(const std::string& segment,
- int i) const = 0;
-
- virtual Result<std::string> Format(const Expression& expr, int i) const {
- // FIXME(bkietz) make this pure virtual
- return Status::NotImplemented("formatting paths from ", type_name(), "
Partitioning");
- }
-
/// \brief Parse a path into a partition expression
- Result<std::shared_ptr<Expression>> Parse(const std::string& path) const;
+ virtual Result<std::shared_ptr<Expression>> Parse(const std::string& path)
const = 0;
+
+ virtual Result<std::string> Format(const Expression& expr) const = 0;
/// \brief A default Partitioning which always yields scalar(true)
static std::shared_ptr<Partitioning> Default();
@@ -126,40 +115,6 @@ class ARROW_DS_EXPORT PartitioningFactory {
FragmentIterator fragments);
};
-/// \brief Subclass for representing the default, a partitioning that returns
-/// the idempotent "true" partition.
-class DefaultPartitioning : public Partitioning {
- public:
- DefaultPartitioning() : Partitioning(::arrow::schema({})) {}
-
- std::string type_name() const override { return "default"; }
-
- Result<std::shared_ptr<Expression>> Parse(const std::string& segment,
- int i) const override {
- return scalar(true);
- }
-};
-
-/// \brief Subclass for looking up partition information from a dictionary
-/// mapping segments to expressions provided on construction.
-class ARROW_DS_EXPORT SegmentDictionaryPartitioning : public Partitioning {
- public:
- SegmentDictionaryPartitioning(
- std::shared_ptr<Schema> schema,
- std::vector<std::unordered_map<std::string, std::shared_ptr<Expression>>>
- dictionaries)
- : Partitioning(std::move(schema)),
dictionaries_(std::move(dictionaries)) {}
-
- std::string type_name() const override { return "segment_dictionary"; }
-
- /// Return dictionaries_[i][segment] or scalar(true)
- Result<std::shared_ptr<Expression>> Parse(const std::string& segment,
- int i) const override;
-
- protected:
- std::vector<std::unordered_map<std::string, std::shared_ptr<Expression>>>
dictionaries_;
-};
-
/// \brief Subclass for the common case of a partitioning which yields an
equality
/// expression for each segment
class ARROW_DS_EXPORT KeyValuePartitioning : public Partitioning {
@@ -181,21 +136,9 @@ class ARROW_DS_EXPORT KeyValuePartitioning : public
Partitioning {
static Status SetDefaultValuesFromKeys(const Expression& expr,
RecordBatchProjector* projector);
- /// Convert a Key to a full expression.
- /// If the field referenced in key is absent from the schema will be ignored.
- static Result<std::shared_ptr<Expression>> ConvertKey(const Key& key,
- const Schema& schema,
- const ArrayVector&
dictionaries);
-
- /// Extract a partition key from a path segment.
- virtual util::optional<Key> ParseKey(const std::string& segment, int i)
const = 0;
-
- virtual Result<std::string> FormatKey(const Key& expr, int i) const = 0;
-
- Result<std::shared_ptr<Expression>> Parse(const std::string& segment,
- int i) const override;
+ Result<std::shared_ptr<Expression>> Parse(const std::string& path) const
override;
- Result<std::string> Format(const Expression& expr, int i) const override;
+ Result<std::string> Format(const Expression& expr) const override;
protected:
KeyValuePartitioning(std::shared_ptr<Schema> schema, ArrayVector
dictionaries)
@@ -205,6 +148,13 @@ class ARROW_DS_EXPORT KeyValuePartitioning : public
Partitioning {
}
}
+ virtual std::vector<Key> ParseKeys(const std::string& path) const = 0;
+
+ virtual Result<std::string> FormatValues(const std::vector<Scalar*>& values)
const = 0;
+
+ /// Convert a Key to a full expression.
+ Result<std::shared_ptr<Expression>> ConvertKey(const Key& key) const;
+
ArrayVector dictionaries_;
};
@@ -224,12 +174,13 @@ class ARROW_DS_EXPORT DirectoryPartitioning : public
KeyValuePartitioning {
std::string type_name() const override { return "schema"; }
- util::optional<Key> ParseKey(const std::string& segment, int i) const
override;
-
- Result<std::string> FormatKey(const Key& key, int i) const override;
-
static std::shared_ptr<PartitioningFactory> MakeFactory(
std::vector<std::string> field_names, PartitioningFactoryOptions = {});
+
+ private:
+ std::vector<Key> ParseKeys(const std::string& path) const override;
+
+ Result<std::string> FormatValues(const std::vector<Scalar*>& values) const
override;
};
/// \brief Multi-level, directory based partitioning
@@ -250,36 +201,48 @@ class ARROW_DS_EXPORT HivePartitioning : public
KeyValuePartitioning {
std::string type_name() const override { return "hive"; }
- util::optional<Key> ParseKey(const std::string& segment, int i) const
override {
- return ParseKey(segment);
- }
-
- Result<std::string> FormatKey(const Key& key, int i) const override;
-
static util::optional<Key> ParseKey(const std::string& segment);
static std::shared_ptr<PartitioningFactory> MakeFactory(
PartitioningFactoryOptions = {});
+
+ private:
+ std::vector<Key> ParseKeys(const std::string& path) const override;
+
+ Result<std::string> FormatValues(const std::vector<Scalar*>& values) const
override;
};
/// \brief Implementation provided by lambda or other callable
class ARROW_DS_EXPORT FunctionPartitioning : public Partitioning {
public:
- explicit FunctionPartitioning(
- std::shared_ptr<Schema> schema,
- std::function<Result<std::shared_ptr<Expression>>(const std::string&,
int)> impl,
- std::string name = "function")
- : Partitioning(std::move(schema)), impl_(std::move(impl)),
name_(std::move(name)) {}
+ using ParseImpl =
+ std::function<Result<std::shared_ptr<Expression>>(const std::string&)>;
+
+ using FormatImpl = std::function<Result<std::string>(const Expression&)>;
+
+ FunctionPartitioning(std::shared_ptr<Schema> schema, ParseImpl parse_impl,
+ FormatImpl format_impl = NULLPTR, std::string name =
"function")
+ : Partitioning(std::move(schema)),
+ parse_impl_(std::move(parse_impl)),
+ format_impl_(std::move(format_impl)),
+ name_(std::move(name)) {}
std::string type_name() const override { return name_; }
- Result<std::shared_ptr<Expression>> Parse(const std::string& segment,
- int i) const override {
- return impl_(segment, i);
+ Result<std::shared_ptr<Expression>> Parse(const std::string& path) const
override {
+ return parse_impl_(path);
+ }
+
+ Result<std::string> Format(const Expression& expr) const override {
+ if (format_impl_) {
+ return format_impl_(expr);
+ }
+ return Status::NotImplemented("formatting paths from ", type_name(), "
Partitioning");
}
private:
- std::function<Result<std::shared_ptr<Expression>>(const std::string&, int)>
impl_;
+ ParseImpl parse_impl_;
+ FormatImpl format_impl_;
std::string name_;
};
diff --git a/cpp/src/arrow/dataset/partition_test.cc
b/cpp/src/arrow/dataset/partition_test.cc
index 7e7c310..7636529 100644
--- a/cpp/src/arrow/dataset/partition_test.cc
+++ b/cpp/src/arrow/dataset/partition_test.cc
@@ -43,16 +43,27 @@ using E = TestExpression;
class TestPartitioning : public ::testing::Test {
public:
void AssertParseError(const std::string& path) {
- ASSERT_RAISES(Invalid, partitioning_->Parse(path).status());
+ ASSERT_RAISES(Invalid, partitioning_->Parse(path));
}
- void AssertParse(const std::string& path, std::shared_ptr<Expression>
expected) {
+ void AssertParse(const std::string& path, E expected) {
ASSERT_OK_AND_ASSIGN(auto parsed, partitioning_->Parse(path));
- ASSERT_EQ(E{parsed}, E{expected});
+ ASSERT_EQ(E{parsed}, expected);
}
- void AssertParse(const std::string& path, const Expression& expected) {
- AssertParse(path, expected.Copy());
+ template <StatusCode code = StatusCode::Invalid>
+ void AssertFormatError(E expr) {
+ ASSERT_EQ(partitioning_->Format(*expr.expression).status().code(), code);
+ }
+
+ void AssertFormat(E expr, const std::string& expected) {
+ ASSERT_OK_AND_ASSIGN(auto formatted,
partitioning_->Format(*expr.expression));
+ ASSERT_EQ(formatted, expected);
+
+ // ensure the formatted path round trips the relevant components of the
partition
+ // expression: roundtripped should be a subset of expr
+ ASSERT_OK_AND_ASSIGN(auto roundtripped, partitioning_->Parse(formatted));
+ ASSERT_EQ(E{roundtripped->Assume(*expr.expression)}, E{scalar(true)});
}
void AssertInspect(const std::vector<std::string>& paths,
@@ -83,33 +94,6 @@ class TestPartitioning : public ::testing::Test {
std::shared_ptr<PartitioningFactory> factory_;
};
-TEST_F(TestPartitioning, SegmentDictionary) {
- using Dict = std::unordered_map<std::string, std::shared_ptr<Expression>>;
- Dict alpha_dict, beta_dict;
- auto add_expr = [](const std::string& segment, const Expression& expr, Dict*
dict) {
- dict->emplace(segment, expr.Copy());
- };
-
- add_expr("zero", "alpha"_ == 0, &alpha_dict);
- add_expr("one", "alpha"_ == 1, &alpha_dict);
- add_expr("more", "alpha"_ >= 2, &alpha_dict);
-
- add_expr("...", "beta"_ == "uh", &beta_dict);
- add_expr("?", "beta"_ == "what", &beta_dict);
- add_expr("!", "beta"_ == "OH", &beta_dict);
-
- partitioning_.reset(new SegmentDictionaryPartitioning(
- schema({field("alpha", int32()), field("beta", utf8())}), {alpha_dict,
beta_dict}));
-
- AssertParse("/one/?", "alpha"_ == int32_t(1) and "beta"_ == "what");
- AssertParse("/one/?/---", "alpha"_ == int32_t(1) and "beta"_ == "what");
- AssertParse("/more", "alpha"_ >= int32_t(2)); // missing second segment
- AssertParse("/---/---", scalar(true)); // unrecognized segments
- AssertParse("/---/!", "beta"_ == "OH"); // unrecognized first
segment
- AssertParse("/zero/---", "alpha"_ == int32_t(0)); // unrecognized second
segment
- AssertParse("", scalar(true)); // no segments to parse
-}
-
TEST_F(TestPartitioning, DirectoryPartitioning) {
partitioning_ = std::make_shared<DirectoryPartitioning>(
schema({field("alpha", int32()), field("beta", utf8())}));
@@ -127,6 +111,21 @@ TEST_F(TestPartitioning, DirectoryPartitioning) {
AssertParse("/0/foo/ignored=2341", "alpha"_ == int32_t(0) and "beta"_ ==
"foo");
}
+TEST_F(TestPartitioning, DirectoryPartitioningFormat) {
+ partitioning_ = std::make_shared<DirectoryPartitioning>(
+ schema({field("alpha", int32()), field("beta", utf8())}));
+
+ AssertFormat("alpha"_ == int32_t(0) and "beta"_ == "hello", "0/hello");
+ AssertFormat("beta"_ == "hello" and "alpha"_ == int32_t(0), "0/hello");
+ AssertFormat("alpha"_ == int32_t(0), "0");
+ AssertFormatError("beta"_ == "hello");
+ AssertFormat(scalar(true), "");
+
+ AssertFormatError<StatusCode::TypeError>("alpha"_ == 0.0 and "beta"_ ==
"hello");
+ AssertFormat("gamma"_ == "yo" and "alpha"_ == int32_t(0) and "beta"_ ==
"hello",
+ "0/hello");
+}
+
TEST_F(TestPartitioning, DiscoverSchema) {
factory_ = DirectoryPartitioning::MakeFactory({"alpha", "beta"});
@@ -213,6 +212,21 @@ TEST_F(TestPartitioning, HivePartitioning) {
AssertParseError("/alpha=0.0/beta=3.25"); // conversion of "0.0" to int32
fails
}
+TEST_F(TestPartitioning, HivePartitioningFormat) {
+ partitioning_ = std::make_shared<HivePartitioning>(
+ schema({field("alpha", int32()), field("beta", float32())}));
+
+ AssertFormat("alpha"_ == int32_t(0) and "beta"_ == 3.25f,
"alpha=0/beta=3.25");
+ AssertFormat("beta"_ == 3.25f and "alpha"_ == int32_t(0),
"alpha=0/beta=3.25");
+ AssertFormat("alpha"_ == int32_t(0), "alpha=0");
+ AssertFormat("beta"_ == 3.25f, "alpha/beta=3.25");
+ AssertFormat(scalar(true), "");
+
+ AssertFormatError<StatusCode::TypeError>("alpha"_ == "yo" and "beta"_ ==
3.25f);
+ AssertFormat("gamma"_ == "yo" and "alpha"_ == int32_t(0) and "beta"_ ==
3.25f,
+ "alpha=0/beta=3.25");
+}
+
TEST_F(TestPartitioning, DiscoverHiveSchema) {
factory_ = HivePartitioning::MakeFactory();
@@ -283,90 +297,113 @@ TEST_F(TestPartitioning, HiveDictionaryHasUniqueValues) {
}
TEST_F(TestPartitioning, EtlThenHive) {
- DirectoryPartitioning etl_part(schema({field("year", int16()),
field("month", int8()),
- field("day", int8()), field("hour",
int8())}));
- HivePartitioning alphabeta_part(
- schema({field("alpha", int32()), field("beta", float32())}));
+ FieldVector etl_fields{field("year", int16()), field("month", int8()),
+ field("day", int8()), field("hour", int8())};
+ DirectoryPartitioning etl_part(schema(etl_fields));
+
+ FieldVector alphabeta_fields{field("alpha", int32()), field("beta",
float32())};
+ HivePartitioning alphabeta_part(schema(alphabeta_fields));
partitioning_ = std::make_shared<FunctionPartitioning>(
schema({field("year", int16()), field("month", int8()), field("day",
int8()),
field("hour", int8()), field("alpha", int32()), field("beta",
float32())}),
- [&](const std::string& segment, int i) ->
Result<std::shared_ptr<Expression>> {
- if (i < etl_part.schema()->num_fields()) {
- return etl_part.Parse(segment, i);
+ [&](const std::string& path) -> Result<std::shared_ptr<Expression>> {
+ auto segments = fs::internal::SplitAbstractPath(path);
+ if (segments.size() < etl_fields.size() + alphabeta_fields.size()) {
+ return Status::Invalid("path ", path, " can't be parsed");
}
- i -= etl_part.schema()->num_fields();
- return alphabeta_part.Parse(segment, i);
+ auto etl_segments_end = segments.begin() + etl_fields.size();
+ auto etl_path =
+ fs::internal::JoinAbstractPath(segments.begin(), etl_segments_end);
+ ARROW_ASSIGN_OR_RAISE(auto etl_expr, etl_part.Parse(etl_path));
+
+ auto alphabeta_segments_end = etl_segments_end +
alphabeta_fields.size();
+ auto alphabeta_path =
+ fs::internal::JoinAbstractPath(etl_segments_end,
alphabeta_segments_end);
+ ARROW_ASSIGN_OR_RAISE(auto alphabeta_expr,
alphabeta_part.Parse(alphabeta_path));
+
+ return and_(etl_expr, alphabeta_expr);
});
AssertParse("/1999/12/31/00/alpha=0/beta=3.25",
"year"_ == int16_t(1999) and "month"_ == int8_t(12) and
"day"_ == int8_t(31) and "hour"_ == int8_t(0) and
- "alpha"_ == int32_t(0) and "beta"_ == 3.25f);
+ ("alpha"_ == int32_t(0) and "beta"_ == 3.25f));
AssertParseError("/20X6/03/21/05/alpha=0/beta=3.25");
-}
+} // namespace dataset
TEST_F(TestPartitioning, Set) {
+ auto ints = [](std::vector<int32_t> ints) {
+ std::shared_ptr<Array> out;
+ ArrayFromVector<Int32Type>(ints, &out);
+ return out;
+ };
+
// An adhoc partitioning which parses segments like "/x in [1 4 5]"
// into ("x"_ == 1 or "x"_ == 4 or "x"_ == 5)
partitioning_ = std::make_shared<FunctionPartitioning>(
schema({field("x", int32())}),
- [](const std::string& segment, int) ->
Result<std::shared_ptr<Expression>> {
- std::smatch matches;
-
- static std::regex re("^x in \\[(.*)\\]$");
- if (!std::regex_match(segment, matches, re) || matches.size() != 2) {
- return Status::Invalid("regex failed to parse");
- }
-
+ [&](const std::string& path) -> Result<std::shared_ptr<Expression>> {
ExpressionVector subexpressions;
- std::string element;
- std::istringstream elements(matches[1]);
- while (elements >> element) {
- ARROW_ASSIGN_OR_RAISE(auto s, Scalar::Parse(int32(), element));
- subexpressions.push_back(equal(field_ref("x"), scalar(s)));
+ for (auto segment : fs::internal::SplitAbstractPath(path)) {
+ std::smatch matches;
+
+ static std::regex re(R"(^(\S+) in \[(.*)\]$)");
+ if (!std::regex_match(segment, matches, re) || matches.size() != 3) {
+ return Status::Invalid("regex failed to parse");
+ }
+
+ std::vector<int32_t> set;
+ std::istringstream elements(matches[2]);
+ for (std::string element; elements >> element;) {
+ ARROW_ASSIGN_OR_RAISE(auto s, Scalar::Parse(int32(), element));
+ set.push_back(checked_cast<const Int32Scalar&>(*s).value);
+ }
+
+
subexpressions.push_back(field_ref(matches[1])->In(ints(set)).Copy());
}
-
- return or_(std::move(subexpressions));
+ return and_(std::move(subexpressions));
});
- AssertParse("/x in [1]", "x"_ == 1);
- AssertParse("/x in [1 4 5]", "x"_ == 1 or "x"_ == 4 or "x"_ == 5);
- AssertParse("/x in []", scalar(false));
+ AssertParse("/x in [1]", "x"_.In(ints({1})));
+ AssertParse("/x in [1 4 5]", "x"_.In(ints({1, 4, 5})));
+ AssertParse("/x in []", "x"_.In(ints({})));
}
// An adhoc partitioning which parses segments like "/x=[-3.25, 0.0)"
// into ("x"_ >= -3.25 and "x" < 0.0)
-class RangePartitioning : public HivePartitioning {
+class RangePartitioning : public Partitioning {
public:
- using HivePartitioning::HivePartitioning;
+ explicit RangePartitioning(std::shared_ptr<Schema> s) :
Partitioning(std::move(s)) {}
std::string type_name() const override { return "range"; }
- Result<std::shared_ptr<Expression>> Parse(const std::string& segment,
- int i) const override {
+ Result<std::shared_ptr<Expression>> Parse(const std::string& path) const
override {
ExpressionVector ranges;
- auto key = ParseKey(segment, i);
- if (!key) {
- return Status::Invalid("can't parse '", segment, "' as a range");
- }
- std::smatch matches;
- RETURN_NOT_OK(DoRegex(key->value, &matches));
+ for (auto segment : fs::internal::SplitAbstractPath(path)) {
+ auto key = HivePartitioning::ParseKey(segment);
+ if (!key) {
+ return Status::Invalid("can't parse '", segment, "' as a range");
+ }
+
+ std::smatch matches;
+ RETURN_NOT_OK(DoRegex(key->value, &matches));
- auto& min_cmp = matches[1] == "[" ? greater_equal : greater;
- std::string min_repr = matches[2];
- std::string max_repr = matches[3];
- auto& max_cmp = matches[4] == "]" ? less_equal : less;
+ auto& min_cmp = matches[1] == "[" ? greater_equal : greater;
+ std::string min_repr = matches[2];
+ std::string max_repr = matches[3];
+ auto& max_cmp = matches[4] == "]" ? less_equal : less;
- const auto& type = schema_->GetFieldByName(key->name)->type();
- ARROW_ASSIGN_OR_RAISE(auto min, Scalar::Parse(type, min_repr));
- ARROW_ASSIGN_OR_RAISE(auto max, Scalar::Parse(type, max_repr));
+ const auto& type = schema_->GetFieldByName(key->name)->type();
+ ARROW_ASSIGN_OR_RAISE(auto min, Scalar::Parse(type, min_repr));
+ ARROW_ASSIGN_OR_RAISE(auto max, Scalar::Parse(type, max_repr));
- ranges.push_back(and_(min_cmp(field_ref(key->name), scalar(min)),
- max_cmp(field_ref(key->name), scalar(max))));
+ ranges.push_back(and_(min_cmp(field_ref(key->name), scalar(min)),
+ max_cmp(field_ref(key->name), scalar(max))));
+ }
return and_(ranges);
}
@@ -386,6 +423,8 @@ class RangePartitioning : public HivePartitioning {
return Status::OK();
}
+
+ Result<std::string> Format(const Expression&) const override { return ""; }
};
TEST_F(TestPartitioning, Range) {
@@ -456,7 +495,7 @@ class TestPartitioningWritePlan : public ::testing::Test {
static_cast<int>(std::find(fragments.begin(), fragments.end(),
fragment) -
fragments.begin());
auto path =
fs::internal::GetAbstractPathParent(actual_plan.paths[i]).first;
- dirs_[path + "/"].fragments.push_back(fragment_index);
+ dirs_[path].fragments.push_back(fragment_index);
} else {
auto partition_expression = op.partition_expr();
dirs_[actual_plan.paths[i]].partition_expression =
partition_expression;
@@ -526,15 +565,15 @@ TEST_F(TestPartitioningWritePlan, SingleDirectory) {
MakeWritePlan("a"_ == 42, "a"_ == 99, "a"_ == 101);
AssertPlanIs(ExpectedWritePlan()
- .Dir("42/", "a"_ == 42, {0})
- .Dir("99/", "a"_ == 99, {1})
- .Dir("101/", "a"_ == 101, {2}));
+ .Dir("42", "a"_ == 42, {0})
+ .Dir("99", "a"_ == 99, {1})
+ .Dir("101", "a"_ == 101, {2}));
MakeWritePlan("a"_ == 42, "a"_ == 99, "a"_ == 99, "a"_ == 101, "a"_ == 99);
AssertPlanIs(ExpectedWritePlan()
- .Dir("42/", "a"_ == 42, {0})
- .Dir("99/", "a"_ == 99, {1, 2, 4})
- .Dir("101/", "a"_ == 101, {3}));
+ .Dir("42", "a"_ == 42, {0})
+ .Dir("99", "a"_ == 99, {1, 2, 4})
+ .Dir("101", "a"_ == 101, {3}));
}
TEST_F(TestPartitioningWritePlan, NestedDirectories) {
@@ -544,12 +583,12 @@ TEST_F(TestPartitioningWritePlan, NestedDirectories) {
"a"_ == 99 and "b"_ == "hello", "a"_ == 99 and "b"_ ==
"world");
AssertPlanIs(ExpectedWritePlan()
- .Dir("42/", "a"_ == 42, {})
- .Dir("42/hello/", "b"_ == "hello", {0})
- .Dir("42/world/", "b"_ == "world", {1})
- .Dir("99/", "a"_ == 99, {})
- .Dir("99/hello/", "b"_ == "hello", {2})
- .Dir("99/world/", "b"_ == "world", {3}));
+ .Dir("42", "a"_ == 42, {})
+ .Dir("42/hello", "b"_ == "hello", {0})
+ .Dir("42/world", "b"_ == "world", {1})
+ .Dir("99", "a"_ == 99, {})
+ .Dir("99/hello", "b"_ == "hello", {2})
+ .Dir("99/world", "b"_ == "world", {3}));
}
TEST_F(TestPartitioningWritePlan, Errors) {
@@ -558,9 +597,9 @@ TEST_F(TestPartitioningWritePlan, Errors) {
Invalid, testing::HasSubstr("no partition expression for field 'a'"),
MakeWritePlanError("a"_ == 42, scalar(true), "a"_ == 101));
- EXPECT_RAISES_WITH_MESSAGE_THAT(TypeError,
- testing::HasSubstr("expected RHS to have
type int32"),
- MakeWritePlanError("a"_ == 42, "a"_ ==
"hello"));
+ EXPECT_RAISES_WITH_MESSAGE_THAT(
+ TypeError, testing::HasSubstr("scalar hello (of type string) is
invalid"),
+ MakeWritePlanError("a"_ == 42, "a"_ == "hello"));
factory_ = DirectoryPartitioning::MakeFactory({"a", "b"});
EXPECT_RAISES_WITH_MESSAGE_THAT(
diff --git a/cpp/src/arrow/filesystem/filesystem_test.cc
b/cpp/src/arrow/filesystem/filesystem_test.cc
index bce70d7..b679f8b 100644
--- a/cpp/src/arrow/filesystem/filesystem_test.cc
+++ b/cpp/src/arrow/filesystem/filesystem_test.cc
@@ -139,7 +139,7 @@ TEST(PathUtil, ConcatAbstractPath) {
}
TEST(PathUtil, JoinAbstractPath) {
- std::vector<std::string> parts = {"abc", "def", "ghi", "jkl"};
+ std::vector<std::string> parts = {"abc", "def", "ghi", "", "jkl"};
ASSERT_EQ("abc/def/ghi/jkl", JoinAbstractPath(parts.begin(), parts.end()));
ASSERT_EQ("def/ghi", JoinAbstractPath(parts.begin() + 1, parts.begin() + 3));
diff --git a/cpp/src/arrow/filesystem/path_util.h
b/cpp/src/arrow/filesystem/path_util.h
index 4743053..46fa415 100644
--- a/cpp/src/arrow/filesystem/path_util.h
+++ b/cpp/src/arrow/filesystem/path_util.h
@@ -92,6 +92,8 @@ template <class StringIt>
std::string JoinAbstractPath(StringIt it, StringIt end) {
std::string path;
for (; it != end; ++it) {
+ if (it->empty()) continue;
+
if (!path.empty()) {
path += kSep;
}
diff --git a/cpp/src/arrow/type.cc b/cpp/src/arrow/type.cc
index d534d58..186daac 100644
--- a/cpp/src/arrow/type.cc
+++ b/cpp/src/arrow/type.cc
@@ -1171,15 +1171,19 @@ std::vector<FieldPath> FieldRef::FindAll(const
FieldVector& fields) const {
size_t size() const { return referents.size(); }
- void Add(FieldPath prefix, const FieldPath& match, const FieldVector&
fields) {
- auto maybe_field = match.Get(fields);
+ void Add(const FieldPath& prefix, const FieldPath& suffix,
+ const FieldVector& fields) {
+ auto maybe_field = suffix.Get(fields);
DCHECK_OK(maybe_field.status());
-
- prefix.indices().resize(prefix.indices().size() +
match.indices().size());
- std::copy(match.indices().begin(), match.indices().end(),
- prefix.indices().end() - match.indices().size());
- prefixes.push_back(std::move(prefix));
referents.push_back(std::move(maybe_field).ValueOrDie());
+
+ std::vector<int> concatenated_indices(prefix.indices().size() +
+ suffix.indices().size());
+ auto it = concatenated_indices.begin();
+ for (auto path : {&prefix, &suffix}) {
+ it = std::copy(path->indices().begin(), path->indices().end(), it);
+ }
+ prefixes.emplace_back(std::move(concatenated_indices));
}
};
diff --git a/cpp/src/arrow/type.h b/cpp/src/arrow/type.h
index 86d8a79..e2532d2 100644
--- a/cpp/src/arrow/type.h
+++ b/cpp/src/arrow/type.h
@@ -1363,11 +1363,14 @@ class ARROW_EXPORT FieldPath {
size_t hash() const;
explicit operator bool() const { return !indices_.empty(); }
+ bool operator!() const { return indices_.empty(); }
bool operator==(const FieldPath& other) const { return indices() ==
other.indices(); }
- bool operator!=(const FieldPath& other) const { return !(*this == other); }
+ bool operator!=(const FieldPath& other) const { return indices() !=
other.indices(); }
- std::vector<int>& indices() { return indices_; }
const std::vector<int>& indices() const { return indices_; }
+ int operator[](size_t i) const { return indices_[i]; }
+ std::vector<int>::const_iterator begin() const { return indices_.begin(); }
+ std::vector<int>::const_iterator end() const { return indices_.end(); }
/// \brief Retrieve the referenced child Field from a Schema, Field, or
DataType
Result<std::shared_ptr<Field>> Get(const Schema& schema) const;