This is an automated email from the ASF dual-hosted git repository.

npr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 3694794  ARROW-8296: [C++][Dataset] Add IpcFileWriteOptions
3694794 is described below

commit 3694794bdfd0677b95b8c95681e392512f1c9237
Author: Benjamin Kietzman <[email protected]>
AuthorDate: Fri Oct 9 16:27:37 2020 -0700

    ARROW-8296: [C++][Dataset] Add IpcFileWriteOptions
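    
    IpcFileWriteOptions exposes the ipc::IpcWriteOptions used by the IPC
    dataset writer (including an optional util::Codec for compressing
    record batch body buffers) plus custom metadata written to the file
    footer. A minimal usage sketch, mirroring the test added below; the
    helper name and surrounding setup are assumptions, not part of this
    change:
    
        #include <memory>
        
        #include "arrow/dataset/file_ipc.h"
        #include "arrow/result.h"
        #include "arrow/util/compression.h"
        #include "arrow/util/key_value_metadata.h"
        
        namespace ds = arrow::dataset;
        
        // Build IPC write options with ZSTD-compressed body buffers and
        // custom file footer metadata (returns an error Status if ZSTD
        // support is not built).
        arrow::Result<std::shared_ptr<ds::FileWriteOptions>>
        MakeCustomIpcWriteOptions(ds::IpcFileFormat* format) {
          auto ipc_options = std::static_pointer_cast<ds::IpcFileWriteOptions>(
              format->DefaultWriteOptions());
          ARROW_ASSIGN_OR_RAISE(
              ipc_options->options->codec,
              arrow::util::Codec::Create(arrow::Compression::ZSTD));
          ipc_options->metadata =
              arrow::key_value_metadata({{"hello", "world"}});
          return ipc_options;
        }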
    
    Closes #8389 from bkietz/8296-IpcFileWriteOptions
    
    Lead-authored-by: Benjamin Kietzman <[email protected]>
    Co-authored-by: Sutou Kouhei <[email protected]>
    Co-authored-by: Neal Richardson <[email protected]>
    Signed-off-by: Neal Richardson <[email protected]>
---
 c_glib/arrow-glib/codec.cpp                 |  60 ++++++++++--
 c_glib/arrow-glib/codec.h                   |   7 ++
 c_glib/arrow-glib/codec.hpp                 |   4 +-
 c_glib/arrow-glib/input-stream.cpp          |   2 +-
 c_glib/arrow-glib/ipc-options.cpp           |  83 ++++++++--------
 c_glib/arrow-glib/output-stream.cpp         |   2 +-
 c_glib/test/test-codec.rb                   |  10 ++
 c_glib/test/test-write-options.rb           |  22 +----
 cpp/src/arrow/dataset/file_ipc.cc           |  14 ++-
 cpp/src/arrow/dataset/file_ipc.h            |   6 +-
 cpp/src/arrow/dataset/file_ipc_test.cc      |  31 ++++++
 cpp/src/arrow/dataset/scanner.cc            |   2 +-
 cpp/src/arrow/ipc/feather.cc                |   5 +-
 cpp/src/arrow/ipc/metadata_internal.cc      |   8 +-
 cpp/src/arrow/ipc/options.h                 |   3 +-
 cpp/src/arrow/ipc/read_write_test.cc        |   8 +-
 cpp/src/arrow/ipc/writer.cc                 |  17 ++--
 cpp/src/arrow/testing/gtest_util.h          |  12 +++
 cpp/src/arrow/util/compression.cc           | 144 ++++++++++++++--------------
 cpp/src/arrow/util/compression.h            |  20 +++-
 cpp/src/arrow/util/compression_brotli.cc    |  19 ++--
 cpp/src/arrow/util/compression_bz2.cc       |   4 +-
 cpp/src/arrow/util/compression_lz4.cc       |   8 +-
 cpp/src/arrow/util/compression_snappy.cc    |   4 +-
 cpp/src/arrow/util/compression_test.cc      |  38 ++++----
 cpp/src/arrow/util/compression_zlib.cc      |   4 +-
 cpp/src/arrow/util/compression_zstd.cc      |  19 ++--
 cpp/src/arrow/util/key_value_metadata.h     |   5 +-
 cpp/src/arrow/util/string.cc                |   8 ++
 cpp/src/arrow/util/string.h                 |   3 +
 cpp/src/parquet/printer.cc                  |   8 +-
 python/pyarrow/includes/libarrow.pxd        |   4 +-
 python/pyarrow/io.pxi                       |  19 ----
 python/pyarrow/ipc.pxi                      |   9 +-
 r/R/arrowExports.R                          |   8 ++
 r/R/dataset-format.R                        |  12 +++
 r/R/dataset-write.R                         |  11 ++-
 r/R/record-batch-writer.R                   |  12 ++-
 r/man/RecordBatchWriter.Rd                  |   2 +-
 r/man/write_dataset.Rd                      |  13 ++-
 r/src/arrowExports.cpp                      |  39 ++++++++
 r/src/dataset.cpp                           |  19 ++++
 r/tests/testthat/test-dataset.R             |  32 ++++++-
 r/tests/testthat/test-record-batch-reader.R |   2 +-
 44 files changed, 497 insertions(+), 265 deletions(-)

diff --git a/c_glib/arrow-glib/codec.cpp b/c_glib/arrow-glib/codec.cpp
index fdd61e7..33b3d1c 100644
--- a/c_glib/arrow-glib/codec.cpp
+++ b/c_glib/arrow-glib/codec.cpp
@@ -38,7 +38,7 @@ G_BEGIN_DECLS
  */
 
 typedef struct GArrowCodecPrivate_ {
-  arrow::util::Codec *codec;
+  std::shared_ptr<arrow::util::Codec> codec;
 } GArrowCodecPrivate;
 
 enum {
@@ -57,7 +57,7 @@ garrow_codec_finalize(GObject *object)
 {
   auto priv = GARROW_CODEC_GET_PRIVATE(object);
 
-  delete priv->codec;
+  priv->codec.~shared_ptr();
 
   G_OBJECT_CLASS(garrow_codec_parent_class)->finalize(object);
 }
@@ -72,7 +72,8 @@ garrow_codec_set_property(GObject *object,
 
   switch (prop_id) {
   case PROP_CODEC:
-    priv->codec = static_cast<arrow::util::Codec *>(g_value_get_pointer(value));
+    priv->codec =
+      *static_cast<std::shared_ptr<arrow::util::Codec> *>(g_value_get_pointer(value));
     break;
   default:
     G_OBJECT_WARN_INVALID_PROPERTY_ID(object, prop_id, pspec);
@@ -96,6 +97,8 @@ garrow_codec_get_property(GObject *object,
 static void
 garrow_codec_init(GArrowCodec *object)
 {
+  auto priv = GARROW_CODEC_GET_PRIVATE(object);
+  new(&priv->codec) std::shared_ptr<arrow::util::Codec>;
 }
 
 static void
@@ -111,7 +114,7 @@ garrow_codec_class_init(GArrowCodecClass *klass)
 
   spec = g_param_spec_pointer("codec",
                               "Codec",
-                              "The raw arrow::util::Codec *",
+                              "The raw std::shared_ptr<arrow::util::Codec> *",
                               static_cast<GParamFlags>(G_PARAM_WRITABLE |
                                                        G_PARAM_CONSTRUCT_ONLY));
   g_object_class_install_property(gobject_class, PROP_CODEC, spec);
@@ -133,7 +136,9 @@ garrow_codec_new(GArrowCompressionType type,
   auto arrow_type = garrow_compression_type_to_raw(type);
   auto arrow_codec = arrow::util::Codec::Create(arrow_type);
   if (garrow::check(error, arrow_codec, "[codec][new]")) {
-    return garrow_codec_new_raw(arrow_codec.ValueOrDie().release());
+    std::shared_ptr<arrow::util::Codec> arrow_codec_shared =
+      std::move(*arrow_codec);
+    return garrow_codec_new_raw(&arrow_codec_shared);
   } else {
     return NULL;
   }
@@ -151,7 +156,46 @@ const gchar *
 garrow_codec_get_name(GArrowCodec *codec)
 {
   auto arrow_codec = garrow_codec_get_raw(codec);
-  return arrow_codec->name();
+  if (!arrow_codec) {
+    return NULL;
+  }
+  return arrow_codec->name().c_str();
+}
+
+/**
+ * garrow_codec_get_compression_type:
+ * @codec: A #GArrowCodec.
+ *
+ * Returns: The compression type of the codec.
+ *
+ * Since: 2.0.0
+ */
+GArrowCompressionType
+garrow_codec_get_compression_type(GArrowCodec *codec)
+{
+  auto arrow_codec = garrow_codec_get_raw(codec);
+  if (!arrow_codec) {
+    return GARROW_COMPRESSION_TYPE_UNCOMPRESSED;
+  }
+  return garrow_compression_type_from_raw(arrow_codec->compression_type());
+}
+
+/**
+ * garrow_codec_get_compression_level:
+ * @codec: A #GArrowCodec.
+ *
+ * Returns: The compression level of the codec.
+ *
+ * Since: 2.0.0
+ */
+gint
+garrow_codec_get_compression_level(GArrowCodec *codec)
+{
+  auto arrow_codec = garrow_codec_get_raw(codec);
+  if (!arrow_codec) {
+    return arrow::util::Codec::UseDefaultCompressionLevel();
+  }
+  return arrow_codec->compression_level();
 }
 
 G_END_DECLS
@@ -207,7 +251,7 @@ garrow_compression_type_to_raw(GArrowCompressionType type)
 }
 
 GArrowCodec *
-garrow_codec_new_raw(arrow::util::Codec *arrow_codec)
+garrow_codec_new_raw(std::shared_ptr<arrow::util::Codec> *arrow_codec)
 {
   auto codec = GARROW_CODEC(g_object_new(GARROW_TYPE_CODEC,
                                          "codec", arrow_codec,
@@ -215,7 +259,7 @@ garrow_codec_new_raw(arrow::util::Codec *arrow_codec)
   return codec;
 }
 
-arrow::util::Codec *
+std::shared_ptr<arrow::util::Codec>
 garrow_codec_get_raw(GArrowCodec *codec)
 {
   auto priv = GARROW_CODEC_GET_PRIVATE(codec);
diff --git a/c_glib/arrow-glib/codec.h b/c_glib/arrow-glib/codec.h
index 5feab2b..6e177af 100644
--- a/c_glib/arrow-glib/codec.h
+++ b/c_glib/arrow-glib/codec.h
@@ -20,6 +20,7 @@
 #pragma once
 
 #include <arrow-glib/gobject-type.h>
+#include <arrow-glib/version.h>
 
 G_BEGIN_DECLS
 
@@ -63,5 +64,11 @@ GArrowCodec *garrow_codec_new(GArrowCompressionType type,
                               GError **error);
 
 const gchar *garrow_codec_get_name(GArrowCodec *codec);
+GARROW_AVAILABLE_IN_2_0
+GArrowCompressionType
+garrow_codec_get_compression_type(GArrowCodec *codec);
+GARROW_AVAILABLE_IN_2_0
+gint
+garrow_codec_get_compression_level(GArrowCodec *codec);
 
 G_END_DECLS
diff --git a/c_glib/arrow-glib/codec.hpp b/c_glib/arrow-glib/codec.hpp
index 14c3ad7..f4cfaba 100644
--- a/c_glib/arrow-glib/codec.hpp
+++ b/c_glib/arrow-glib/codec.hpp
@@ -29,6 +29,6 @@ arrow::Compression::type
 garrow_compression_type_to_raw(GArrowCompressionType type);
 
 GArrowCodec *
-garrow_codec_new_raw(arrow::util::Codec *arrow_codec);
-arrow::util::Codec *
+garrow_codec_new_raw(std::shared_ptr<arrow::util::Codec> *arrow_codec);
+std::shared_ptr<arrow::util::Codec>
 garrow_codec_get_raw(GArrowCodec *codec);
diff --git a/c_glib/arrow-glib/input-stream.cpp b/c_glib/arrow-glib/input-stream.cpp
index 3751d41..84904b7 100644
--- a/c_glib/arrow-glib/input-stream.cpp
+++ b/c_glib/arrow-glib/input-stream.cpp
@@ -1132,7 +1132,7 @@ garrow_compressed_input_stream_new(GArrowCodec *codec,
                                    GArrowInputStream *raw,
                                    GError **error)
 {
-  auto arrow_codec = garrow_codec_get_raw(codec);
+  auto arrow_codec = garrow_codec_get_raw(codec).get();
   auto arrow_raw = garrow_input_stream_get_raw(raw);
   auto arrow_stream =
     arrow::io::CompressedInputStream::Make(arrow_codec, arrow_raw);
diff --git a/c_glib/arrow-glib/ipc-options.cpp b/c_glib/arrow-glib/ipc-options.cpp
index 1cddd25..b9b2c41 100644
--- a/c_glib/arrow-glib/ipc-options.cpp
+++ b/c_glib/arrow-glib/ipc-options.cpp
@@ -21,6 +21,7 @@
 #  include <config.h>
 #endif
 
+#include <arrow-glib/codec.hpp>
 #include <arrow-glib/enums.h>
 #include <arrow-glib/ipc-options.hpp>
 
@@ -242,6 +243,7 @@ garrow_read_options_set_included_fields(GArrowReadOptions *options,
 
 typedef struct GArrowWriteOptionsPrivate_ {
   arrow::ipc::IpcWriteOptions options;
+  GArrowCodec *codec;
 } GArrowWriteOptionsPrivate;
 
 enum {
@@ -249,8 +251,7 @@ enum {
   PROP_WRITE_OPTIONS_MAX_RECURSION_DEPTH,
   PROP_WRITE_OPTIONS_ALIGNMENT,
   PROP_WRITE_OPTIONS_WRITE_LEGACY_IPC_FORMAT,
-  PROP_WRITE_OPTIONS_COMPRESSION,
-  PROP_WRITE_OPTIONS_COMPRESSION_LEVEL,
+  PROP_WRITE_OPTIONS_CODEC,
   PROP_WRITE_OPTIONS_USE_THREADS,
 };
 
@@ -264,6 +265,19 @@ G_DEFINE_TYPE_WITH_PRIVATE(GArrowWriteOptions,
       GARROW_WRITE_OPTIONS(obj)))
 
 static void
+garrow_write_options_dispose(GObject *object)
+{
+  auto priv = GARROW_WRITE_OPTIONS_GET_PRIVATE(object);
+
+  if (priv->codec) {
+    g_object_unref(priv->codec);
+    priv->codec = NULL;
+  }
+
+  G_OBJECT_CLASS(garrow_write_options_parent_class)->dispose(object);
+}
+
+static void
 garrow_write_options_finalize(GObject *object)
 {
   auto priv = GARROW_WRITE_OPTIONS_GET_PRIVATE(object);
@@ -294,12 +308,12 @@ garrow_write_options_set_property(GObject *object,
   case PROP_WRITE_OPTIONS_WRITE_LEGACY_IPC_FORMAT:
     priv->options.write_legacy_ipc_format = g_value_get_boolean(value);
     break;
-  case PROP_WRITE_OPTIONS_COMPRESSION:
-    priv->options.compression =
-      static_cast<arrow::Compression::type>(g_value_get_enum(value));
-    break;
-  case PROP_WRITE_OPTIONS_COMPRESSION_LEVEL:
-    priv->options.compression_level = g_value_get_int(value);
+  case PROP_WRITE_OPTIONS_CODEC:
+    if (priv->codec) {
+      g_object_unref(priv->codec);
+    }
+    priv->codec = GARROW_CODEC(g_value_dup_object(value));
+    priv->options.codec = garrow_codec_get_raw(priv->codec);
     break;
   case PROP_WRITE_OPTIONS_USE_THREADS:
     priv->options.use_threads = g_value_get_boolean(value);
@@ -331,11 +345,8 @@ garrow_write_options_get_property(GObject *object,
   case PROP_WRITE_OPTIONS_WRITE_LEGACY_IPC_FORMAT:
     g_value_set_boolean(value, priv->options.write_legacy_ipc_format);
     break;
-  case PROP_WRITE_OPTIONS_COMPRESSION:
-    g_value_set_enum(value, priv->options.compression);
-    break;
-  case PROP_WRITE_OPTIONS_COMPRESSION_LEVEL:
-    g_value_set_int(value, priv->options.compression_level);
+  case PROP_WRITE_OPTIONS_CODEC:
+    g_value_set_object(value, priv->codec);
     break;
   case PROP_WRITE_OPTIONS_USE_THREADS:
     g_value_set_boolean(value, priv->options.use_threads);
@@ -352,6 +363,11 @@ garrow_write_options_init(GArrowWriteOptions *object)
   auto priv = GARROW_WRITE_OPTIONS_GET_PRIVATE(object);
   new(&priv->options) arrow::ipc::IpcWriteOptions;
   priv->options = arrow::ipc::IpcWriteOptions::Defaults();
+  if (priv->options.codec) {
+    priv->codec = garrow_codec_new_raw(&(priv->options.codec));
+  } else {
+    priv->codec = NULL;
+  }
 }
 
 static void
@@ -359,6 +375,7 @@ garrow_write_options_class_init(GArrowWriteOptionsClass *klass)
 {
   auto gobject_class = G_OBJECT_CLASS(klass);
 
+  gobject_class->dispose      = garrow_write_options_dispose;
   gobject_class->finalize     = garrow_write_options_finalize;
   gobject_class->set_property = garrow_write_options_set_property;
   gobject_class->get_property = garrow_write_options_get_property;
@@ -441,42 +458,24 @@ garrow_write_options_class_init(GArrowWriteOptionsClass *klass)
                                   spec);
 
   /**
-   * GArrowWriteOptions:compression:
+   * GArrowWriteOptions:codec:
    *
    * Codec to use for compressing and decompressing record batch body
    * buffers. This is not part of the Arrow IPC protocol and only for
-   * internal use (e.g. Feather files). May only be LZ4_FRAME and
-   * ZSTD.
+   * internal use (e.g. Feather files).
    *
-   * Since: 1.0.0
-   */
-  spec = g_param_spec_enum("compression",
-                           "Compression",
-                           "Codec to use for "
-                           "compressing record batch body buffers.",
-                           GARROW_TYPE_COMPRESSION_TYPE,
-                           options.compression,
-                           static_cast<GParamFlags>(G_PARAM_READWRITE));
-  g_object_class_install_property(gobject_class,
-                                  PROP_WRITE_OPTIONS_COMPRESSION,
-                                  spec);
-
-  /**
-   * GArrowWriteOptions:compression-level:
-   *
-   * The level for compression.
+   * May only be UNCOMPRESSED, LZ4_FRAME and ZSTD.
    *
-   * Since: 1.0.0
+   * Since: 2.0.0
    */
-  spec = g_param_spec_int("compression-level",
-                          "Compression level",
-                          "The level for compression",
-                          G_MININT,
-                          G_MAXINT,
-                          options.compression_level,
-                          static_cast<GParamFlags>(G_PARAM_READWRITE));
+  spec = g_param_spec_object("codec",
+                             "Codec",
+                             "Codec to use for "
+                             "compressing record batch body buffers.",
+                             GARROW_TYPE_CODEC,
+                             static_cast<GParamFlags>(G_PARAM_READWRITE));
   g_object_class_install_property(gobject_class,
-                                  PROP_WRITE_OPTIONS_COMPRESSION_LEVEL,
+                                  PROP_WRITE_OPTIONS_CODEC,
                                   spec);
 
   /**
diff --git a/c_glib/arrow-glib/output-stream.cpp b/c_glib/arrow-glib/output-stream.cpp
index 2c3ccaf..1619bac 100644
--- a/c_glib/arrow-glib/output-stream.cpp
+++ b/c_glib/arrow-glib/output-stream.cpp
@@ -688,7 +688,7 @@ garrow_compressed_output_stream_new(GArrowCodec *codec,
                                     GArrowOutputStream *raw,
                                     GError **error)
 {
-  auto arrow_codec = garrow_codec_get_raw(codec);
+  auto arrow_codec = garrow_codec_get_raw(codec).get();
   auto arrow_raw = garrow_output_stream_get_raw(raw);
   auto arrow_stream = arrow::io::CompressedOutputStream::Make(arrow_codec,
                                                               arrow_raw);
diff --git a/c_glib/test/test-codec.rb b/c_glib/test/test-codec.rb
index 6617815..a32ec4d 100644
--- a/c_glib/test/test-codec.rb
+++ b/c_glib/test/test-codec.rb
@@ -20,4 +20,14 @@ class TestCodec < Test::Unit::TestCase
     codec = Arrow::Codec.new(:gzip)
     assert_equal("gzip", codec.name)
   end
+
+  def test_compression_type
+    codec = Arrow::Codec.new(:gzip)
+    assert_equal(Arrow::CompressionType::GZIP, codec.compression_type)
+  end
+
+  def test_compression_level
+    codec = Arrow::Codec.new(:gzip)
+    assert_equal(9, codec.compression_level)
+  end
 end
diff --git a/c_glib/test/test-write-options.rb b/c_glib/test/test-write-options.rb
index d30b78b..c528ce6 100644
--- a/c_glib/test/test-write-options.rb
+++ b/c_glib/test/test-write-options.rb
@@ -73,27 +73,15 @@ class TestWriteOptions < Test::Unit::TestCase
     end
   end
 
-  sub_test_case("compression") do
+  sub_test_case("codec") do
     def test_default
-      assert_equal(Arrow::CompressionType::UNCOMPRESSED,
-                   @options.compression)
+      assert_nil(@options.codec)
     end
 
     def test_accessor
-      @options.compression = :zstd
-      assert_equal(Arrow::CompressionType::ZSTD,
-                   @options.compression)
-    end
-  end
-
-  sub_test_case("compression-level") do
-    def test_default
-      assert_equal(-(2 ** 31), @options.compression_level)
-    end
-
-    def test_accessor
-      @options.compression_level = 8
-      assert_equal(8, @options.compression_level)
+      @options.codec = Arrow::Codec.new(:zstd)
+      assert_equal("zstd",
+                   @options.codec.name)
     end
   end
 
diff --git a/cpp/src/arrow/dataset/file_ipc.cc b/cpp/src/arrow/dataset/file_ipc.cc
index e15de84..8bd0121 100644
--- a/cpp/src/arrow/dataset/file_ipc.cc
+++ b/cpp/src/arrow/dataset/file_ipc.cc
@@ -168,12 +168,12 @@ Result<ScanTaskIterator> IpcFileFormat::ScanFile(std::shared_ptr<ScanOptions> op
 //
 
 std::shared_ptr<FileWriteOptions> IpcFileFormat::DefaultWriteOptions() {
-  std::shared_ptr<IpcFileWriteOptions> options(
+  std::shared_ptr<IpcFileWriteOptions> ipc_options(
       new IpcFileWriteOptions(shared_from_this()));
 
-  options->ipc_options =
+  ipc_options->options =
       std::make_shared<ipc::IpcWriteOptions>(ipc::IpcWriteOptions::Defaults());
-  return options;
+  return ipc_options;
 }
 
 Result<std::shared_ptr<FileWriter>> IpcFileFormat::MakeWriter(
@@ -185,7 +185,13 @@ Result<std::shared_ptr<FileWriter>> IpcFileFormat::MakeWriter(
 
   auto ipc_options = checked_pointer_cast<IpcFileWriteOptions>(options);
 
-  ARROW_ASSIGN_OR_RAISE(auto writer, ipc::MakeFileWriter(destination, schema));
+  // override use_threads to avoid nested parallelism
+  ipc_options->options->use_threads = false;
+
+  ARROW_ASSIGN_OR_RAISE(auto writer,
+                        ipc::MakeFileWriter(destination, schema, *ipc_options->options,
+                                            ipc_options->metadata));
+
   return std::shared_ptr<FileWriter>(
       new IpcFileWriter(std::move(writer), std::move(schema), std::move(ipc_options)));
 }
diff --git a/cpp/src/arrow/dataset/file_ipc.h b/cpp/src/arrow/dataset/file_ipc.h
index 50650bf..2cdd837 100644
--- a/cpp/src/arrow/dataset/file_ipc.h
+++ b/cpp/src/arrow/dataset/file_ipc.h
@@ -66,7 +66,11 @@ class ARROW_DS_EXPORT IpcFileFormat : public FileFormat {
 
 class ARROW_DS_EXPORT IpcFileWriteOptions : public FileWriteOptions {
  public:
-  std::shared_ptr<ipc::IpcWriteOptions> ipc_options;
+  /// Options passed to ipc::MakeFileWriter. use_threads is ignored
+  std::shared_ptr<ipc::IpcWriteOptions> options;
+
+  /// custom_metadata written to the file's footer
+  std::shared_ptr<const KeyValueMetadata> metadata;
 
  protected:
   using FileWriteOptions::FileWriteOptions;
diff --git a/cpp/src/arrow/dataset/file_ipc_test.cc b/cpp/src/arrow/dataset/file_ipc_test.cc
index 808bae5..25bbe55 100644
--- a/cpp/src/arrow/dataset/file_ipc_test.cc
+++ b/cpp/src/arrow/dataset/file_ipc_test.cc
@@ -28,11 +28,13 @@
 #include "arrow/dataset/partition.h"
 #include "arrow/dataset/test_util.h"
 #include "arrow/io/memory.h"
+#include "arrow/ipc/reader.h"
 #include "arrow/ipc/writer.h"
 #include "arrow/record_batch.h"
 #include "arrow/table.h"
 #include "arrow/testing/gtest_util.h"
 #include "arrow/testing/util.h"
+#include "arrow/util/key_value_metadata.h"
 
 namespace arrow {
 namespace dataset {
@@ -166,6 +168,35 @@ TEST_F(TestIpcFileFormat, WriteRecordBatchReader) {
   AssertBufferEqual(*written, *source->buffer());
 }
 
+TEST_F(TestIpcFileFormat, WriteRecordBatchReaderCustomOptions) {
+  std::shared_ptr<RecordBatchReader> reader = GetRecordBatchReader();
+  auto source = GetFileSource(reader.get());
+  reader = GetRecordBatchReader();
+
+  opts_ = ScanOptions::Make(reader->schema());
+
+  EXPECT_OK_AND_ASSIGN(auto sink, GetFileSink());
+
+  auto ipc_options =
+      checked_pointer_cast<IpcFileWriteOptions>(format_->DefaultWriteOptions());
+  if (util::Codec::IsAvailable(Compression::ZSTD)) {
+    EXPECT_OK_AND_ASSIGN(ipc_options->options->codec,
+                         util::Codec::Create(Compression::ZSTD));
+  }
+  ipc_options->metadata = key_value_metadata({{"hello", "world"}});
+  EXPECT_OK_AND_ASSIGN(auto writer,
+                       format_->MakeWriter(sink, reader->schema(), ipc_options));
+  ASSERT_OK(writer->Write(reader.get()));
+  ASSERT_OK(writer->Finish());
+
+  EXPECT_OK_AND_ASSIGN(auto written, sink->Finish());
+  EXPECT_OK_AND_ASSIGN(auto ipc_reader, ipc::RecordBatchFileReader::Open(
+                                            std::make_shared<io::BufferReader>(written)));
+
+  EXPECT_EQ(ipc_reader->metadata()->sorted_pairs(),
+            ipc_options->metadata->sorted_pairs());
+}
+
 class TestIpcFileSystemDataset : public testing::Test,
                                  public WriteFileSystemDatasetMixin {
  public:
diff --git a/cpp/src/arrow/dataset/scanner.cc b/cpp/src/arrow/dataset/scanner.cc
index 017e545..0194160 100644
--- a/cpp/src/arrow/dataset/scanner.cc
+++ b/cpp/src/arrow/dataset/scanner.cc
@@ -114,7 +114,7 @@ Status ScannerBuilder::Project(std::vector<std::string> columns) {
 
 Status ScannerBuilder::Filter(std::shared_ptr<Expression> filter) {
   RETURN_NOT_OK(schema()->CanReferenceFieldsByNames(FieldsInExpression(*filter)));
-  RETURN_NOT_OK(filter->Validate(*schema()).status());
+  RETURN_NOT_OK(filter->Validate(*schema()));
   scan_options_->filter = std::move(filter);
   return Status::OK();
 }
diff --git a/cpp/src/arrow/ipc/feather.cc b/cpp/src/arrow/ipc/feather.cc
index 5aa4601..5ce8885 100644
--- a/cpp/src/arrow/ipc/feather.cc
+++ b/cpp/src/arrow/ipc/feather.cc
@@ -795,8 +795,9 @@ Status WriteTable(const Table& table, io::OutputStream* dst,
     return WriteFeatherV1(table, dst);
   } else {
     IpcWriteOptions ipc_options = IpcWriteOptions::Defaults();
-    ipc_options.compression = properties.compression;
-    ipc_options.compression_level = properties.compression_level;
+    ARROW_ASSIGN_OR_RAISE(
+        ipc_options.codec,
+        util::Codec::Create(properties.compression, properties.compression_level));
 
     std::shared_ptr<RecordBatchWriter> writer;
     ARROW_ASSIGN_OR_RAISE(writer, MakeFileWriter(dst, table.schema(), ipc_options));
diff --git a/cpp/src/arrow/ipc/metadata_internal.cc b/cpp/src/arrow/ipc/metadata_internal.cc
index 9c967a5..a82aef3 100644
--- a/cpp/src/arrow/ipc/metadata_internal.cc
+++ b/cpp/src/arrow/ipc/metadata_internal.cc
@@ -903,15 +903,15 @@ static Status WriteBuffers(FBB& fbb, const std::vector<BufferMetadata>& buffers,
 
 static Status GetBodyCompression(FBB& fbb, const IpcWriteOptions& options,
                                  BodyCompressionOffset* out) {
-  if (options.compression != Compression::UNCOMPRESSED) {
+  if (options.codec != nullptr) {
     flatbuf::CompressionType codec;
-    if (options.compression == Compression::LZ4_FRAME) {
+    if (options.codec->compression_type() == Compression::LZ4_FRAME) {
       codec = flatbuf::CompressionType::LZ4_FRAME;
-    } else if (options.compression == Compression::ZSTD) {
+    } else if (options.codec->compression_type() == Compression::ZSTD) {
       codec = flatbuf::CompressionType::ZSTD;
     } else {
       return Status::Invalid("Unsupported IPC compression codec: ",
-                             util::Codec::GetCodecAsString(options.compression));
+                             options.codec->name());
     }
     *out = flatbuf::CreateBodyCompression(fbb, codec,
                                           flatbuf::BodyCompressionMethod::BUFFER);
diff --git a/cpp/src/arrow/ipc/options.h b/cpp/src/arrow/ipc/options.h
index 6bbd7b8..bf535cd 100644
--- a/cpp/src/arrow/ipc/options.h
+++ b/cpp/src/arrow/ipc/options.h
@@ -59,8 +59,7 @@ struct ARROW_EXPORT IpcWriteOptions {
   /// \brief Compression codec to use for record batch body buffers
   ///
   /// May only be UNCOMPRESSED, LZ4_FRAME and ZSTD.
-  Compression::type compression = Compression::UNCOMPRESSED;
-  int compression_level = Compression::kUseDefaultCompressionLevel;
+  std::shared_ptr<util::Codec> codec;
 
   /// \brief Use global CPU thread pool to parallelize any computational tasks
   /// like compression
diff --git a/cpp/src/arrow/ipc/read_write_test.cc b/cpp/src/arrow/ipc/read_write_test.cc
index 5d11c27..85f6e39 100644
--- a/cpp/src/arrow/ipc/read_write_test.cc
+++ b/cpp/src/arrow/ipc/read_write_test.cc
@@ -619,7 +619,7 @@ TEST_F(TestWriteRecordBatch, WriteWithCompression) {
       continue;
     }
     IpcWriteOptions write_options = IpcWriteOptions::Defaults();
-    write_options.compression = codec;
+    ASSERT_OK_AND_ASSIGN(write_options.codec, util::Codec::Create(codec));
     CheckRoundtrip(*batch, write_options);
 
     // Check non-parallel read and write
@@ -636,9 +636,9 @@ TEST_F(TestWriteRecordBatch, WriteWithCompression) {
     if (!util::Codec::IsAvailable(codec)) {
       continue;
     }
-    IpcWriteOptions options = IpcWriteOptions::Defaults();
-    options.compression = codec;
-    ASSERT_RAISES(Invalid, SerializeRecordBatch(*batch, options));
+    IpcWriteOptions write_options = IpcWriteOptions::Defaults();
+    ASSERT_OK_AND_ASSIGN(write_options.codec, util::Codec::Create(codec));
+    ASSERT_RAISES(Invalid, SerializeRecordBatch(*batch, write_options));
   }
 }
 
diff --git a/cpp/src/arrow/ipc/writer.cc b/cpp/src/arrow/ipc/writer.cc
index 9f9be32..adce911 100644
--- a/cpp/src/arrow/ipc/writer.cc
+++ b/cpp/src/arrow/ipc/writer.cc
@@ -185,17 +185,13 @@ class RecordBatchSerializer {
   }
 
   Status CompressBodyBuffers() {
-    std::unique_ptr<util::Codec> codec;
-
-    RETURN_NOT_OK(internal::CheckCompressionSupported(options_.compression));
-
-    ARROW_ASSIGN_OR_RAISE(
-        codec, util::Codec::Create(options_.compression, options_.compression_level));
+    RETURN_NOT_OK(
+        internal::CheckCompressionSupported(options_.codec->compression_type()));
 
     auto CompressOne = [&](size_t i) {
       if (out_->body_buffers[i]->size() > 0) {
-        RETURN_NOT_OK(
-            CompressBuffer(*out_->body_buffers[i], codec.get(), &out_->body_buffers[i]));
+        RETURN_NOT_OK(CompressBuffer(*out_->body_buffers[i], options_.codec.get(),
+                                     &out_->body_buffers[i]));
       }
       return Status::OK();
     };
@@ -216,7 +212,7 @@ class RecordBatchSerializer {
       RETURN_NOT_OK(VisitArray(*batch.column(i)));
     }
 
-    if (options_.compression != Compression::UNCOMPRESSED) {
+    if (options_.codec != nullptr) {
       RETURN_NOT_OK(CompressBodyBuffers());
     }
 
@@ -227,8 +223,7 @@ class RecordBatchSerializer {
     buffer_meta_.reserve(out_->body_buffers.size());
 
     // Construct the buffer metadata for the record batch header
-    for (size_t i = 0; i < out_->body_buffers.size(); ++i) {
-      const Buffer* buffer = out_->body_buffers[i].get();
+    for (const auto& buffer : out_->body_buffers) {
       int64_t size = 0;
       int64_t padding = 0;
 
diff --git a/cpp/src/arrow/testing/gtest_util.h b/cpp/src/arrow/testing/gtest_util.h
index feba8ba..fe90c37 100644
--- a/cpp/src/arrow/testing/gtest_util.h
+++ b/cpp/src/arrow/testing/gtest_util.h
@@ -464,3 +464,15 @@ class ARROW_TESTING_EXPORT EnvVarGuard {
 #endif
 
 }  // namespace arrow
+
+namespace nonstd {
+namespace sv_lite {
+
+// Without this hint, GTest will print string_views as a container of char
+template <class Char, class Traits = std::char_traits<Char>>
+void PrintTo(const basic_string_view<Char, Traits>& view, std::ostream* os) {
+  *os << view;
+}
+
+}  // namespace sv_lite
+}  // namespace nonstd
diff --git a/cpp/src/arrow/util/compression.cc b/cpp/src/arrow/util/compression.cc
index d05bd0f..f9c084f 100644
--- a/cpp/src/arrow/util/compression.cc
+++ b/cpp/src/arrow/util/compression.cc
@@ -24,160 +24,156 @@
 #include "arrow/result.h"
 #include "arrow/status.h"
 #include "arrow/util/compression_internal.h"
+#include "arrow/util/logging.h"
 
 namespace arrow {
 namespace util {
 
-Compressor::~Compressor() {}
-
-Decompressor::~Decompressor() {}
-
-Codec::~Codec() {}
-
 int Codec::UseDefaultCompressionLevel() { return kUseDefaultCompressionLevel; }
 
 Status Codec::Init() { return Status::OK(); }
 
-std::string Codec::GetCodecAsString(Compression::type t) {
+const std::string& Codec::GetCodecAsString(Compression::type t) {
+  static const std::string uncompressed = "uncompressed", snappy = "snappy",
+                           gzip = "gzip", lzo = "lzo", brotli = "brotli",
+                           lz4_raw = "lz4_raw", lz4 = "lz4", lz4_hadoop = "lz4_hadoop",
+                           zstd = "zstd", bz2 = "bz2", unknown = "unknown";
+
   switch (t) {
     case Compression::UNCOMPRESSED:
-      return "UNCOMPRESSED";
+      return uncompressed;
     case Compression::SNAPPY:
-      return "SNAPPY";
+      return snappy;
     case Compression::GZIP:
-      return "GZIP";
+      return gzip;
     case Compression::LZO:
-      return "LZO";
+      return lzo;
     case Compression::BROTLI:
-      return "BROTLI";
+      return brotli;
     case Compression::LZ4:
-      return "LZ4_RAW";
+      return lz4_raw;
     case Compression::LZ4_FRAME:
-      return "LZ4";
+      return lz4;
     case Compression::LZ4_HADOOP:
-      return "LZ4_HADOOP";
+      return lz4_hadoop;
     case Compression::ZSTD:
-      return "ZSTD";
+      return zstd;
     case Compression::BZ2:
-      return "BZ2";
+      return bz2;
     default:
-      return "UNKNOWN";
+      return unknown;
   }
 }
 
 Result<Compression::type> Codec::GetCompressionType(const std::string& name) {
-  if (name == "UNCOMPRESSED") {
+  if (name == "uncompressed") {
     return Compression::UNCOMPRESSED;
-  } else if (name == "GZIP") {
+  } else if (name == "gzip") {
     return Compression::GZIP;
-  } else if (name == "SNAPPY") {
+  } else if (name == "snappy") {
     return Compression::SNAPPY;
-  } else if (name == "LZO") {
+  } else if (name == "lzo") {
     return Compression::LZO;
-  } else if (name == "BROTLI") {
+  } else if (name == "brotli") {
     return Compression::BROTLI;
-  } else if (name == "LZ4_RAW") {
+  } else if (name == "lz4_raw") {
     return Compression::LZ4;
-  } else if (name == "LZ4") {
+  } else if (name == "lz4") {
     return Compression::LZ4_FRAME;
-  } else if (name == "LZ4_HADOOP") {
+  } else if (name == "lz4_hadoop") {
     return Compression::LZ4_HADOOP;
-  } else if (name == "ZSTD") {
+  } else if (name == "zstd") {
     return Compression::ZSTD;
-  } else if (name == "BZ2") {
+  } else if (name == "bz2") {
     return Compression::BZ2;
   } else {
     return Status::Invalid("Unrecognized compression type: ", name);
   }
 }
 
+bool Codec::SupportsCompressionLevel(Compression::type codec) {
+  switch (codec) {
+    case Compression::GZIP:
+    case Compression::BROTLI:
+    case Compression::ZSTD:
+    case Compression::BZ2:
+      return true;
+    default:
+      return false;
+  }
+}
+
 Result<std::unique_ptr<Codec>> Codec::Create(Compression::type codec_type,
                                              int compression_level) {
+  if (!IsAvailable(codec_type)) {
+    if (codec_type == Compression::LZO) {
+      return Status::NotImplemented("LZO codec not implemented");
+    }
+
+    auto name = GetCodecAsString(codec_type);
+    if (name == "unknown") {
+      return Status::Invalid("Unrecognized codec");
+    }
+
+    return Status::NotImplemented("Support for codec '", 
GetCodecAsString(codec_type),
+                                  "' not built");
+  }
+
+  if (compression_level != kUseDefaultCompressionLevel &&
+      !SupportsCompressionLevel(codec_type)) {
+    return Status::Invalid("Codec '", GetCodecAsString(codec_type),
+                           "' doesn't support setting a compression level.");
+  }
+
   std::unique_ptr<Codec> codec;
-  const bool compression_level_set{compression_level != kUseDefaultCompressionLevel};
   switch (codec_type) {
     case Compression::UNCOMPRESSED:
-      if (compression_level_set) {
-        return Status::Invalid("Compression level cannot be specified for 
UNCOMPRESSED.");
-      }
       return nullptr;
     case Compression::SNAPPY:
 #ifdef ARROW_WITH_SNAPPY
-      if (compression_level_set) {
-        return Status::Invalid("Snappy doesn't support setting a compression 
level.");
-      }
       codec = internal::MakeSnappyCodec();
-      break;
-#else
-      return Status::NotImplemented("Snappy codec support not built");
 #endif
+      break;
     case Compression::GZIP:
 #ifdef ARROW_WITH_ZLIB
       codec = internal::MakeGZipCodec(compression_level);
-      break;
-#else
-      return Status::NotImplemented("Gzip codec support not built");
 #endif
-    case Compression::LZO:
-      if (compression_level_set) {
-        return Status::Invalid("LZ0 doesn't support setting a compression 
level.");
-      }
-      return Status::NotImplemented("LZO codec not implemented");
+      break;
     case Compression::BROTLI:
 #ifdef ARROW_WITH_BROTLI
       codec = internal::MakeBrotliCodec(compression_level);
-      break;
-#else
-      return Status::NotImplemented("Brotli codec support not built");
 #endif
+      break;
     case Compression::LZ4:
 #ifdef ARROW_WITH_LZ4
-      if (compression_level_set) {
-        return Status::Invalid("LZ4 doesn't support setting a compression 
level.");
-      }
       codec = internal::MakeLz4RawCodec();
-      break;
-#else
-      return Status::NotImplemented("LZ4 codec support not built");
 #endif
+      break;
     case Compression::LZ4_FRAME:
 #ifdef ARROW_WITH_LZ4
-      if (compression_level_set) {
-        return Status::Invalid("LZ4 doesn't support setting a compression 
level.");
-      }
       codec = internal::MakeLz4FrameCodec();
-      break;
-#else
-      return Status::NotImplemented("LZ4 codec support not built");
 #endif
+      break;
     case Compression::LZ4_HADOOP:
 #ifdef ARROW_WITH_LZ4
-      if (compression_level_set) {
-        return Status::Invalid("LZ4 doesn't support setting a compression 
level.");
-      }
       codec = internal::MakeLz4HadoopRawCodec();
-      break;
-#else
-      return Status::NotImplemented("LZ4 codec support not built");
 #endif
+      break;
     case Compression::ZSTD:
 #ifdef ARROW_WITH_ZSTD
       codec = internal::MakeZSTDCodec(compression_level);
-      break;
-#else
-      return Status::NotImplemented("ZSTD codec support not built");
 #endif
+      break;
     case Compression::BZ2:
 #ifdef ARROW_WITH_BZ2
       codec = internal::MakeBZ2Codec(compression_level);
-      break;
-#else
-      return Status::NotImplemented("BZ2 codec support not built");
 #endif
+      break;
     default:
-      return Status::Invalid("Unrecognized codec");
+      break;
   }
 
+  DCHECK_NE(codec, nullptr);
   RETURN_NOT_OK(codec->Init());
   return std::move(codec);
 }
diff --git a/cpp/src/arrow/util/compression.h b/cpp/src/arrow/util/compression.h
index 551ecbe..d3e8e1e 100644
--- a/cpp/src/arrow/util/compression.h
+++ b/cpp/src/arrow/util/compression.h
@@ -54,7 +54,7 @@ constexpr int kUseDefaultCompressionLevel = Compression::kUseDefaultCompressionL
 ///
 class ARROW_EXPORT Compressor {
  public:
-  virtual ~Compressor();
+  virtual ~Compressor() = default;
 
   struct CompressResult {
     int64_t bytes_read;
@@ -96,7 +96,7 @@ class ARROW_EXPORT Compressor {
 ///
 class ARROW_EXPORT Decompressor {
  public:
-  virtual ~Decompressor();
+  virtual ~Decompressor() = default;
 
   struct DecompressResult {
     // XXX is need_more_output necessary? (Brotli?)
@@ -128,14 +128,14 @@ class ARROW_EXPORT Decompressor {
 /// \brief Compression codec
 class ARROW_EXPORT Codec {
  public:
-  virtual ~Codec();
+  virtual ~Codec() = default;
 
   /// \brief Return special value to indicate that a codec implementation
   /// should use its default compression level
   static int UseDefaultCompressionLevel();
 
   /// \brief Return a string name for compression type
-  static std::string GetCodecAsString(Compression::type t);
+  static const std::string& GetCodecAsString(Compression::type t);
 
   /// \brief Return compression type for name (all upper case)
   static Result<Compression::type> GetCompressionType(const std::string& name);
@@ -147,6 +147,9 @@ class ARROW_EXPORT Codec {
   /// \brief Return true if support for indicated codec has been enabled
   static bool IsAvailable(Compression::type codec);
 
+  /// \brief Return true if indicated codec supports setting a compression level
+  static bool SupportsCompressionLevel(Compression::type codec);
+
   /// \brief One-shot decompression function
   ///
   /// output_buffer_len must be correct and therefore be obtained in advance.
@@ -178,7 +181,14 @@ class ARROW_EXPORT Codec {
   /// \brief Create a streaming compressor instance
   virtual Result<std::shared_ptr<Decompressor>> MakeDecompressor() = 0;
 
-  virtual const char* name() const = 0;
+  /// \brief This Codec's compression type
+  virtual Compression::type compression_type() const = 0;
+
+  /// \brief The name of this Codec's compression type
+  const std::string& name() const { return 
GetCodecAsString(compression_type()); }
+
+  /// \brief This Codec's compression level, if applicable
+  virtual int compression_level() const { return UseDefaultCompressionLevel(); }
 
  private:
   /// \brief Initializes the codec's resources.
diff --git a/cpp/src/arrow/util/compression_brotli.cc b/cpp/src/arrow/util/compression_brotli.cc
index ca4f523..4feabe2 100644
--- a/cpp/src/arrow/util/compression_brotli.cc
+++ b/cpp/src/arrow/util/compression_brotli.cc
@@ -38,8 +38,6 @@ namespace {
 
 class BrotliDecompressor : public Decompressor {
  public:
-  BrotliDecompressor() {}
-
   ~BrotliDecompressor() override {
     if (state_ != nullptr) {
       BrotliDecoderDestroyInstance(state_);
@@ -167,7 +165,7 @@ class BrotliCompressor : public Compressor {
   BrotliEncoderState* state_ = nullptr;
 
  private:
-  int compression_level_;
+  const int compression_level_;
 };
 
 // ----------------------------------------------------------------------
@@ -175,11 +173,10 @@ class BrotliCompressor : public Compressor {
 
 class BrotliCodec : public Codec {
  public:
-  explicit BrotliCodec(int compression_level) {
-    compression_level_ = compression_level == kUseDefaultCompressionLevel
-                             ? kBrotliDefaultCompressionLevel
-                             : compression_level;
-  }
+  explicit BrotliCodec(int compression_level)
+      : compression_level_(compression_level == kUseDefaultCompressionLevel
+                               ? kBrotliDefaultCompressionLevel
+                               : compression_level) {}
 
   Result<int64_t> Decompress(int64_t input_len, const uint8_t* input,
                              int64_t output_buffer_len, uint8_t* output_buffer) override {
@@ -224,10 +221,12 @@ class BrotliCodec : public Codec {
     return ptr;
   }
 
-  const char* name() const override { return "brotli"; }
+  Compression::type compression_type() const override { return Compression::BROTLI; }
+
+  int compression_level() const override { return compression_level_; }
 
  private:
-  int compression_level_;
+  const int compression_level_;
 };
 
 }  // namespace
diff --git a/cpp/src/arrow/util/compression_bz2.cc b/cpp/src/arrow/util/compression_bz2.cc
index dfb266a..8a8c1cb 100644
--- a/cpp/src/arrow/util/compression_bz2.cc
+++ b/cpp/src/arrow/util/compression_bz2.cc
@@ -262,7 +262,9 @@ class BZ2Codec : public Codec {
     return ptr;
   }
 
-  const char* name() const override { return "bz2"; }
+  Compression::type compression_type() const override { return Compression::BZ2; }
+
+  int compression_level() const override { return compression_level_; }
 
  private:
   int compression_level_;
diff --git a/cpp/src/arrow/util/compression_lz4.cc b/cpp/src/arrow/util/compression_lz4.cc
index 17a8514..365cd0f 100644
--- a/cpp/src/arrow/util/compression_lz4.cc
+++ b/cpp/src/arrow/util/compression_lz4.cc
@@ -298,10 +298,10 @@ class Lz4FrameCodec : public Codec {
     return ptr;
   }
 
-  const char* name() const override { return "lz4"; }
+  Compression::type compression_type() const override { return Compression::LZ4_FRAME; }
 
  protected:
-  LZ4F_preferences_t prefs_;
+  const LZ4F_preferences_t prefs_;
 };
 
 // ----------------------------------------------------------------------
@@ -348,7 +348,7 @@ class Lz4Codec : public Codec {
         "Try using LZ4 frame format instead.");
   }
 
-  const char* name() const override { return "lz4_raw"; }
+  Compression::type compression_type() const override { return Compression::LZ4; }
 };
 
 // ----------------------------------------------------------------------
@@ -407,7 +407,7 @@ class Lz4HadoopCodec : public Lz4Codec {
         "Try using LZ4 frame format instead.");
   }
 
-  const char* name() const override { return "lz4_hadoop_raw"; }
+  Compression::type compression_type() const override { return Compression::LZ4_HADOOP; }
 
  protected:
   // Offset starting at which page data can be read/written
diff --git a/cpp/src/arrow/util/compression_snappy.cc b/cpp/src/arrow/util/compression_snappy.cc
index 9cd06dc..9b01687 100644
--- a/cpp/src/arrow/util/compression_snappy.cc
+++ b/cpp/src/arrow/util/compression_snappy.cc
@@ -41,8 +41,6 @@ namespace {
 
 class SnappyCodec : public Codec {
  public:
-  SnappyCodec() {}
-
   Result<int64_t> Decompress(int64_t input_len, const uint8_t* input,
                              int64_t output_buffer_len, uint8_t* output_buffer) override {
     size_t decompressed_size;
@@ -87,7 +85,7 @@ class SnappyCodec : public Codec {
     return Status::NotImplemented("Streaming decompression unsupported with 
Snappy");
   }
 
-  const char* name() const override { return "snappy"; }
+  Compression::type compression_type() const override { return Compression::SNAPPY; }
 };
 
 }  // namespace
diff --git a/cpp/src/arrow/util/compression_test.cc b/cpp/src/arrow/util/compression_test.cc
index 1df5236..8b184f2 100644
--- a/cpp/src/arrow/util/compression_test.cc
+++ b/cpp/src/arrow/util/compression_test.cc
@@ -320,30 +320,30 @@ class CodecTest : public ::testing::TestWithParam<Compression::type> {
 };
 
 TEST(TestCodecMisc, GetCodecAsString) {
-  ASSERT_EQ("UNCOMPRESSED", 
Codec::GetCodecAsString(Compression::UNCOMPRESSED));
-  ASSERT_EQ("SNAPPY", Codec::GetCodecAsString(Compression::SNAPPY));
-  ASSERT_EQ("GZIP", Codec::GetCodecAsString(Compression::GZIP));
-  ASSERT_EQ("LZO", Codec::GetCodecAsString(Compression::LZO));
-  ASSERT_EQ("BROTLI", Codec::GetCodecAsString(Compression::BROTLI));
-  ASSERT_EQ("LZ4_RAW", Codec::GetCodecAsString(Compression::LZ4));
-  ASSERT_EQ("LZ4", Codec::GetCodecAsString(Compression::LZ4_FRAME));
-  ASSERT_EQ("ZSTD", Codec::GetCodecAsString(Compression::ZSTD));
-  ASSERT_EQ("BZ2", Codec::GetCodecAsString(Compression::BZ2));
+  EXPECT_EQ(Codec::GetCodecAsString(Compression::UNCOMPRESSED), "uncompressed");
+  EXPECT_EQ(Codec::GetCodecAsString(Compression::SNAPPY), "snappy");
+  EXPECT_EQ(Codec::GetCodecAsString(Compression::GZIP), "gzip");
+  EXPECT_EQ(Codec::GetCodecAsString(Compression::LZO), "lzo");
+  EXPECT_EQ(Codec::GetCodecAsString(Compression::BROTLI), "brotli");
+  EXPECT_EQ(Codec::GetCodecAsString(Compression::LZ4), "lz4_raw");
+  EXPECT_EQ(Codec::GetCodecAsString(Compression::LZ4_FRAME), "lz4");
+  EXPECT_EQ(Codec::GetCodecAsString(Compression::ZSTD), "zstd");
+  EXPECT_EQ(Codec::GetCodecAsString(Compression::BZ2), "bz2");
 }
 
 TEST(TestCodecMisc, GetCompressionType) {
-  ASSERT_OK_AND_EQ(Compression::UNCOMPRESSED, Codec::GetCompressionType("UNCOMPRESSED"));
-  ASSERT_OK_AND_EQ(Compression::SNAPPY, Codec::GetCompressionType("SNAPPY"));
-  ASSERT_OK_AND_EQ(Compression::GZIP, Codec::GetCompressionType("GZIP"));
-  ASSERT_OK_AND_EQ(Compression::LZO, Codec::GetCompressionType("LZO"));
-  ASSERT_OK_AND_EQ(Compression::BROTLI, Codec::GetCompressionType("BROTLI"));
-  ASSERT_OK_AND_EQ(Compression::LZ4, Codec::GetCompressionType("LZ4_RAW"));
-  ASSERT_OK_AND_EQ(Compression::LZ4_FRAME, Codec::GetCompressionType("LZ4"));
-  ASSERT_OK_AND_EQ(Compression::ZSTD, Codec::GetCompressionType("ZSTD"));
-  ASSERT_OK_AND_EQ(Compression::BZ2, Codec::GetCompressionType("BZ2"));
+  ASSERT_OK_AND_EQ(Compression::UNCOMPRESSED, Codec::GetCompressionType("uncompressed"));
+  ASSERT_OK_AND_EQ(Compression::SNAPPY, Codec::GetCompressionType("snappy"));
+  ASSERT_OK_AND_EQ(Compression::GZIP, Codec::GetCompressionType("gzip"));
+  ASSERT_OK_AND_EQ(Compression::LZO, Codec::GetCompressionType("lzo"));
+  ASSERT_OK_AND_EQ(Compression::BROTLI, Codec::GetCompressionType("brotli"));
+  ASSERT_OK_AND_EQ(Compression::LZ4, Codec::GetCompressionType("lz4_raw"));
+  ASSERT_OK_AND_EQ(Compression::LZ4_FRAME, Codec::GetCompressionType("lz4"));
+  ASSERT_OK_AND_EQ(Compression::ZSTD, Codec::GetCompressionType("zstd"));
+  ASSERT_OK_AND_EQ(Compression::BZ2, Codec::GetCompressionType("bz2"));
 
   ASSERT_RAISES(Invalid, Codec::GetCompressionType("unk"));
-  ASSERT_RAISES(Invalid, Codec::GetCompressionType("snappy"));
+  ASSERT_RAISES(Invalid, Codec::GetCompressionType("SNAPPY"));
 }
 
 TEST_P(CodecTest, CodecRoundtrip) {
diff --git a/cpp/src/arrow/util/compression_zlib.cc b/cpp/src/arrow/util/compression_zlib.cc
index 717bbc6..84e517e 100644
--- a/cpp/src/arrow/util/compression_zlib.cc
+++ b/cpp/src/arrow/util/compression_zlib.cc
@@ -463,7 +463,9 @@ class GZipCodec : public Codec {
     return InitDecompressor();
   }
 
-  const char* name() const override { return "gzip"; }
+  Compression::type compression_type() const override { return Compression::GZIP; }
+
+  int compression_level() const override { return compression_level_; }
 
  private:
   // zlib is stateful and the z_stream state variable must be initialized
diff --git a/cpp/src/arrow/util/compression_zstd.cc b/cpp/src/arrow/util/compression_zstd.cc
index e740960..382e057 100644
--- a/cpp/src/arrow/util/compression_zstd.cc
+++ b/cpp/src/arrow/util/compression_zstd.cc
@@ -173,20 +173,19 @@ class ZSTDCompressor : public Compressor {
 
 class ZSTDCodec : public Codec {
  public:
-  explicit ZSTDCodec(int compression_level) {
-    compression_level_ = compression_level == kUseDefaultCompressionLevel
-                             ? kZSTDDefaultCompressionLevel
-                             : compression_level;
-  }
+  explicit ZSTDCodec(int compression_level)
+      : compression_level_(compression_level == kUseDefaultCompressionLevel
+                               ? kZSTDDefaultCompressionLevel
+                               : compression_level) {}
 
   Result<int64_t> Decompress(int64_t input_len, const uint8_t* input,
                              int64_t output_buffer_len, uint8_t* output_buffer) override {
     if (output_buffer == nullptr) {
       // We may pass a NULL 0-byte output buffer but some zstd versions demand
       // a valid pointer: https://github.com/facebook/zstd/issues/1385
-      static uint8_t empty_buffer[1];
+      static uint8_t empty_buffer;
       DCHECK_EQ(output_buffer_len, 0);
-      output_buffer = empty_buffer;
+      output_buffer = &empty_buffer;
     }
 
     size_t ret = ZSTD_decompress(output_buffer, static_cast<size_t>(output_buffer_len),
@@ -228,10 +227,12 @@ class ZSTDCodec : public Codec {
     return ptr;
   }
 
-  const char* name() const override { return "zstd"; }
+  Compression::type compression_type() const override { return Compression::ZSTD; }
+
+  int compression_level() const override { return compression_level_; }
 
  private:
-  int compression_level_;
+  const int compression_level_;
 };
 
 }  // namespace
diff --git a/cpp/src/arrow/util/key_value_metadata.h b/cpp/src/arrow/util/key_value_metadata.h
index 2f64522..d4207a5 100644
--- a/cpp/src/arrow/util/key_value_metadata.h
+++ b/cpp/src/arrow/util/key_value_metadata.h
@@ -51,10 +51,13 @@ class ARROW_EXPORT KeyValueMetadata {
   Status Set(const std::string& key, const std::string& value);
 
   void reserve(int64_t n);
-  int64_t size() const;
 
+  int64_t size() const;
   const std::string& key(int64_t i) const;
   const std::string& value(int64_t i) const;
+  const std::vector<std::string>& keys() const { return keys_; }
+  const std::vector<std::string>& values() const { return values_; }
+
   std::vector<std::pair<std::string, std::string>> sorted_pairs() const;
 
   /// \brief Perform linear search for key, returning -1 if not found
diff --git a/cpp/src/arrow/util/string.cc b/cpp/src/arrow/util/string.cc
index 625086c..691f10e 100644
--- a/cpp/src/arrow/util/string.cc
+++ b/cpp/src/arrow/util/string.cc
@@ -144,6 +144,14 @@ std::string AsciiToLower(util::string_view value) {
   return result;
 }
 
+std::string AsciiToUpper(util::string_view value) {
+  // TODO: ASCII validation
+  std::string result = std::string(value);
+  std::transform(result.begin(), result.end(), result.begin(),
+                 [](unsigned char c) { return std::toupper(c); });
+  return result;
+}
+
 util::optional<std::string> Replace(util::string_view s, util::string_view token,
                                     util::string_view replacement) {
   size_t token_start = s.find(token);
diff --git a/cpp/src/arrow/util/string.h b/cpp/src/arrow/util/string.h
index 56a02df..feb75d4 100644
--- a/cpp/src/arrow/util/string.h
+++ b/cpp/src/arrow/util/string.h
@@ -57,6 +57,9 @@ bool AsciiEqualsCaseInsensitive(util::string_view left, util::string_view right)
 ARROW_EXPORT
 std::string AsciiToLower(util::string_view value);
 
+ARROW_EXPORT
+std::string AsciiToUpper(util::string_view value);
+
 /// \brief Search for the first instance of a token and replace it or return nullopt if
 /// the token is not found.
 ARROW_EXPORT
diff --git a/cpp/src/parquet/printer.cc b/cpp/src/parquet/printer.cc
index a667a48..224a19d 100644
--- a/cpp/src/parquet/printer.cc
+++ b/cpp/src/parquet/printer.cc
@@ -25,6 +25,7 @@
 #include <vector>
 
 #include "arrow/util/key_value_metadata.h"
+#include "arrow/util/string.h"
 
 #include "parquet/column_scanner.h"
 #include "parquet/exception.h"
@@ -121,7 +122,9 @@ void ParquetFilePrinter::DebugPrint(std::ostream& stream, std::list<int> selecte
         stream << "  Statistics Not Set";
       }
       stream << std::endl
-             << "  Compression: " << 
Codec::GetCodecAsString(column_chunk->compression())
+             << "  Compression: "
+             << arrow::internal::AsciiToUpper(
+                    Codec::GetCodecAsString(column_chunk->compression()))
              << ", Encodings:";
       for (auto encoding : column_chunk->encodings()) {
         stream << " " << EncodingToString(encoding);
@@ -256,7 +259,8 @@ void ParquetFilePrinter::JSONPrint(std::ostream& stream, std::list<int> selected
         stream << "\"False\",";
       }
       stream << "\n           \"Compression\": \""
-             << Codec::GetCodecAsString(column_chunk->compression())
+             << arrow::internal::AsciiToUpper(
+                    Codec::GetCodecAsString(column_chunk->compression()))
              << "\", \"Encodings\": \"";
       for (auto encoding : column_chunk->encodings()) {
         stream << EncodingToString(encoding) << " ";
diff --git a/python/pyarrow/includes/libarrow.pxd b/python/pyarrow/includes/libarrow.pxd
index dee022f..afcfacc 100644
--- a/python/pyarrow/includes/libarrow.pxd
+++ b/python/pyarrow/includes/libarrow.pxd
@@ -1337,7 +1337,7 @@ cdef extern from "arrow/ipc/api.h" namespace "arrow::ipc" nogil:
         c_bool write_legacy_ipc_format
         CMemoryPool* memory_pool
         CMetadataVersion metadata_version
-        CCompressionType compression
+        shared_ptr[CCodec] codec
         c_bool use_threads
 
         @staticmethod
@@ -2055,7 +2055,7 @@ cdef extern from 'arrow/util/compression.h' namespace 'arrow' nogil:
         CResult[int64_t] Compress(int64_t input_len, const uint8_t* input,
                                   int64_t output_buffer_len,
                                   uint8_t* output_buffer)
-        const char* name() const
+        c_string name() const
         int64_t MaxCompressedLen(int64_t input_len, const uint8_t* input)
 
 
diff --git a/python/pyarrow/io.pxi b/python/pyarrow/io.pxi
index a12915e..3fc0984 100644
--- a/python/pyarrow/io.pxi
+++ b/python/pyarrow/io.pxi
@@ -1557,25 +1557,6 @@ cdef CCompressionType _ensure_compression(str name) except *:
         raise ValueError('Invalid value for compression: {!r}'.format(name))
 
 
-cdef str _compression_name(CCompressionType ctype):
-    if ctype == CCompressionType_GZIP:
-        return 'gzip'
-    elif ctype == CCompressionType_BROTLI:
-        return 'brotli'
-    elif ctype == CCompressionType_BZ2:
-        return 'bz2'
-    elif ctype == CCompressionType_LZ4_FRAME:
-        return 'lz4'
-    elif ctype == CCompressionType_LZ4:
-        return 'lz4_raw'
-    elif ctype == CCompressionType_SNAPPY:
-        return 'snappy'
-    elif ctype == CCompressionType_ZSTD:
-        return 'zstd'
-    else:
-        raise RuntimeError('Unexpected CCompressionType value')
-
-
 cdef class Codec(_Weakrefable):
     """
     Compression codec.
diff --git a/python/pyarrow/ipc.pxi b/python/pyarrow/ipc.pxi
index 74a81c6..5c8194d 100644
--- a/python/pyarrow/ipc.pxi
+++ b/python/pyarrow/ipc.pxi
@@ -94,17 +94,18 @@ cdef class IpcWriteOptions(_Weakrefable):
 
     @property
     def compression(self):
-        if self.c_options.compression == CCompressionType_UNCOMPRESSED:
+        if self.c_options.codec == nullptr:
             return None
         else:
-            return _compression_name(self.c_options.compression)
+            return frombytes(self.c_options.codec.get().name())
 
     @compression.setter
     def compression(self, value):
         if value is None:
-            self.c_options.compression = CCompressionType_UNCOMPRESSED
+            self.c_options.codec.reset()
         else:
-            self.c_options.compression = _ensure_compression(value)
+            self.c_options.codec = shared_ptr[CCodec](GetResultValue(
+                CCodec.Create(_ensure_compression(value))).release())
 
     @property
     def use_threads(self):
diff --git a/r/R/arrowExports.R b/r/R/arrowExports.R
index 7c03732..cf83d69 100644
--- a/r/R/arrowExports.R
+++ b/r/R/arrowExports.R
@@ -432,6 +432,14 @@ dataset___ParquetFileWriteOptions__update <- function(options, writer_props, arr
     invisible(.Call(`_arrow_dataset___ParquetFileWriteOptions__update` , options, writer_props, arrow_writer_props))
 }
 
+dataset___IpcFileWriteOptions__update2 <- function(ipc_options, use_legacy_format, codec, metadata_version){
+    invisible(.Call(`_arrow_dataset___IpcFileWriteOptions__update2` , ipc_options, use_legacy_format, codec, metadata_version))
+}
+
+dataset___IpcFileWriteOptions__update1 <- function(ipc_options, use_legacy_format, metadata_version){
+    invisible(.Call(`_arrow_dataset___IpcFileWriteOptions__update1` , ipc_options, use_legacy_format, metadata_version))
+}
+
 dataset___IpcFileFormat__Make <- function(){
     .Call(`_arrow_dataset___IpcFileFormat__Make` )
 }
diff --git a/r/R/dataset-format.R b/r/R/dataset-format.R
index ec3ac36..8300e41 100644
--- a/r/R/dataset-format.R
+++ b/r/R/dataset-format.R
@@ -139,6 +139,18 @@ FileWriteOptions <- R6Class("FileWriteOptions", inherit = ArrowObject,
         dataset___ParquetFileWriteOptions__update(self,
             ParquetWriterProperties$create(...),
             ParquetArrowWriterProperties$create(...))
+      } else if (self$type == "ipc") {
+        args <- list(...)
+        if (is.null(args$codec)) {
+          dataset___IpcFileWriteOptions__update1(self,
+              get_ipc_use_legacy_format(args$use_legacy_format),
+              get_ipc_metadata_version(args$metadata_version))
+        } else {
+          dataset___IpcFileWriteOptions__update2(self,
+              get_ipc_use_legacy_format(args$use_legacy_format),
+              args$codec,
+              get_ipc_metadata_version(args$metadata_version))
+        }
       }
       invisible(self)
     }
diff --git a/r/R/dataset-write.R b/r/R/dataset-write.R
index b97a4e3..c5c9292 100644
--- a/r/R/dataset-write.R
+++ b/r/R/dataset-write.R
@@ -42,7 +42,16 @@
 #' @param hive_style logical: write partition segments as Hive-style
 #' (`key1=value1/key2=value2/file.ext`) or as just bare values. Default is `TRUE`.
 #' @param ... additional format-specific arguments. For available Parquet
-#' options, see [write_parquet()].
+#' options, see [write_parquet()]. The available Feather options are
+#' - `use_legacy_format` logical: write data formatted so that Arrow libraries
+#'   versions 0.14 and lower can read it. Default is `FALSE`. You can also
+#'   enable this by setting the environment variable `ARROW_PRE_0_15_IPC_FORMAT=1`.
+#' - `metadata_version`: A string like "V5" or the equivalent integer indicating
+#'   the Arrow IPC MetadataVersion. Default (NULL) will use the latest version,
+#'   unless the environment variable `ARROW_PRE_1_0_METADATA_VERSION=1`, in
+#'   which case it will be V4.
+#' - `codec`: A [Codec] which will be used to compress body buffers of written
+#'   files. Default (NULL) will not compress body buffers.
 #' @return The input `dataset`, invisibly
 #' @export
 write_dataset <- function(dataset,
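
Taken together, these Feather options are passed through `...` of
`write_dataset()`. A short usage sketch (the `ds` dataset and `dst_dir` path
are placeholders; the codec is only set when the build provides zstd):

    codec <- if (codec_is_available("zstd")) Codec$create("zstd") else NULL
    write_dataset(ds, dst_dir, format = "feather",
                  codec = codec,
                  metadata_version = "V5",
                  use_legacy_format = FALSE)

    # the written files can then be re-opened as a dataset
    new_ds <- open_dataset(dst_dir, format = "feather")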
diff --git a/r/R/record-batch-writer.R b/r/R/record-batch-writer.R
index 4c4d0bb..8b51603 100644
--- a/r/R/record-batch-writer.R
+++ b/r/R/record-batch-writer.R
@@ -39,7 +39,7 @@
 #' - `sink` An `OutputStream`
 #' - `schema` A [Schema] for the data to be written
 #' - `use_legacy_format` logical: write data formatted so that Arrow libraries
-#'   versions 0.14 and lower can read it? Default is `FALSE`. You can also
+#'   versions 0.14 and lower can read it. Default is `FALSE`. You can also
 #'   enable this by setting the environment variable `ARROW_PRE_0_15_IPC_FORMAT=1`.
 #' - `metadata_version`: A string like "V5" or the equivalent integer indicating
 #'   the Arrow IPC MetadataVersion. Default (NULL) will use the latest version,
@@ -130,7 +130,6 @@ RecordBatchStreamWriter$create <- function(sink,
       call. = FALSE
     )
   }
-  use_legacy_format <- use_legacy_format %||% identical(Sys.getenv("ARROW_PRE_0_15_IPC_FORMAT"), "1")
   assert_is(sink, "OutputStream")
   assert_is(schema, "Schema")
 
@@ -138,7 +137,7 @@ RecordBatchStreamWriter$create <- function(sink,
     ipc___RecordBatchStreamWriter__Open(
       sink,
       schema,
-      isTRUE(use_legacy_format),
+      get_ipc_use_legacy_format(use_legacy_format),
       get_ipc_metadata_version(metadata_version)
     )
   )
@@ -160,7 +159,6 @@ RecordBatchFileWriter$create <- function(sink,
       call. = FALSE
     )
   }
-  use_legacy_format <- use_legacy_format %||% identical(Sys.getenv("ARROW_PRE_0_15_IPC_FORMAT"), "1")
   assert_is(sink, "OutputStream")
   assert_is(schema, "Schema")
 
@@ -168,7 +166,7 @@ RecordBatchFileWriter$create <- function(sink,
     ipc___RecordBatchFileWriter__Open(
       sink,
       schema,
-      isTRUE(use_legacy_format),
+      get_ipc_use_legacy_format(use_legacy_format),
       get_ipc_metadata_version(metadata_version)
     )
   )
@@ -196,3 +194,7 @@ get_ipc_metadata_version <- function(x) {
   }
   out
 }
+
+get_ipc_use_legacy_format <- function(x) {
+  isTRUE(x %||% identical(Sys.getenv("ARROW_PRE_0_15_IPC_FORMAT"), "1"))
+}
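
The new get_ipc_use_legacy_format() helper centralizes the environment-variable
fallback that both writer factories previously inlined. Per the definition
above, its behaviour is roughly (a sketch, not a new exported API):

    get_ipc_use_legacy_format(NULL)   # TRUE only if ARROW_PRE_0_15_IPC_FORMAT=1 is set
    get_ipc_use_legacy_format(TRUE)   # TRUE, regardless of the environment
    get_ipc_use_legacy_format(FALSE)  # FALSE, regardless of the environment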
diff --git a/r/man/RecordBatchWriter.Rd b/r/man/RecordBatchWriter.Rd
index 0422da6..038653b 100644
--- a/r/man/RecordBatchWriter.Rd
+++ b/r/man/RecordBatchWriter.Rd
@@ -23,7 +23,7 @@ factory methods instantiate the object and take the following arguments:
 \item \code{sink} An \code{OutputStream}
 \item \code{schema} A \link{Schema} for the data to be written
 \item \code{use_legacy_format} logical: write data formatted so that Arrow libraries
-versions 0.14 and lower can read it? Default is \code{FALSE}. You can also
+versions 0.14 and lower can read it. Default is \code{FALSE}. You can also
 enable this by setting the environment variable \code{ARROW_PRE_0_15_IPC_FORMAT=1}.
 \item \code{metadata_version}: A string like "V5" or the equivalent integer indicating
 the Arrow IPC MetadataVersion. Default (NULL) will use the latest version,
diff --git a/r/man/write_dataset.Rd b/r/man/write_dataset.Rd
index cb3853a..755c4c9 100644
--- a/r/man/write_dataset.Rd
+++ b/r/man/write_dataset.Rd
@@ -42,7 +42,18 @@ will yield \verb{"part-0.feather", ...}.}
 (\code{key1=value1/key2=value2/file.ext}) or as just bare values. Default is \code{TRUE}.}
 
 \item{...}{additional format-specific arguments. For available Parquet
-options, see \code{\link[=write_parquet]{write_parquet()}}.}
+options, see \code{\link[=write_parquet]{write_parquet()}}. The available Feather options are
+\itemize{
+\item \code{use_legacy_format} logical: write data formatted so that Arrow libraries
+versions 0.14 and lower can read it. Default is \code{FALSE}. You can also
+enable this by setting the environment variable \code{ARROW_PRE_0_15_IPC_FORMAT=1}.
+\item \code{metadata_version}: A string like "V5" or the equivalent integer indicating
+the Arrow IPC MetadataVersion. Default (NULL) will use the latest version,
+unless the environment variable \code{ARROW_PRE_1_0_METADATA_VERSION=1}, in
+which case it will be V4.
+\item \code{codec}: A \link{Codec} which will be used to compress body buffers of written
+files. Default (NULL) will not compress body buffers.
+}}
 }
 \value{
 The input \code{dataset}, invisibly
diff --git a/r/src/arrowExports.cpp b/r/src/arrowExports.cpp
index d0273a5..cadfc8c 100644
--- a/r/src/arrowExports.cpp
+++ b/r/src/arrowExports.cpp
@@ -1697,6 +1697,43 @@ extern "C" SEXP _arrow_dataset___ParquetFileWriteOptions__update(SEXP options_se
 
 // dataset.cpp
 #if defined(ARROW_R_WITH_ARROW)
+void dataset___IpcFileWriteOptions__update2(const std::shared_ptr<ds::IpcFileWriteOptions>& ipc_options, bool use_legacy_format, const std::shared_ptr<arrow::util::Codec>& codec, arrow::ipc::MetadataVersion metadata_version);
+extern "C" SEXP _arrow_dataset___IpcFileWriteOptions__update2(SEXP ipc_options_sexp, SEXP use_legacy_format_sexp, SEXP codec_sexp, SEXP metadata_version_sexp){
+BEGIN_CPP11
+       arrow::r::Input<const std::shared_ptr<ds::IpcFileWriteOptions>&>::type ipc_options(ipc_options_sexp);
+       arrow::r::Input<bool>::type use_legacy_format(use_legacy_format_sexp);
+       arrow::r::Input<const std::shared_ptr<arrow::util::Codec>&>::type codec(codec_sexp);
+       arrow::r::Input<arrow::ipc::MetadataVersion>::type metadata_version(metadata_version_sexp);
+       dataset___IpcFileWriteOptions__update2(ipc_options, use_legacy_format, codec, metadata_version);
+       return R_NilValue;
+END_CPP11
+}
+#else
+extern "C" SEXP _arrow_dataset___IpcFileWriteOptions__update2(SEXP ipc_options_sexp, SEXP use_legacy_format_sexp, SEXP codec_sexp, SEXP metadata_version_sexp){
+       Rf_error("Cannot call dataset___IpcFileWriteOptions__update2(). Please use arrow::install_arrow() to install required runtime libraries. ");
+}
+#endif
+
+// dataset.cpp
+#if defined(ARROW_R_WITH_ARROW)
+void dataset___IpcFileWriteOptions__update1(const std::shared_ptr<ds::IpcFileWriteOptions>& ipc_options, bool use_legacy_format, arrow::ipc::MetadataVersion metadata_version);
+extern "C" SEXP _arrow_dataset___IpcFileWriteOptions__update1(SEXP ipc_options_sexp, SEXP use_legacy_format_sexp, SEXP metadata_version_sexp){
+BEGIN_CPP11
+       arrow::r::Input<const std::shared_ptr<ds::IpcFileWriteOptions>&>::type ipc_options(ipc_options_sexp);
+       arrow::r::Input<bool>::type use_legacy_format(use_legacy_format_sexp);
+       arrow::r::Input<arrow::ipc::MetadataVersion>::type metadata_version(metadata_version_sexp);
+       dataset___IpcFileWriteOptions__update1(ipc_options, use_legacy_format, metadata_version);
+       return R_NilValue;
+END_CPP11
+}
+#else
+extern "C" SEXP _arrow_dataset___IpcFileWriteOptions__update1(SEXP ipc_options_sexp, SEXP use_legacy_format_sexp, SEXP metadata_version_sexp){
+       Rf_error("Cannot call dataset___IpcFileWriteOptions__update1(). Please use arrow::install_arrow() to install required runtime libraries. ");
+}
+#endif
+
+// dataset.cpp
+#if defined(ARROW_R_WITH_ARROW)
 std::shared_ptr<ds::IpcFileFormat> dataset___IpcFileFormat__Make();
 extern "C" SEXP _arrow_dataset___IpcFileFormat__Make(){
 BEGIN_CPP11
@@ -6399,6 +6436,8 @@ static const R_CallMethodDef CallEntries[] = {
 		{ "_arrow_dataset___ParquetFileFormat__Make", (DL_FUNC) &_arrow_dataset___ParquetFileFormat__Make, 3}, 
 		{ "_arrow_dataset___FileWriteOptions__type_name", (DL_FUNC) &_arrow_dataset___FileWriteOptions__type_name, 1}, 
 		{ "_arrow_dataset___ParquetFileWriteOptions__update", (DL_FUNC) &_arrow_dataset___ParquetFileWriteOptions__update, 3}, 
+		{ "_arrow_dataset___IpcFileWriteOptions__update2", (DL_FUNC) &_arrow_dataset___IpcFileWriteOptions__update2, 4}, 
+		{ "_arrow_dataset___IpcFileWriteOptions__update1", (DL_FUNC) &_arrow_dataset___IpcFileWriteOptions__update1, 3}, 
 		{ "_arrow_dataset___IpcFileFormat__Make", (DL_FUNC) &_arrow_dataset___IpcFileFormat__Make, 0}, 
 		{ "_arrow_dataset___CsvFileFormat__Make", (DL_FUNC) &_arrow_dataset___CsvFileFormat__Make, 1}, 
 		{ "_arrow_dataset___DirectoryPartitioning", (DL_FUNC) &_arrow_dataset___DirectoryPartitioning, 1}, 
diff --git a/r/src/dataset.cpp b/r/src/dataset.cpp
index f4c3ed1..8a88ab0 100644
--- a/r/src/dataset.cpp
+++ b/r/src/dataset.cpp
@@ -21,6 +21,7 @@
 
 #include <arrow/dataset/api.h>
 #include <arrow/filesystem/filesystem.h>
+#include <arrow/ipc/writer.h>
 #include <arrow/table.h>
 #include <arrow/util/iterator.h>
 
@@ -208,6 +209,24 @@ void dataset___ParquetFileWriteOptions__update(
 }
 
 // [[arrow::export]]
+void dataset___IpcFileWriteOptions__update2(
+    const std::shared_ptr<ds::IpcFileWriteOptions>& ipc_options, bool use_legacy_format,
+    const std::shared_ptr<arrow::util::Codec>& codec,
+    arrow::ipc::MetadataVersion metadata_version) {
+  ipc_options->options->write_legacy_ipc_format = use_legacy_format;
+  ipc_options->options->codec = codec;
+  ipc_options->options->metadata_version = metadata_version;
+}
+
+// [[arrow::export]]
+void dataset___IpcFileWriteOptions__update1(
+    const std::shared_ptr<ds::IpcFileWriteOptions>& ipc_options, bool use_legacy_format,
+    arrow::ipc::MetadataVersion metadata_version) {
+  ipc_options->options->write_legacy_ipc_format = use_legacy_format;
+  ipc_options->options->metadata_version = metadata_version;
+}
+
+// [[arrow::export]]
 std::shared_ptr<ds::IpcFileFormat> dataset___IpcFileFormat__Make() {
   return std::make_shared<ds::IpcFileFormat>();
 }
diff --git a/r/tests/testthat/test-dataset.R b/r/tests/testthat/test-dataset.R
index 074fcd3..d1904fc 100644
--- a/r/tests/testthat/test-dataset.R
+++ b/r/tests/testthat/test-dataset.R
@@ -177,7 +177,7 @@ test_that("dataset from URI", {
 test_that("Simple interface for datasets (custom ParquetFileFormat)", {
   ds <- open_dataset(dataset_dir, partitioning = schema(part = uint8()),
                      format = FileFormat$create("parquet", dict_columns = c("chr")))
-  expect_equivalent(ds$schema$GetFieldByName("chr")$type, dictionary())
+  expect_type_equal(ds$schema$GetFieldByName("chr")$type, dictionary())
 })
 
 test_that("Hive partitioning", {
@@ -943,10 +943,38 @@ test_that("Dataset writing: from RecordBatch", {
   )
 })
 
+test_that("Writing a dataset: Ipc format options & compression", {
+  skip_on_os("windows") # https://issues.apache.org/jira/browse/ARROW-9651
+  ds <- open_dataset(csv_dir, partitioning = "part", format = "csv")
+  dst_dir <- make_temp_dir()
+
+  codec <- NULL
+  if (codec_is_available("zstd")) {
+    codec <- Codec$create("zstd")
+  }
+
+  write_dataset(ds, dst_dir, format = "feather", codec = codec)
+  expect_true(dir.exists(dst_dir))
+
+  new_ds <- open_dataset(dst_dir, format = "feather")
+  expect_equivalent(
+    new_ds %>%
+      select(string = chr, integer = int) %>%
+      filter(integer > 6 & integer < 11) %>%
+      collect() %>%
+      summarize(mean = mean(integer)),
+    df1 %>%
+      select(string = chr, integer = int) %>%
+      filter(integer > 6) %>%
+      summarize(mean = mean(integer))
+  )
+})
+
 test_that("Writing a dataset: Parquet format options", {
   skip_on_os("windows") # https://issues.apache.org/jira/browse/ARROW-9651
   ds <- open_dataset(csv_dir, partitioning = "part", format = "csv")
   dst_dir <- make_temp_dir()
+  dst_dir_no_truncated_timestamps <- make_temp_dir()
 
   # Use trace() to confirm that options are passed in
   trace(
@@ -956,7 +984,7 @@ test_that("Writing a dataset: Parquet format options", {
     where = write_dataset
   )
   expect_warning(
-    write_dataset(ds, make_temp_dir(), format = "parquet", partitioning = "int"),
+    write_dataset(ds, dst_dir_no_truncated_timestamps, format = "parquet", partitioning = "int"),
     "allow_truncated_timestamps == FALSE"
   )
   expect_warning(
diff --git a/r/tests/testthat/test-record-batch-reader.R b/r/tests/testthat/test-record-batch-reader.R
index e03664e..d9c3406 100644
--- a/r/tests/testthat/test-record-batch-reader.R
+++ b/r/tests/testthat/test-record-batch-reader.R
@@ -82,7 +82,7 @@ test_that("MetadataFormat", {
   Sys.setenv(ARROW_PRE_0_15_IPC_FORMAT = 1)
   expect_identical(get_ipc_metadata_version(NULL), 3L)
   Sys.setenv(ARROW_PRE_0_15_IPC_FORMAT = "")
-  
+
   expect_identical(get_ipc_metadata_version(NULL), 4L)
   Sys.setenv(ARROW_PRE_1_0_METADATA_VERSION = 1)
   expect_identical(get_ipc_metadata_version(NULL), 3L)
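
For reference, the integer codes checked in this test follow the C++
arrow::ipc::MetadataVersion enum (assuming its usual ordering, 3L corresponds
to V4 and 4L to V5). A sketch of the behaviour exercised above, using the
package-internal helper:

    Sys.setenv(ARROW_PRE_1_0_METADATA_VERSION = 1)
    get_ipc_metadata_version(NULL)  # 3L, i.e. MetadataVersion V4
    Sys.setenv(ARROW_PRE_1_0_METADATA_VERSION = "")
    get_ipc_metadata_version(NULL)  # 4L, i.e. MetadataVersion V5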
