This is an automated email from the ASF dual-hosted git repository.
jorisvandenbossche pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new 9a8bd30 ARROW-10248: [Python][Dataset] Always apply Python's default write properties
9a8bd30 is described below
commit 9a8bd30e191cec6eaa3c0d3ee2d781d52d508cb8
Author: Benjamin Kietzman <[email protected]>
AuthorDate: Sat Oct 10 10:29:21 2020 +0200
ARROW-10248: [Python][Dataset] Always apply Python's default write properties
Calling `_create_writer_properties` and `_create_arrow_writer_properties`
on construction of `ParquetFileWriteOptions` ensures that the Python
defaults, such as `store_schema=True`, are applied.
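For context, a minimal usage sketch of the behaviour this fixes, mirroring the
new test added below (the output path and file name are illustrative):

    import pandas as pd
    import pyarrow as pa
    import pyarrow.dataset as ds
    import pyarrow.parquet as pq

    # With store_schema=True applied by default, the Arrow schema is embedded
    # in the Parquet file metadata, so a non-UTC timezone survives the roundtrip.
    table = pa.table({"a": [pd.Timestamp("2012-01-01", tz="Europe/Brussels")]})
    ds.write_dataset(table, "/tmp/demo", format="parquet")
    result = pq.read_table("/tmp/demo/part-0.parquet")
    assert result["a"].type.tz == "Europe/Brussels"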
Closes #8415 from bkietz/10248-Dataset-writing-does-not-
Lead-authored-by: Benjamin Kietzman <[email protected]>
Co-authored-by: Joris Van den Bossche <[email protected]>
Signed-off-by: Joris Van den Bossche <[email protected]>
---
python/pyarrow/_dataset.pyx | 68 ++++++++++++++++++++----------------
python/pyarrow/tests/test_dataset.py | 40 +++++++++++++++++++++
2 files changed, 77 insertions(+), 31 deletions(-)
diff --git a/python/pyarrow/_dataset.pyx b/python/pyarrow/_dataset.pyx
index a5ad205..ca4192c 100644
--- a/python/pyarrow/_dataset.pyx
+++ b/python/pyarrow/_dataset.pyx
@@ -1114,50 +1114,54 @@ cdef class ParquetFileWriteOptions(FileWriteOptions):
object _properties
def update(self, **kwargs):
- cdef CParquetFileWriteOptions* opts = self.parquet_options
-
arrow_fields = {
"use_deprecated_int96_timestamps",
"coerce_timestamps",
"allow_truncated_timestamps",
}
- update = False
- update_arrow = False
+ setters = set()
for name, value in kwargs.items():
if name not in self._properties:
raise TypeError("unexpected parquet write option: " + name)
self._properties[name] = value
if name in arrow_fields:
- update_arrow = True
+ setters.add(self._set_arrow_properties)
else:
- update = True
-
- if update:
- opts.writer_properties = _create_writer_properties(
- use_dictionary=self._properties["use_dictionary"],
- compression=self._properties["compression"],
- version=self._properties["version"],
- write_statistics=self._properties["write_statistics"],
- data_page_size=self._properties["data_page_size"],
- compression_level=self._properties["compression_level"],
- use_byte_stream_split=(
- self._properties["use_byte_stream_split"]
- ),
- data_page_version=self._properties["data_page_version"],
- )
+ setters.add(self._set_properties)
- if update_arrow:
- opts.arrow_writer_properties = _create_arrow_writer_properties(
- use_deprecated_int96_timestamps=(
- self._properties["use_deprecated_int96_timestamps"]
- ),
- coerce_timestamps=self._properties["coerce_timestamps"],
- allow_truncated_timestamps=(
- self._properties["allow_truncated_timestamps"]
- ),
- writer_engine_version='V2'
- )
+ for setter in setters:
+ setter()
+
+ def _set_properties(self):
+ cdef CParquetFileWriteOptions* opts = self.parquet_options
+
+ opts.writer_properties = _create_writer_properties(
+ use_dictionary=self._properties["use_dictionary"],
+ compression=self._properties["compression"],
+ version=self._properties["version"],
+ write_statistics=self._properties["write_statistics"],
+ data_page_size=self._properties["data_page_size"],
+ compression_level=self._properties["compression_level"],
+ use_byte_stream_split=(
+ self._properties["use_byte_stream_split"]
+ ),
+ data_page_version=self._properties["data_page_version"],
+ )
+
+ def _set_arrow_properties(self):
+ cdef CParquetFileWriteOptions* opts = self.parquet_options
+
+ opts.arrow_writer_properties = _create_arrow_writer_properties(
+ use_deprecated_int96_timestamps=(
+ self._properties["use_deprecated_int96_timestamps"]
+ ),
+ coerce_timestamps=self._properties["coerce_timestamps"],
+ allow_truncated_timestamps=(
+ self._properties["allow_truncated_timestamps"]
+ ),
+ writer_engine_version="V2",
+ )
cdef void init(self, const shared_ptr[CFileWriteOptions]& sp):
FileWriteOptions.init(self, sp)
@@ -1175,6 +1179,8 @@ cdef class ParquetFileWriteOptions(FileWriteOptions):
coerce_timestamps=None,
allow_truncated_timestamps=False,
)
+ self._set_properties()
+ self._set_arrow_properties()
cdef class ParquetFileFormat(FileFormat):
diff --git a/python/pyarrow/tests/test_dataset.py b/python/pyarrow/tests/test_dataset.py
index a7a9f97..6786fb9 100644
--- a/python/pyarrow/tests/test_dataset.py
+++ b/python/pyarrow/tests/test_dataset.py
@@ -2385,3 +2385,43 @@ def test_write_dataset_parquet(tempdir):
ds.write_dataset(table, base_dir, format=format, file_options=opts)
meta = pq.read_metadata(base_dir / "part-0.parquet")
assert meta.format_version == version
+
+
+@pytest.mark.parquet
+@pytest.mark.pandas
+def test_write_dataset_arrow_schema_metadata(tempdir):
+ # ensure we serialize ARROW schema in the parquet metadata, to have a
+ # correct roundtrip (e.g. preserve non-UTC timezone)
+ import pyarrow.parquet as pq
+
+ table = pa.table({"a": [pd.Timestamp("2012-01-01", tz="Europe/Brussels")]})
+ assert table["a"].type.tz == "Europe/Brussels"
+
+ ds.write_dataset(table, tempdir, format="parquet")
+ result = pq.read_table(tempdir / "part-0.parquet")
+ assert result["a"].type.tz == "Europe/Brussels"
+
+
+def test_write_dataset_schema_metadata(tempdir):
+ # ensure that schema metadata gets written
+ from pyarrow import feather
+
+ table = pa.table({'a': [1, 2, 3]})
+ table = table.replace_schema_metadata({b'key': b'value'})
+ ds.write_dataset(table, tempdir, format="feather")
+
+ schema = feather.read_table(tempdir / "part-0.feather").schema
+ assert schema.metadata == {b'key': b'value'}
+
+
+@pytest.mark.parquet
+def test_write_dataset_schema_metadata_parquet(tempdir):
+ # ensure that schema metadata gets written
+ import pyarrow.parquet as pq
+
+ table = pa.table({'a': [1, 2, 3]})
+ table = table.replace_schema_metadata({b'key': b'value'})
+ ds.write_dataset(table, tempdir, format="parquet")
+
+ schema = pq.read_table(tempdir / "part-0.parquet").schema
+ assert schema.metadata == {b'key': b'value'}