This is an automated email from the ASF dual-hosted git repository.

kszucs pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 6e3daf0  ARROW-9068: [C++][Dataset] Simplify partitioning interface
6e3daf0 is described below

commit 6e3daf0defcd7ef9a375aed006b5b75bc230ff3e
Author: Benjamin Kietzman <[email protected]>
AuthorDate: Wed Jul 29 14:59:41 2020 +0200

    ARROW-9068: [C++][Dataset] Simplify partitioning interface
    
    Partitioning::Format and Partitioning::Parse now operate on a whole path rather than on individual path segments.
    
    This will simplify building WritePlans, but I've only made minimal changes there
    (to keep tests passing); follow-up work is tracked in
    https://issues.apache.org/jira/browse/ARROW-8382
    
    Closes #7820 from bkietz/9068-simplify-partitioning-interface
    
    Authored-by: Benjamin Kietzman <[email protected]>
    Signed-off-by: Krisztián Szűcs <[email protected]>
---
 cpp/src/arrow/dataset/discovery_test.cc     |  13 --
 cpp/src/arrow/dataset/partition.cc          | 225 ++++++++++++++++-----------
 cpp/src/arrow/dataset/partition.h           | 125 ++++++---------
 cpp/src/arrow/dataset/partition_test.cc     | 231 ++++++++++++++++------------
 cpp/src/arrow/filesystem/filesystem_test.cc |   2 +-
 cpp/src/arrow/filesystem/path_util.h        |   2 +
 cpp/src/arrow/type.cc                       |  18 ++-
 cpp/src/arrow/type.h                        |   7 +-
 8 files changed, 331 insertions(+), 292 deletions(-)

diff --git a/cpp/src/arrow/dataset/discovery_test.cc b/cpp/src/arrow/dataset/discovery_test.cc
index 2fc21de..a3f1379 100644
--- a/cpp/src/arrow/dataset/discovery_test.cc
+++ b/cpp/src/arrow/dataset/discovery_test.cc
@@ -80,19 +80,6 @@ class MockDatasetFactory : public DatasetFactory {
   std::vector<std::shared_ptr<Schema>> schemas_;
 };
 
-class MockPartitioning : public Partitioning {
- public:
-  explicit MockPartitioning(std::shared_ptr<Schema> schema)
-      : Partitioning(std::move(schema)) {}
-
-  Result<std::shared_ptr<Expression>> Parse(const std::string& segment,
-                                            int i) const override {
-    return nullptr;
-  }
-
-  std::string type_name() const override { return "mock_partitioning"; }
-};
-
 class MockDatasetFactoryTest : public DatasetFactoryTest {
  public:
   void MakeFactory(std::vector<std::shared_ptr<Schema>> schemas) {
diff --git a/cpp/src/arrow/dataset/partition.cc b/cpp/src/arrow/dataset/partition.cc
index 9f497a1..5844f89 100644
--- a/cpp/src/arrow/dataset/partition.cc
+++ b/cpp/src/arrow/dataset/partition.cc
@@ -47,23 +47,23 @@ using util::string_view;
 
 using arrow::internal::checked_cast;
 
-Result<std::shared_ptr<Expression>> Partitioning::Parse(const std::string& 
path) const {
-  ExpressionVector expressions;
-  int i = 0;
+std::shared_ptr<Partitioning> Partitioning::Default() {
+  class DefaultPartitioning : public Partitioning {
+   public:
+    DefaultPartitioning() : Partitioning(::arrow::schema({})) {}
 
-  for (auto segment : fs::internal::SplitAbstractPath(path)) {
-    ARROW_ASSIGN_OR_RAISE(auto expr, Parse(segment, i++));
-    if (expr->Equals(true)) {
-      continue;
-    }
+    std::string type_name() const override { return "default"; }
 
-    expressions.push_back(std::move(expr));
-  }
+    Result<std::shared_ptr<Expression>> Parse(const std::string& path) const 
override {
+      return scalar(true);
+    }
 
-  return and_(std::move(expressions));
-}
+    Result<std::string> Format(const Expression& expr) const override {
+      return Status::NotImplemented("formatting paths from ", type_name(),
+                                    " Partitioning");
+    }
+  };
 
-std::shared_ptr<Partitioning> Partitioning::Default() {
   return std::make_shared<DefaultPartitioning>();
 }
 
@@ -80,18 +80,6 @@ Result<WritePlan> PartitioningFactory::MakeWritePlan(
                                 type_name());
 }
 
-Result<std::shared_ptr<Expression>> SegmentDictionaryPartitioning::Parse(
-    const std::string& segment, int i) const {
-  if (static_cast<size_t>(i) < dictionaries_.size()) {
-    auto it = dictionaries_[i].find(segment);
-    if (it != dictionaries_[i].end()) {
-      return it->second;
-    }
-  }
-
-  return scalar(true);
-}
-
 Status KeyValuePartitioning::VisitKeys(
     const Expression& expr,
     const std::function<Status(const std::string& name,
@@ -141,7 +129,7 @@ Status KeyValuePartitioning::SetDefaultValuesFromKeys(const 
Expression& expr,
       expr, [projector](const std::string& name, const 
std::shared_ptr<Scalar>& value) {
         ARROW_ASSIGN_OR_RAISE(auto match,
                               
FieldRef(name).FindOneOrNone(*projector->schema()));
-        if (match.indices().empty()) {
+        if (!match) {
           return Status::OK();
         }
         return projector->SetDefaultValue(match, value);
@@ -149,25 +137,25 @@ Status 
KeyValuePartitioning::SetDefaultValuesFromKeys(const Expression& expr,
 }
 
 Result<std::shared_ptr<Expression>> KeyValuePartitioning::ConvertKey(
-    const Key& key, const Schema& schema, const ArrayVector& dictionaries) {
-  ARROW_ASSIGN_OR_RAISE(auto match, FieldRef(key.name).FindOneOrNone(schema));
-  if (match.indices().empty()) {
+    const Key& key) const {
+  ARROW_ASSIGN_OR_RAISE(auto match, 
FieldRef(key.name).FindOneOrNone(*schema_));
+  if (!match) {
     return scalar(true);
   }
 
-  auto field_index = match.indices()[0];
-  auto field = schema.field(field_index);
+  auto field_index = match[0];
+  auto field = schema_->field(field_index);
 
   std::shared_ptr<Scalar> converted;
 
   if (field->type()->id() == Type::DICTIONARY) {
-    if (dictionaries.empty() || dictionaries[field_index] == nullptr) {
+    if (dictionaries_.empty() || dictionaries_[field_index] == nullptr) {
       return Status::Invalid("No dictionary provided for dictionary field ",
                              field->ToString());
     }
 
     DictionaryScalar::ValueType value;
-    value.dictionary = dictionaries[field_index];
+    value.dictionary = dictionaries_[field_index];
 
     if (!value.dictionary->type()->Equals(
             checked_cast<const DictionaryType&>(*field->type()).value_type())) 
{
@@ -189,62 +177,92 @@ Result<std::shared_ptr<Expression>> 
KeyValuePartitioning::ConvertKey(
     ARROW_ASSIGN_OR_RAISE(converted, Scalar::Parse(field->type(), key.value));
   }
 
-  return equal(field_ref(field->name()), scalar(converted));
+  return equal(field_ref(field->name()), scalar(std::move(converted)));
 }
 
 Result<std::shared_ptr<Expression>> KeyValuePartitioning::Parse(
-    const std::string& segment, int i) const {
-  if (auto key = ParseKey(segment, i)) {
-    return ConvertKey(*key, *schema_, dictionaries_);
+    const std::string& path) const {
+  ExpressionVector expressions;
+
+  for (const Key& key : ParseKeys(path)) {
+    ARROW_ASSIGN_OR_RAISE(auto expr, ConvertKey(key));
+
+    if (expr->Equals(true)) continue;
+
+    expressions.push_back(std::move(expr));
   }
 
-  return scalar(true);
+  return and_(std::move(expressions));
 }
 
-Result<std::string> KeyValuePartitioning::Format(const Expression& expr, int 
i) const {
-  if (expr.type() != ExpressionType::COMPARISON) {
-    return Status::Invalid(expr.ToString(), " is not a comparison expression");
-  }
+Result<std::string> KeyValuePartitioning::Format(const Expression& expr) const 
{
+  std::vector<Scalar*> values{static_cast<size_t>(schema_->num_fields()), 
nullptr};
+
+  RETURN_NOT_OK(VisitKeys(expr, [&](const std::string& name,
+                                    const std::shared_ptr<Scalar>& value) {
+    ARROW_ASSIGN_OR_RAISE(auto match, FieldRef(name).FindOneOrNone(*schema_));
+    if (match) {
+      const auto& field = schema_->field(match[0]);
+      if (!value->type->Equals(field->type())) {
+        return Status::TypeError("scalar ", value->ToString(), " (of type ", 
*value->type,
+                                 ") is invalid for ", field->ToString());
+      }
 
-  const auto& cmp = checked_cast<const ComparisonExpression&>(expr);
-  if (cmp.op() != compute::CompareOperator::EQUAL) {
-    return Status::Invalid(expr.ToString(), " is not an equality comparison 
expression");
-  }
+      values[match[0]] = value.get();
+    }
+    return Status::OK();
+  }));
 
-  if (cmp.left_operand()->type() != ExpressionType::FIELD) {
-    return Status::Invalid(expr.ToString(), " LHS is not a field");
-  }
-  const auto& lhs = checked_cast<const FieldExpression&>(*cmp.left_operand());
+  return FormatValues(values);
+}
 
-  if (cmp.right_operand()->type() != ExpressionType::SCALAR) {
-    return Status::Invalid(expr.ToString(), " RHS is not a scalar");
-  }
-  const auto& rhs = checked_cast<const 
ScalarExpression&>(*cmp.right_operand());
+std::vector<KeyValuePartitioning::Key> DirectoryPartitioning::ParseKeys(
+    const std::string& path) const {
+  std::vector<Key> keys;
+
+  int i = 0;
+  for (auto&& segment : fs::internal::SplitAbstractPath(path)) {
+    if (i >= schema_->num_fields()) break;
 
-  auto expected_type = schema_->GetFieldByName(lhs.name())->type();
-  if (!rhs.value()->type->Equals(expected_type)) {
-    return Status::TypeError(expr.ToString(), " expected RHS to have type ",
-                             *expected_type);
+    keys.push_back({schema_->field(i++)->name(), std::move(segment)});
   }
 
-  return FormatKey({lhs.name(), rhs.value()->ToString()}, i);
+  return keys;
 }
 
-util::optional<KeyValuePartitioning::Key> DirectoryPartitioning::ParseKey(
-    const std::string& segment, int i) const {
-  if (i >= schema_->num_fields()) {
+inline util::optional<int> NextValid(const std::vector<Scalar*>& values, int 
first_null) {
+  auto it = std::find_if(values.begin() + first_null + 1, values.end(),
+                         [](Scalar* v) { return v != nullptr; });
+
+  if (it == values.end()) {
     return util::nullopt;
   }
 
-  return Key{schema_->field(i)->name(), segment};
+  return static_cast<int>(it - values.begin());
 }
 
-Result<std::string> DirectoryPartitioning::FormatKey(const Key& key, int i) 
const {
-  if (schema_->GetFieldIndex(key.name) != i) {
-    return Status::Invalid("field ", key.name, " in unexpected position ", i,
-                           " for schema ", *schema_);
+Result<std::string> DirectoryPartitioning::FormatValues(
+    const std::vector<Scalar*>& values) const {
+  std::vector<std::string> 
segments(static_cast<size_t>(schema_->num_fields()));
+
+  for (int i = 0; i < schema_->num_fields(); ++i) {
+    if (values[i] != nullptr) {
+      segments[i] = values[i]->ToString();
+      continue;
+    }
+
+    if (auto illegal_index = NextValid(values, i)) {
+      // XXX maybe we should just ignore keys provided after the first absent 
one?
+      return Status::Invalid("No partition key for ", 
schema_->field(i)->name(),
+                             " but subsequent a key was provided subsequently 
for ",
+                             schema_->field(*illegal_index)->name(), ".");
+    }
+
+    // if all subsequent keys are absent we'll just print the available keys
+    break;
   }
-  return key.value;
+
+  return fs::internal::JoinAbstractPath(std::move(segments));
 }
 
 class KeyValuePartitioningInspectImpl {
@@ -522,10 +540,6 @@ struct DirectoryPartitioningFactory::MakeWritePlanImpl {
     arrow::internal::Permute(permutation, &source_fragments_);
     arrow::internal::Permute(permutation, &right_hand_sides_);
 
-    // the basename of out.paths[i] is stored in segments[i] (full paths will 
be assembled
-    // after segments is complete)
-    std::vector<std::string> segments;
-
     // out.paths[parents[i]] is the parent directory of out.paths[i]
     std::vector<int> parents;
 
@@ -533,6 +547,10 @@ struct DirectoryPartitioningFactory::MakeWritePlanImpl {
     // partition expression corresponding to field_i
     Indices current_right_hand_sides(num_fields(), -1);
 
+    // current_partition_expressions[field_i] is the current partition 
expression
+    // corresponding to field_i
+    ExpressionVector current_partition_expressions(num_fields());
+
     // out.paths[current_parents[field_i]] is the current ancestor directory 
corresponding
     // to field_i
     Indices current_parents(num_fields() + 1, -1);
@@ -552,40 +570,33 @@ struct DirectoryPartitioningFactory::MakeWritePlanImpl {
         current_parents[field_i + 1] = static_cast<int>(parents.size());
         parents.push_back(current_parents[field_i]);
 
-        auto partition_expression = PartitionExpression(fragment_i, field_i);
+        current_partition_expressions.resize(field_i + 1);
+        current_partition_expressions[field_i] = 
PartitionExpression(fragment_i, field_i);
+        auto partition_expression = and_(current_partition_expressions);
 
         // format segment for partition_expression
-        ARROW_ASSIGN_OR_RAISE(auto segment,
-                              out.partitioning->Format(*partition_expression, 
field_i));
-        segment.push_back(fs::internal::kSep);
-        segments.push_back(std::move(segment));
+        ARROW_ASSIGN_OR_RAISE(auto path, 
out.partitioning->Format(*partition_expression));
+        out.paths.push_back(std::move(path));
 
         // store partition_expression for use in the written Dataset
         out.fragment_or_partition_expressions.emplace_back(
-            std::move(partition_expression));
+            current_partition_expressions[field_i]);
 
         current_right_hand_sides[field_i] = 
right_hand_sides_[fragment_i][field_i];
       }
 
       // push a fragment (not attempting to give files meaningful names)
-      parents.push_back(current_parents[field_i]);
-      segments.emplace_back(Guid() + "_" + std::to_string(fragment_i));
+      std::string basename = Guid() + "_" + std::to_string(fragment_i);
+      int parent_i = current_parents[field_i];
+      parents.push_back(parent_i);
+      out.paths.push_back(fs::internal::JoinAbstractPath(
+          std::vector<std::string>{out.paths[parent_i], std::move(basename)}));
 
       // store a fragment for writing to disk
       out.fragment_or_partition_expressions.emplace_back(
           std::move(source_fragments_[fragment_i]));
     }
 
-    // render paths from segments
-    for (size_t i = 0; i < segments.size(); ++i) {
-      if (parents[i] == -1) {
-        out.paths.push_back(segments[i]);
-        continue;
-      }
-
-      out.paths.push_back(out.paths[parents[i]] + segments[i]);
-    }
-
     return out;
   }
 
@@ -643,8 +654,38 @@ util::optional<KeyValuePartitioning::Key> 
HivePartitioning::ParseKey(
   return Key{segment.substr(0, name_end), segment.substr(name_end + 1)};
 }
 
-Result<std::string> HivePartitioning::FormatKey(const Key& key, int i) const {
-  return key.name + "=" + key.value;
+std::vector<KeyValuePartitioning::Key> HivePartitioning::ParseKeys(
+    const std::string& path) const {
+  std::vector<Key> keys;
+
+  for (const auto& segment : fs::internal::SplitAbstractPath(path)) {
+    if (auto key = ParseKey(segment)) {
+      keys.push_back(std::move(*key));
+    }
+  }
+
+  return keys;
+}
+
+Result<std::string> HivePartitioning::FormatValues(
+    const std::vector<Scalar*>& values) const {
+  std::vector<std::string> 
segments(static_cast<size_t>(schema_->num_fields()));
+
+  for (int i = 0; i < schema_->num_fields(); ++i) {
+    const std::string& name = schema_->field(i)->name();
+
+    if (values[i] == nullptr) {
+      if (!NextValid(values, i)) break;
+
+      // If no key is available just provide a placeholder segment to maintain 
the
+      // field_index <-> path nesting relation
+      segments[i] = name;
+    } else {
+      segments[i] = name + "=" + values[i]->ToString();
+    }
+  }
+
+  return fs::internal::JoinAbstractPath(std::move(segments));
 }
 
 class HivePartitioningFactory : public PartitioningFactory {
diff --git a/cpp/src/arrow/dataset/partition.h b/cpp/src/arrow/dataset/partition.h
index c93c19b..ea5828ec 100644
--- a/cpp/src/arrow/dataset/partition.h
+++ b/cpp/src/arrow/dataset/partition.h
@@ -59,21 +59,10 @@ class ARROW_DS_EXPORT Partitioning {
   /// \brief The name identifying the kind of partitioning
   virtual std::string type_name() const = 0;
 
-  /// \brief Parse a path segment into a partition expression
-  ///
-  /// \param[in] segment the path segment to parse
-  /// \param[in] i the index of segment within a path
-  /// \return the parsed expression
-  virtual Result<std::shared_ptr<Expression>> Parse(const std::string& segment,
-                                                    int i) const = 0;
-
-  virtual Result<std::string> Format(const Expression& expr, int i) const {
-    // FIXME(bkietz) make this pure virtual
-    return Status::NotImplemented("formatting paths from ", type_name(), " 
Partitioning");
-  }
-
   /// \brief Parse a path into a partition expression
-  Result<std::shared_ptr<Expression>> Parse(const std::string& path) const;
+  virtual Result<std::shared_ptr<Expression>> Parse(const std::string& path) 
const = 0;
+
+  virtual Result<std::string> Format(const Expression& expr) const = 0;
 
   /// \brief A default Partitioning which always yields scalar(true)
   static std::shared_ptr<Partitioning> Default();
@@ -126,40 +115,6 @@ class ARROW_DS_EXPORT PartitioningFactory {
                                           FragmentIterator fragments);
 };
 
-/// \brief Subclass for representing the default, a partitioning that returns
-/// the idempotent "true" partition.
-class DefaultPartitioning : public Partitioning {
- public:
-  DefaultPartitioning() : Partitioning(::arrow::schema({})) {}
-
-  std::string type_name() const override { return "default"; }
-
-  Result<std::shared_ptr<Expression>> Parse(const std::string& segment,
-                                            int i) const override {
-    return scalar(true);
-  }
-};
-
-/// \brief Subclass for looking up partition information from a dictionary
-/// mapping segments to expressions provided on construction.
-class ARROW_DS_EXPORT SegmentDictionaryPartitioning : public Partitioning {
- public:
-  SegmentDictionaryPartitioning(
-      std::shared_ptr<Schema> schema,
-      std::vector<std::unordered_map<std::string, std::shared_ptr<Expression>>>
-          dictionaries)
-      : Partitioning(std::move(schema)), 
dictionaries_(std::move(dictionaries)) {}
-
-  std::string type_name() const override { return "segment_dictionary"; }
-
-  /// Return dictionaries_[i][segment] or scalar(true)
-  Result<std::shared_ptr<Expression>> Parse(const std::string& segment,
-                                            int i) const override;
-
- protected:
-  std::vector<std::unordered_map<std::string, std::shared_ptr<Expression>>> 
dictionaries_;
-};
-
 /// \brief Subclass for the common case of a partitioning which yields an 
equality
 /// expression for each segment
 class ARROW_DS_EXPORT KeyValuePartitioning : public Partitioning {
@@ -181,21 +136,9 @@ class ARROW_DS_EXPORT KeyValuePartitioning : public 
Partitioning {
   static Status SetDefaultValuesFromKeys(const Expression& expr,
                                          RecordBatchProjector* projector);
 
-  /// Convert a Key to a full expression.
-  /// If the field referenced in key is absent from the schema will be ignored.
-  static Result<std::shared_ptr<Expression>> ConvertKey(const Key& key,
-                                                        const Schema& schema,
-                                                        const ArrayVector& 
dictionaries);
-
-  /// Extract a partition key from a path segment.
-  virtual util::optional<Key> ParseKey(const std::string& segment, int i) 
const = 0;
-
-  virtual Result<std::string> FormatKey(const Key& expr, int i) const = 0;
-
-  Result<std::shared_ptr<Expression>> Parse(const std::string& segment,
-                                            int i) const override;
+  Result<std::shared_ptr<Expression>> Parse(const std::string& path) const 
override;
 
-  Result<std::string> Format(const Expression& expr, int i) const override;
+  Result<std::string> Format(const Expression& expr) const override;
 
  protected:
   KeyValuePartitioning(std::shared_ptr<Schema> schema, ArrayVector 
dictionaries)
@@ -205,6 +148,13 @@ class ARROW_DS_EXPORT KeyValuePartitioning : public 
Partitioning {
     }
   }
 
+  virtual std::vector<Key> ParseKeys(const std::string& path) const = 0;
+
+  virtual Result<std::string> FormatValues(const std::vector<Scalar*>& values) 
const = 0;
+
+  /// Convert a Key to a full expression.
+  Result<std::shared_ptr<Expression>> ConvertKey(const Key& key) const;
+
   ArrayVector dictionaries_;
 };
 
@@ -224,12 +174,13 @@ class ARROW_DS_EXPORT DirectoryPartitioning : public 
KeyValuePartitioning {
 
   std::string type_name() const override { return "schema"; }
 
-  util::optional<Key> ParseKey(const std::string& segment, int i) const 
override;
-
-  Result<std::string> FormatKey(const Key& key, int i) const override;
-
   static std::shared_ptr<PartitioningFactory> MakeFactory(
       std::vector<std::string> field_names, PartitioningFactoryOptions = {});
+
+ private:
+  std::vector<Key> ParseKeys(const std::string& path) const override;
+
+  Result<std::string> FormatValues(const std::vector<Scalar*>& values) const 
override;
 };
 
 /// \brief Multi-level, directory based partitioning
@@ -250,36 +201,48 @@ class ARROW_DS_EXPORT HivePartitioning : public 
KeyValuePartitioning {
 
   std::string type_name() const override { return "hive"; }
 
-  util::optional<Key> ParseKey(const std::string& segment, int i) const 
override {
-    return ParseKey(segment);
-  }
-
-  Result<std::string> FormatKey(const Key& key, int i) const override;
-
   static util::optional<Key> ParseKey(const std::string& segment);
 
   static std::shared_ptr<PartitioningFactory> MakeFactory(
       PartitioningFactoryOptions = {});
+
+ private:
+  std::vector<Key> ParseKeys(const std::string& path) const override;
+
+  Result<std::string> FormatValues(const std::vector<Scalar*>& values) const 
override;
 };
 
 /// \brief Implementation provided by lambda or other callable
 class ARROW_DS_EXPORT FunctionPartitioning : public Partitioning {
  public:
-  explicit FunctionPartitioning(
-      std::shared_ptr<Schema> schema,
-      std::function<Result<std::shared_ptr<Expression>>(const std::string&, 
int)> impl,
-      std::string name = "function")
-      : Partitioning(std::move(schema)), impl_(std::move(impl)), 
name_(std::move(name)) {}
+  using ParseImpl =
+      std::function<Result<std::shared_ptr<Expression>>(const std::string&)>;
+
+  using FormatImpl = std::function<Result<std::string>(const Expression&)>;
+
+  FunctionPartitioning(std::shared_ptr<Schema> schema, ParseImpl parse_impl,
+                       FormatImpl format_impl = NULLPTR, std::string name = 
"function")
+      : Partitioning(std::move(schema)),
+        parse_impl_(std::move(parse_impl)),
+        format_impl_(std::move(format_impl)),
+        name_(std::move(name)) {}
 
   std::string type_name() const override { return name_; }
 
-  Result<std::shared_ptr<Expression>> Parse(const std::string& segment,
-                                            int i) const override {
-    return impl_(segment, i);
+  Result<std::shared_ptr<Expression>> Parse(const std::string& path) const 
override {
+    return parse_impl_(path);
+  }
+
+  Result<std::string> Format(const Expression& expr) const override {
+    if (format_impl_) {
+      return format_impl_(expr);
+    }
+    return Status::NotImplemented("formatting paths from ", type_name(), " 
Partitioning");
   }
 
  private:
-  std::function<Result<std::shared_ptr<Expression>>(const std::string&, int)> 
impl_;
+  ParseImpl parse_impl_;
+  FormatImpl format_impl_;
   std::string name_;
 };
 
diff --git a/cpp/src/arrow/dataset/partition_test.cc b/cpp/src/arrow/dataset/partition_test.cc
index 7e7c310..7636529 100644
--- a/cpp/src/arrow/dataset/partition_test.cc
+++ b/cpp/src/arrow/dataset/partition_test.cc
@@ -43,16 +43,27 @@ using E = TestExpression;
 class TestPartitioning : public ::testing::Test {
  public:
   void AssertParseError(const std::string& path) {
-    ASSERT_RAISES(Invalid, partitioning_->Parse(path).status());
+    ASSERT_RAISES(Invalid, partitioning_->Parse(path));
   }
 
-  void AssertParse(const std::string& path, std::shared_ptr<Expression> 
expected) {
+  void AssertParse(const std::string& path, E expected) {
     ASSERT_OK_AND_ASSIGN(auto parsed, partitioning_->Parse(path));
-    ASSERT_EQ(E{parsed}, E{expected});
+    ASSERT_EQ(E{parsed}, expected);
   }
 
-  void AssertParse(const std::string& path, const Expression& expected) {
-    AssertParse(path, expected.Copy());
+  template <StatusCode code = StatusCode::Invalid>
+  void AssertFormatError(E expr) {
+    ASSERT_EQ(partitioning_->Format(*expr.expression).status().code(), code);
+  }
+
+  void AssertFormat(E expr, const std::string& expected) {
+    ASSERT_OK_AND_ASSIGN(auto formatted, 
partitioning_->Format(*expr.expression));
+    ASSERT_EQ(formatted, expected);
+
+    // ensure the formatted path round trips the relevant components of the 
partition
+    // expression: roundtripped should be a subset of expr
+    ASSERT_OK_AND_ASSIGN(auto roundtripped, partitioning_->Parse(formatted));
+    ASSERT_EQ(E{roundtripped->Assume(*expr.expression)}, E{scalar(true)});
   }
 
   void AssertInspect(const std::vector<std::string>& paths,
@@ -83,33 +94,6 @@ class TestPartitioning : public ::testing::Test {
   std::shared_ptr<PartitioningFactory> factory_;
 };
 
-TEST_F(TestPartitioning, SegmentDictionary) {
-  using Dict = std::unordered_map<std::string, std::shared_ptr<Expression>>;
-  Dict alpha_dict, beta_dict;
-  auto add_expr = [](const std::string& segment, const Expression& expr, Dict* 
dict) {
-    dict->emplace(segment, expr.Copy());
-  };
-
-  add_expr("zero", "alpha"_ == 0, &alpha_dict);
-  add_expr("one", "alpha"_ == 1, &alpha_dict);
-  add_expr("more", "alpha"_ >= 2, &alpha_dict);
-
-  add_expr("...", "beta"_ == "uh", &beta_dict);
-  add_expr("?", "beta"_ == "what", &beta_dict);
-  add_expr("!", "beta"_ == "OH", &beta_dict);
-
-  partitioning_.reset(new SegmentDictionaryPartitioning(
-      schema({field("alpha", int32()), field("beta", utf8())}), {alpha_dict, 
beta_dict}));
-
-  AssertParse("/one/?", "alpha"_ == int32_t(1) and "beta"_ == "what");
-  AssertParse("/one/?/---", "alpha"_ == int32_t(1) and "beta"_ == "what");
-  AssertParse("/more", "alpha"_ >= int32_t(2));      // missing second segment
-  AssertParse("/---/---", scalar(true));             // unrecognized segments
-  AssertParse("/---/!", "beta"_ == "OH");            // unrecognized first 
segment
-  AssertParse("/zero/---", "alpha"_ == int32_t(0));  // unrecognized second 
segment
-  AssertParse("", scalar(true));                     // no segments to parse
-}
-
 TEST_F(TestPartitioning, DirectoryPartitioning) {
   partitioning_ = std::make_shared<DirectoryPartitioning>(
       schema({field("alpha", int32()), field("beta", utf8())}));
@@ -127,6 +111,21 @@ TEST_F(TestPartitioning, DirectoryPartitioning) {
   AssertParse("/0/foo/ignored=2341", "alpha"_ == int32_t(0) and "beta"_ == 
"foo");
 }
 
+TEST_F(TestPartitioning, DirectoryPartitioningFormat) {
+  partitioning_ = std::make_shared<DirectoryPartitioning>(
+      schema({field("alpha", int32()), field("beta", utf8())}));
+
+  AssertFormat("alpha"_ == int32_t(0) and "beta"_ == "hello", "0/hello");
+  AssertFormat("beta"_ == "hello" and "alpha"_ == int32_t(0), "0/hello");
+  AssertFormat("alpha"_ == int32_t(0), "0");
+  AssertFormatError("beta"_ == "hello");
+  AssertFormat(scalar(true), "");
+
+  AssertFormatError<StatusCode::TypeError>("alpha"_ == 0.0 and "beta"_ == 
"hello");
+  AssertFormat("gamma"_ == "yo" and "alpha"_ == int32_t(0) and "beta"_ == 
"hello",
+               "0/hello");
+}
+
 TEST_F(TestPartitioning, DiscoverSchema) {
   factory_ = DirectoryPartitioning::MakeFactory({"alpha", "beta"});
 
@@ -213,6 +212,21 @@ TEST_F(TestPartitioning, HivePartitioning) {
   AssertParseError("/alpha=0.0/beta=3.25");  // conversion of "0.0" to int32 
fails
 }
 
+TEST_F(TestPartitioning, HivePartitioningFormat) {
+  partitioning_ = std::make_shared<HivePartitioning>(
+      schema({field("alpha", int32()), field("beta", float32())}));
+
+  AssertFormat("alpha"_ == int32_t(0) and "beta"_ == 3.25f, 
"alpha=0/beta=3.25");
+  AssertFormat("beta"_ == 3.25f and "alpha"_ == int32_t(0), 
"alpha=0/beta=3.25");
+  AssertFormat("alpha"_ == int32_t(0), "alpha=0");
+  AssertFormat("beta"_ == 3.25f, "alpha/beta=3.25");
+  AssertFormat(scalar(true), "");
+
+  AssertFormatError<StatusCode::TypeError>("alpha"_ == "yo" and "beta"_ == 
3.25f);
+  AssertFormat("gamma"_ == "yo" and "alpha"_ == int32_t(0) and "beta"_ == 
3.25f,
+               "alpha=0/beta=3.25");
+}
+
 TEST_F(TestPartitioning, DiscoverHiveSchema) {
   factory_ = HivePartitioning::MakeFactory();
 
@@ -283,90 +297,113 @@ TEST_F(TestPartitioning, HiveDictionaryHasUniqueValues) {
 }
 
 TEST_F(TestPartitioning, EtlThenHive) {
-  DirectoryPartitioning etl_part(schema({field("year", int16()), 
field("month", int8()),
-                                         field("day", int8()), field("hour", 
int8())}));
-  HivePartitioning alphabeta_part(
-      schema({field("alpha", int32()), field("beta", float32())}));
+  FieldVector etl_fields{field("year", int16()), field("month", int8()),
+                         field("day", int8()), field("hour", int8())};
+  DirectoryPartitioning etl_part(schema(etl_fields));
+
+  FieldVector alphabeta_fields{field("alpha", int32()), field("beta", 
float32())};
+  HivePartitioning alphabeta_part(schema(alphabeta_fields));
 
   partitioning_ = std::make_shared<FunctionPartitioning>(
       schema({field("year", int16()), field("month", int8()), field("day", 
int8()),
               field("hour", int8()), field("alpha", int32()), field("beta", 
float32())}),
-      [&](const std::string& segment, int i) -> 
Result<std::shared_ptr<Expression>> {
-        if (i < etl_part.schema()->num_fields()) {
-          return etl_part.Parse(segment, i);
+      [&](const std::string& path) -> Result<std::shared_ptr<Expression>> {
+        auto segments = fs::internal::SplitAbstractPath(path);
+        if (segments.size() < etl_fields.size() + alphabeta_fields.size()) {
+          return Status::Invalid("path ", path, " can't be parsed");
         }
 
-        i -= etl_part.schema()->num_fields();
-        return alphabeta_part.Parse(segment, i);
+        auto etl_segments_end = segments.begin() + etl_fields.size();
+        auto etl_path =
+            fs::internal::JoinAbstractPath(segments.begin(), etl_segments_end);
+        ARROW_ASSIGN_OR_RAISE(auto etl_expr, etl_part.Parse(etl_path));
+
+        auto alphabeta_segments_end = etl_segments_end + 
alphabeta_fields.size();
+        auto alphabeta_path =
+            fs::internal::JoinAbstractPath(etl_segments_end, 
alphabeta_segments_end);
+        ARROW_ASSIGN_OR_RAISE(auto alphabeta_expr, 
alphabeta_part.Parse(alphabeta_path));
+
+        return and_(etl_expr, alphabeta_expr);
       });
 
   AssertParse("/1999/12/31/00/alpha=0/beta=3.25",
               "year"_ == int16_t(1999) and "month"_ == int8_t(12) and
                   "day"_ == int8_t(31) and "hour"_ == int8_t(0) and
-                  "alpha"_ == int32_t(0) and "beta"_ == 3.25f);
+                  ("alpha"_ == int32_t(0) and "beta"_ == 3.25f));
 
   AssertParseError("/20X6/03/21/05/alpha=0/beta=3.25");
-}
+}  // namespace dataset
 
 TEST_F(TestPartitioning, Set) {
+  auto ints = [](std::vector<int32_t> ints) {
+    std::shared_ptr<Array> out;
+    ArrayFromVector<Int32Type>(ints, &out);
+    return out;
+  };
+
   // An adhoc partitioning which parses segments like "/x in [1 4 5]"
   // into ("x"_ == 1 or "x"_ == 4 or "x"_ == 5)
   partitioning_ = std::make_shared<FunctionPartitioning>(
       schema({field("x", int32())}),
-      [](const std::string& segment, int) -> 
Result<std::shared_ptr<Expression>> {
-        std::smatch matches;
-
-        static std::regex re("^x in \\[(.*)\\]$");
-        if (!std::regex_match(segment, matches, re) || matches.size() != 2) {
-          return Status::Invalid("regex failed to parse");
-        }
-
+      [&](const std::string& path) -> Result<std::shared_ptr<Expression>> {
         ExpressionVector subexpressions;
-        std::string element;
-        std::istringstream elements(matches[1]);
-        while (elements >> element) {
-          ARROW_ASSIGN_OR_RAISE(auto s, Scalar::Parse(int32(), element));
-          subexpressions.push_back(equal(field_ref("x"), scalar(s)));
+        for (auto segment : fs::internal::SplitAbstractPath(path)) {
+          std::smatch matches;
+
+          static std::regex re(R"(^(\S+) in \[(.*)\]$)");
+          if (!std::regex_match(segment, matches, re) || matches.size() != 3) {
+            return Status::Invalid("regex failed to parse");
+          }
+
+          std::vector<int32_t> set;
+          std::istringstream elements(matches[2]);
+          for (std::string element; elements >> element;) {
+            ARROW_ASSIGN_OR_RAISE(auto s, Scalar::Parse(int32(), element));
+            set.push_back(checked_cast<const Int32Scalar&>(*s).value);
+          }
+
+          
subexpressions.push_back(field_ref(matches[1])->In(ints(set)).Copy());
         }
-
-        return or_(std::move(subexpressions));
+        return and_(std::move(subexpressions));
       });
 
-  AssertParse("/x in [1]", "x"_ == 1);
-  AssertParse("/x in [1 4 5]", "x"_ == 1 or "x"_ == 4 or "x"_ == 5);
-  AssertParse("/x in []", scalar(false));
+  AssertParse("/x in [1]", "x"_.In(ints({1})));
+  AssertParse("/x in [1 4 5]", "x"_.In(ints({1, 4, 5})));
+  AssertParse("/x in []", "x"_.In(ints({})));
 }
 
 // An adhoc partitioning which parses segments like "/x=[-3.25, 0.0)"
 // into ("x"_ >= -3.25 and "x" < 0.0)
-class RangePartitioning : public HivePartitioning {
+class RangePartitioning : public Partitioning {
  public:
-  using HivePartitioning::HivePartitioning;
+  explicit RangePartitioning(std::shared_ptr<Schema> s) : 
Partitioning(std::move(s)) {}
 
   std::string type_name() const override { return "range"; }
 
-  Result<std::shared_ptr<Expression>> Parse(const std::string& segment,
-                                            int i) const override {
+  Result<std::shared_ptr<Expression>> Parse(const std::string& path) const 
override {
     ExpressionVector ranges;
-    auto key = ParseKey(segment, i);
-    if (!key) {
-      return Status::Invalid("can't parse '", segment, "' as a range");
-    }
 
-    std::smatch matches;
-    RETURN_NOT_OK(DoRegex(key->value, &matches));
+    for (auto segment : fs::internal::SplitAbstractPath(path)) {
+      auto key = HivePartitioning::ParseKey(segment);
+      if (!key) {
+        return Status::Invalid("can't parse '", segment, "' as a range");
+      }
+
+      std::smatch matches;
+      RETURN_NOT_OK(DoRegex(key->value, &matches));
 
-    auto& min_cmp = matches[1] == "[" ? greater_equal : greater;
-    std::string min_repr = matches[2];
-    std::string max_repr = matches[3];
-    auto& max_cmp = matches[4] == "]" ? less_equal : less;
+      auto& min_cmp = matches[1] == "[" ? greater_equal : greater;
+      std::string min_repr = matches[2];
+      std::string max_repr = matches[3];
+      auto& max_cmp = matches[4] == "]" ? less_equal : less;
 
-    const auto& type = schema_->GetFieldByName(key->name)->type();
-    ARROW_ASSIGN_OR_RAISE(auto min, Scalar::Parse(type, min_repr));
-    ARROW_ASSIGN_OR_RAISE(auto max, Scalar::Parse(type, max_repr));
+      const auto& type = schema_->GetFieldByName(key->name)->type();
+      ARROW_ASSIGN_OR_RAISE(auto min, Scalar::Parse(type, min_repr));
+      ARROW_ASSIGN_OR_RAISE(auto max, Scalar::Parse(type, max_repr));
 
-    ranges.push_back(and_(min_cmp(field_ref(key->name), scalar(min)),
-                          max_cmp(field_ref(key->name), scalar(max))));
+      ranges.push_back(and_(min_cmp(field_ref(key->name), scalar(min)),
+                            max_cmp(field_ref(key->name), scalar(max))));
+    }
     return and_(ranges);
   }
 
@@ -386,6 +423,8 @@ class RangePartitioning : public HivePartitioning {
 
     return Status::OK();
   }
+
+  Result<std::string> Format(const Expression&) const override { return ""; }
 };
 
 TEST_F(TestPartitioning, Range) {
@@ -456,7 +495,7 @@ class TestPartitioningWritePlan : public ::testing::Test {
               static_cast<int>(std::find(fragments.begin(), fragments.end(), 
fragment) -
                                fragments.begin());
           auto path = 
fs::internal::GetAbstractPathParent(actual_plan.paths[i]).first;
-          dirs_[path + "/"].fragments.push_back(fragment_index);
+          dirs_[path].fragments.push_back(fragment_index);
         } else {
           auto partition_expression = op.partition_expr();
           dirs_[actual_plan.paths[i]].partition_expression = 
partition_expression;
@@ -526,15 +565,15 @@ TEST_F(TestPartitioningWritePlan, SingleDirectory) {
 
   MakeWritePlan("a"_ == 42, "a"_ == 99, "a"_ == 101);
   AssertPlanIs(ExpectedWritePlan()
-                   .Dir("42/", "a"_ == 42, {0})
-                   .Dir("99/", "a"_ == 99, {1})
-                   .Dir("101/", "a"_ == 101, {2}));
+                   .Dir("42", "a"_ == 42, {0})
+                   .Dir("99", "a"_ == 99, {1})
+                   .Dir("101", "a"_ == 101, {2}));
 
   MakeWritePlan("a"_ == 42, "a"_ == 99, "a"_ == 99, "a"_ == 101, "a"_ == 99);
   AssertPlanIs(ExpectedWritePlan()
-                   .Dir("42/", "a"_ == 42, {0})
-                   .Dir("99/", "a"_ == 99, {1, 2, 4})
-                   .Dir("101/", "a"_ == 101, {3}));
+                   .Dir("42", "a"_ == 42, {0})
+                   .Dir("99", "a"_ == 99, {1, 2, 4})
+                   .Dir("101", "a"_ == 101, {3}));
 }
 
 TEST_F(TestPartitioningWritePlan, NestedDirectories) {
@@ -544,12 +583,12 @@ TEST_F(TestPartitioningWritePlan, NestedDirectories) {
                 "a"_ == 99 and "b"_ == "hello", "a"_ == 99 and "b"_ == 
"world");
 
   AssertPlanIs(ExpectedWritePlan()
-                   .Dir("42/", "a"_ == 42, {})
-                   .Dir("42/hello/", "b"_ == "hello", {0})
-                   .Dir("42/world/", "b"_ == "world", {1})
-                   .Dir("99/", "a"_ == 99, {})
-                   .Dir("99/hello/", "b"_ == "hello", {2})
-                   .Dir("99/world/", "b"_ == "world", {3}));
+                   .Dir("42", "a"_ == 42, {})
+                   .Dir("42/hello", "b"_ == "hello", {0})
+                   .Dir("42/world", "b"_ == "world", {1})
+                   .Dir("99", "a"_ == 99, {})
+                   .Dir("99/hello", "b"_ == "hello", {2})
+                   .Dir("99/world", "b"_ == "world", {3}));
 }
 
 TEST_F(TestPartitioningWritePlan, Errors) {
@@ -558,9 +597,9 @@ TEST_F(TestPartitioningWritePlan, Errors) {
       Invalid, testing::HasSubstr("no partition expression for field 'a'"),
       MakeWritePlanError("a"_ == 42, scalar(true), "a"_ == 101));
 
-  EXPECT_RAISES_WITH_MESSAGE_THAT(TypeError,
-                                  testing::HasSubstr("expected RHS to have 
type int32"),
-                                  MakeWritePlanError("a"_ == 42, "a"_ == 
"hello"));
+  EXPECT_RAISES_WITH_MESSAGE_THAT(
+      TypeError, testing::HasSubstr("scalar hello (of type string) is 
invalid"),
+      MakeWritePlanError("a"_ == 42, "a"_ == "hello"));
 
   factory_ = DirectoryPartitioning::MakeFactory({"a", "b"});
   EXPECT_RAISES_WITH_MESSAGE_THAT(
diff --git a/cpp/src/arrow/filesystem/filesystem_test.cc 
b/cpp/src/arrow/filesystem/filesystem_test.cc
index bce70d7..b679f8b 100644
--- a/cpp/src/arrow/filesystem/filesystem_test.cc
+++ b/cpp/src/arrow/filesystem/filesystem_test.cc
@@ -139,7 +139,7 @@ TEST(PathUtil, ConcatAbstractPath) {
 }
 
 TEST(PathUtil, JoinAbstractPath) {
-  std::vector<std::string> parts = {"abc", "def", "ghi", "jkl"};
+  std::vector<std::string> parts = {"abc", "def", "ghi", "", "jkl"};
 
   ASSERT_EQ("abc/def/ghi/jkl", JoinAbstractPath(parts.begin(), parts.end()));
   ASSERT_EQ("def/ghi", JoinAbstractPath(parts.begin() + 1, parts.begin() + 3));
diff --git a/cpp/src/arrow/filesystem/path_util.h 
b/cpp/src/arrow/filesystem/path_util.h
index 4743053..46fa415 100644
--- a/cpp/src/arrow/filesystem/path_util.h
+++ b/cpp/src/arrow/filesystem/path_util.h
@@ -92,6 +92,8 @@ template <class StringIt>
 std::string JoinAbstractPath(StringIt it, StringIt end) {
   std::string path;
   for (; it != end; ++it) {
+    if (it->empty()) continue;
+
     if (!path.empty()) {
       path += kSep;
     }
diff --git a/cpp/src/arrow/type.cc b/cpp/src/arrow/type.cc
index d534d58..186daac 100644
--- a/cpp/src/arrow/type.cc
+++ b/cpp/src/arrow/type.cc
@@ -1171,15 +1171,19 @@ std::vector<FieldPath> FieldRef::FindAll(const 
FieldVector& fields) const {
 
       size_t size() const { return referents.size(); }
 
-      void Add(FieldPath prefix, const FieldPath& match, const FieldVector& 
fields) {
-        auto maybe_field = match.Get(fields);
+      void Add(const FieldPath& prefix, const FieldPath& suffix,
+               const FieldVector& fields) {
+        auto maybe_field = suffix.Get(fields);
         DCHECK_OK(maybe_field.status());
-
-        prefix.indices().resize(prefix.indices().size() + 
match.indices().size());
-        std::copy(match.indices().begin(), match.indices().end(),
-                  prefix.indices().end() - match.indices().size());
-        prefixes.push_back(std::move(prefix));
         referents.push_back(std::move(maybe_field).ValueOrDie());
+
+        std::vector<int> concatenated_indices(prefix.indices().size() +
+                                              suffix.indices().size());
+        auto it = concatenated_indices.begin();
+        for (auto path : {&prefix, &suffix}) {
+          it = std::copy(path->indices().begin(), path->indices().end(), it);
+        }
+        prefixes.emplace_back(std::move(concatenated_indices));
       }
     };
 
diff --git a/cpp/src/arrow/type.h b/cpp/src/arrow/type.h
index 86d8a79..e2532d2 100644
--- a/cpp/src/arrow/type.h
+++ b/cpp/src/arrow/type.h
@@ -1363,11 +1363,14 @@ class ARROW_EXPORT FieldPath {
   size_t hash() const;
 
   explicit operator bool() const { return !indices_.empty(); }
+  bool operator!() const { return indices_.empty(); }
   bool operator==(const FieldPath& other) const { return indices() == 
other.indices(); }
-  bool operator!=(const FieldPath& other) const { return !(*this == other); }
+  bool operator!=(const FieldPath& other) const { return indices() != 
other.indices(); }
 
-  std::vector<int>& indices() { return indices_; }
   const std::vector<int>& indices() const { return indices_; }
+  int operator[](size_t i) const { return indices_[i]; }
+  std::vector<int>::const_iterator begin() const { return indices_.begin(); }
+  std::vector<int>::const_iterator end() const { return indices_.end(); }
 
   /// \brief Retrieve the referenced child Field from a Schema, Field, or 
DataType
   Result<std::shared_ptr<Field>> Get(const Schema& schema) const;

Reply via email to