This can also be solved by using a table format like https://github.com/uber/hudi or https://github.com/apache/incubator-iceberg; the latter has an open PR for a basic Python implementation based on pyarrow. These table formats support using Avro and Parquet together seamlessly, without the reader having to care about the underlying storage format.

Uwe
On 19.12.2018 at 14:47, Wes McKinney <wesmck...@gmail.com> wrote:

This turns out to be a very common problem (landing incremental updates, dealing with compaction and small files). It's part of the reason that systems like Apache Kudu were developed, e.g.

https://blog.cloudera.com/blog/2015/11/how-to-ingest-and-query-fast-data-with-impala-without-kudu/

If you have to use file storage, then figuring out a scheme to compact Parquet files (e.g. once per hour, once per day) will definitely be worth it compared with using a slower file format (like Avro).

- Wes
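[Editor's note: as a rough illustration of the compaction scheme Wes describes (not code from the thread), a periodic job could read the accumulated small files as one dataset and rewrite them as a single larger file; the directory layout and file names below are assumptions.]

import glob
import os

import pyarrow.parquet as pq

def compact_small_files(input_dir, output_path):
    # Read every small file in the directory as one logical dataset.
    table = pq.ParquetDataset(input_dir).read()

    # Write one larger, query-friendly file in their place.
    pq.write_table(table, output_path)

    # Only after the compacted file is safely written, drop the small inputs.
    # (A real job would also guard against files that arrive mid-compaction.)
    for path in glob.glob(os.path.join(input_dir, '*.parquet')):
        os.remove(path)

# e.g. run once per hour or once per day from a scheduler:
# compact_small_files('landing/', 'history/2018-12-19.parquet')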
On Wed, Dec 19, 2018 at 7:37 AM Joel Pfaff <joel.pf...@gmail.com> wrote:

Hello,

For my company's use cases, we have found that the number of files was a critical part of the time spent building the execution plan, so we found the idea of writing small Parquet files at a high frequency to be rather inefficient.

There are some formats that support `append` semantics (I have tested this successfully with Avro, but a couple of others could be used similarly). So we had a few cases where we aggregated data in a `current table` as a set of Avro files, and rewrote all of it into a few Parquet files at the end of the day. This allowed us to prepare files that are optimized for query performance (file size, row group size, sorting per column) by maximizing the ability to benefit from the statistics. Our queries then did a UNION between the "optimized for speed" history tables and the "optimized for latency" current tables whenever the query time frame crossed the boundary of the current day.

Regards, Joel

On Wed, Dec 19, 2018 at 2:14 PM Francois Saint-Jacques <fsaintjacq...@networkdump.com> wrote:

Hello Darren,

What Uwe suggests is usually the way to go: your active process writes to a new file every time. Then a parallel process or thread compacts the smaller files in the background, so that you don't end up with too many files.

--
Sent from my jetpack.

On Wed, Dec 19, 2018 at 7:59 AM Uwe L. Korn <uw...@xhochy.com> wrote:

Hello Darren,

you're out of luck here. Parquet files are immutable and meant for batch writes. Once they're written, you cannot modify them anymore. To load them, you need to know their metadata, which is stored in the footer. The footer is always at the end of the file and is only written once you call close.

Your use case is normally fulfilled by continuously starting new files and reading them back in using the ParquetDataset class.

Cheers
Uwe
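[Editor's note: a minimal sketch of the pattern Uwe describes, with invented file and directory names: the producer closes a new file for every batch, and readers open the whole directory as a single table via the ParquetDataset class.]

import pyarrow as pa
import pyarrow.parquet as pq

# Producer side: each batch becomes its own fully written file (footer included).
def write_batch(df, path):
    pq.write_table(pa.Table.from_pandas(df), path)

# Reader side: all files under the directory appear as one logical table.
def read_directory(directory='openskyflights'):
    return pq.ParquetDataset(directory).read().to_pandas()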
On 18.12.2018 at 21:03, Darren Gallagher <daz...@gmail.com> wrote:

[Cross posted from https://github.com/apache/arrow/issues/3203]

I'm adding new data to a parquet file every 60 seconds using this code:

import os
import json
import time
import requests
import pandas as pd
import numpy as np
import pyarrow as pa
import pyarrow.parquet as pq

api_url = 'https://opensky-network.org/api/states/all'

cols = ['icao24', 'callsign', 'origin', 'time_position',
        'last_contact', 'longitude', 'latitude',
        'baro_altitude', 'on_ground', 'velocity', 'true_track',
        'vertical_rate', 'sensors', 'geo_altitude', 'squawk',
        'spi', 'position_source']

def get_new_flight_info(writer):
    # Fetch the current flight states and append them to the open file.
    print("Requesting new data")
    req = requests.get(api_url)
    content = req.json()

    states = content['states']
    df = pd.DataFrame(states, columns=cols)
    df['timestamp'] = content['time']
    print("Found {} new items".format(len(df)))

    table = pa.Table.from_pandas(df)
    if writer is None:
        writer = pq.ParquetWriter('openskyflights.parquet', table.schema)
    writer.write_table(table=table)
    return writer

if __name__ == '__main__':
    writer = None
    # Keep appending every 60 seconds until a stop file appears.
    while not os.path.exists('opensky.STOP'):
        writer = get_new_flight_info(writer)
        time.sleep(60)

    if writer:
        writer.close()

This is working fine and the file grows every 60 seconds. However, unless I force the loop to exit, I am unable to use the parquet file. In a separate terminal I try to access the parquet file using this code:

import pandas as pd
import pyarrow.parquet as pq

table = pq.read_table("openskyflights.parquet")
df = table.to_pandas()
print(len(df))

which results in this error:

Traceback (most recent call last):
  File "checkdownloadsize.py", line 7, in <module>
    table = pq.read_table("openskyflights.parquet")
  File "/home/xxxx/.local/share/virtualenvs/opensky-WcPvsoLj/lib/python3.5/site-packages/pyarrow/parquet.py", line 1074, in read_table
    use_pandas_metadata=use_pandas_metadata)
  File "/home/xxxx/.local/share/virtualenvs/opensky-WcPvsoLj/lib/python3.5/site-packages/pyarrow/filesystem.py", line 182, in read_parquet
    filesystem=self)
  File "/home/xxxx/.local/share/virtualenvs/opensky-WcPvsoLj/lib/python3.5/site-packages/pyarrow/parquet.py", line 882, in __init__
    self.validate_schemas()
  File "/home/xxxx/.local/share/virtualenvs/opensky-WcPvsoLj/lib/python3.5/site-packages/pyarrow/parquet.py", line 895, in validate_schemas
    self.schema = self.pieces[0].get_metadata(open_file).schema
  File "/home/xxxx/.local/share/virtualenvs/opensky-WcPvsoLj/lib/python3.5/site-packages/pyarrow/parquet.py", line 453, in get_metadata
    return self._open(open_file_func).metadata
  File "/home/xxxx/.local/share/virtualenvs/opensky-WcPvsoLj/lib/python3.5/site-packages/pyarrow/parquet.py", line 459, in _open
    reader = open_file_func(self.path)
  File "/home/xxxx/.local/share/virtualenvs/opensky-WcPvsoLj/lib/python3.5/site-packages/pyarrow/parquet.py", line 984, in open_file
    common_metadata=self.common_metadata)
  File "/home/xxxx/.local/share/virtualenvs/opensky-WcPvsoLj/lib/python3.5/site-packages/pyarrow/parquet.py", line 102, in __init__
    self.reader.open(source, metadata=metadata)
  File "pyarrow/_parquet.pyx", line 639, in pyarrow._parquet.ParquetReader.open
  File "pyarrow/error.pxi", line 83, in pyarrow.lib.check_status
pyarrow.lib.ArrowIOError: Invalid parquet file. Corrupt footer.

Is there a way to achieve this? I'm assuming that if I call writer.close() in the while loop then it will prevent any further data being written to the file? Is there some kind of "flush" operation that can be used to ensure all data is written to disk and available to other processes or threads that want to read the data?

Thanks
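[Editor's note: for reference, a hedged sketch of how the polling loop above could be adapted to the one-file-per-batch approach suggested earlier in the thread; the output directory and file-name pattern are invented for illustration and are not part of the original question.]

import os
import time

import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
import requests

api_url = 'https://opensky-network.org/api/states/all'
cols = ['icao24', 'callsign', 'origin', 'time_position',
        'last_contact', 'longitude', 'latitude',
        'baro_altitude', 'on_ground', 'velocity', 'true_track',
        'vertical_rate', 'sensors', 'geo_altitude', 'squawk',
        'spi', 'position_source']

out_dir = 'openskyflights'  # hypothetical output directory
os.makedirs(out_dir, exist_ok=True)

while not os.path.exists('opensky.STOP'):
    content = requests.get(api_url).json()
    df = pd.DataFrame(content['states'], columns=cols)
    df['timestamp'] = content['time']

    # Write and close one complete file per fetch; its footer is final, so
    # other processes can read it at any time, e.g. via pq.ParquetDataset(out_dir).
    path = os.path.join(out_dir, 'flights-{}.parquet'.format(int(time.time())))
    pq.write_table(pa.Table.from_pandas(df), path)

    time.sleep(60)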