This is an automated email from the ASF dual-hosted git repository.

jorisvandenbossche pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 9a8bd30  ARROW-10248: [Python][Dataset] Always apply Python's default 
write properties
9a8bd30 is described below

commit 9a8bd30e191cec6eaa3c0d3ee2d781d52d508cb8
Author: Benjamin Kietzman <[email protected]>
AuthorDate: Sat Oct 10 10:29:21 2020 +0200

    ARROW-10248: [Python][Dataset] Always apply Python's default write 
properties
    
    Ensuring that `_create_writer_properties` and 
`_create_arrow_writer_properties` are called on construction of 
`ParquetFileWriteProperties` ensures that the python defaults like 
`store_schema=True` are applied.
    
    Closes #8415 from bkietz/10248-Dataset-writing-does-not-
    
    Lead-authored-by: Benjamin Kietzman <[email protected]>
    Co-authored-by: Joris Van den Bossche <[email protected]>
    Signed-off-by: Joris Van den Bossche <[email protected]>
---
 python/pyarrow/_dataset.pyx          | 68 ++++++++++++++++++++----------------
 python/pyarrow/tests/test_dataset.py | 40 +++++++++++++++++++++
 2 files changed, 77 insertions(+), 31 deletions(-)

diff --git a/python/pyarrow/_dataset.pyx b/python/pyarrow/_dataset.pyx
index a5ad205..ca4192c 100644
--- a/python/pyarrow/_dataset.pyx
+++ b/python/pyarrow/_dataset.pyx
@@ -1114,50 +1114,54 @@ cdef class ParquetFileWriteOptions(FileWriteOptions):
         object _properties
 
     def update(self, **kwargs):
-        cdef CParquetFileWriteOptions* opts = self.parquet_options
-
         arrow_fields = {
             "use_deprecated_int96_timestamps",
             "coerce_timestamps",
             "allow_truncated_timestamps",
         }
 
-        update = False
-        update_arrow = False
+        setters = set()
         for name, value in kwargs.items():
             if name not in self._properties:
                 raise TypeError("unexpected parquet write option: " + name)
             self._properties[name] = value
             if name in arrow_fields:
-                update_arrow = True
+                setters.add(self._set_arrow_properties)
             else:
-                update = True
-
-        if update:
-            opts.writer_properties = _create_writer_properties(
-                use_dictionary=self._properties["use_dictionary"],
-                compression=self._properties["compression"],
-                version=self._properties["version"],
-                write_statistics=self._properties["write_statistics"],
-                data_page_size=self._properties["data_page_size"],
-                compression_level=self._properties["compression_level"],
-                use_byte_stream_split=(
-                    self._properties["use_byte_stream_split"]
-                ),
-                data_page_version=self._properties["data_page_version"],
-            )
+                setters.add(self._set_properties)
 
-        if update_arrow:
-            opts.arrow_writer_properties = _create_arrow_writer_properties(
-                use_deprecated_int96_timestamps=(
-                    self._properties["use_deprecated_int96_timestamps"]
-                ),
-                coerce_timestamps=self._properties["coerce_timestamps"],
-                allow_truncated_timestamps=(
-                    self._properties["allow_truncated_timestamps"]
-                ),
-                writer_engine_version='V2'
-            )
+        for setter in setters:
+            setter()
+
+    def _set_properties(self):
+        cdef CParquetFileWriteOptions* opts = self.parquet_options
+
+        opts.writer_properties = _create_writer_properties(
+            use_dictionary=self._properties["use_dictionary"],
+            compression=self._properties["compression"],
+            version=self._properties["version"],
+            write_statistics=self._properties["write_statistics"],
+            data_page_size=self._properties["data_page_size"],
+            compression_level=self._properties["compression_level"],
+            use_byte_stream_split=(
+                self._properties["use_byte_stream_split"]
+            ),
+            data_page_version=self._properties["data_page_version"],
+        )
+
+    def _set_arrow_properties(self):
+        cdef CParquetFileWriteOptions* opts = self.parquet_options
+
+        opts.arrow_writer_properties = _create_arrow_writer_properties(
+            use_deprecated_int96_timestamps=(
+                self._properties["use_deprecated_int96_timestamps"]
+            ),
+            coerce_timestamps=self._properties["coerce_timestamps"],
+            allow_truncated_timestamps=(
+                self._properties["allow_truncated_timestamps"]
+            ),
+            writer_engine_version="V2",
+        )
 
     cdef void init(self, const shared_ptr[CFileWriteOptions]& sp):
         FileWriteOptions.init(self, sp)
@@ -1175,6 +1179,8 @@ cdef class ParquetFileWriteOptions(FileWriteOptions):
             coerce_timestamps=None,
             allow_truncated_timestamps=False,
         )
+        self._set_properties()
+        self._set_arrow_properties()
 
 
 cdef class ParquetFileFormat(FileFormat):
diff --git a/python/pyarrow/tests/test_dataset.py 
b/python/pyarrow/tests/test_dataset.py
index a7a9f97..6786fb9 100644
--- a/python/pyarrow/tests/test_dataset.py
+++ b/python/pyarrow/tests/test_dataset.py
@@ -2385,3 +2385,43 @@ def test_write_dataset_parquet(tempdir):
         ds.write_dataset(table, base_dir, format=format, file_options=opts)
         meta = pq.read_metadata(base_dir / "part-0.parquet")
         assert meta.format_version == version
+
+
[email protected]
[email protected]
+def test_write_dataset_arrow_schema_metadata(tempdir):
+    # ensure we serialize ARROW schema in the parquet metadata, to have a
+    # correct roundtrip (e.g. preserve non-UTC timezone)
+    import pyarrow.parquet as pq
+
+    table = pa.table({"a": [pd.Timestamp("2012-01-01", tz="Europe/Brussels")]})
+    assert table["a"].type.tz == "Europe/Brussels"
+
+    ds.write_dataset(table, tempdir, format="parquet")
+    result = pq.read_table(tempdir / "part-0.parquet")
+    assert result["a"].type.tz == "Europe/Brussels"
+
+
+def test_write_dataset_schema_metadata(tempdir):
+    # ensure that schema metadata gets written
+    from pyarrow import feather
+
+    table = pa.table({'a': [1, 2, 3]})
+    table = table.replace_schema_metadata({b'key': b'value'})
+    ds.write_dataset(table, tempdir, format="feather")
+
+    schema = feather.read_table(tempdir / "part-0.feather").schema
+    assert schema.metadata == {b'key': b'value'}
+
+
[email protected]
+def test_write_dataset_schema_metadata_parquet(tempdir):
+    # ensure that schema metadata gets written
+    import pyarrow.parquet as pq
+
+    table = pa.table({'a': [1, 2, 3]})
+    table = table.replace_schema_metadata({b'key': b'value'})
+    ds.write_dataset(table, tempdir, format="parquet")
+
+    schema = pq.read_table(tempdir / "part-0.parquet").schema
+    assert schema.metadata == {b'key': b'value'}

Reply via email to