I could be wrong, but I think a zstd stream, at least as written by default, has to be decompressed from the beginning up to the data you want to access (it is not "splittable" and does not support random access). There are ways to provide this capability by essentially compressing in segments. The best concise description of this that I've found is at [1].
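If you control the writer, one way to get that segmenting with the pieces pyarrow already gives you is to compress each object into its own small buffer (so each one ends up as a complete, self-contained zstd frame) and only then append it to the shared sink, recording the offset and size of each compressed segment as you go. Below is a rough, untested sketch of that idea using the same APIs as your example (BufferOutputStream, CompressedOutputStream, write_tensor, etc.); the append_segment helper and the index dict are just names I made up for the sketch, so treat it as an illustration rather than a recommendation:

import pyarrow as pa
import pyarrow.ipc as ipc
import numpy as np

table = pa.table({'column1': [1, 2, 3], 'column2': ['a', 'b', 'c']})
tensor = pa.Tensor.from_numpy(np.arange(12.0).reshape(3, 4))

sink = pa.BufferOutputStream()
index = {}  # name -> (offset, size) of each compressed segment within `sink`

def append_segment(name, write_payload):
    # Compress one object into its own buffer so it forms a complete,
    # independently decompressible zstd frame, then append those bytes
    # to the shared sink and remember where they landed.
    segment_buffer = pa.BufferOutputStream()
    with pa.CompressedOutputStream(segment_buffer, 'zstd') as out:
        write_payload(out)
    segment = segment_buffer.getvalue()
    index[name] = (sink.tell(), segment.size)
    sink.write(segment)

def write_table_segment(out):
    with ipc.RecordBatchStreamWriter(out, table.schema) as writer:
        writer.write_table(table)

append_segment('table', write_table_segment)
append_segment('tensor', lambda out: ipc.write_tensor(tensor, out))

chunk = sink.getvalue()

# Later: decompress *only* the tensor's segment, without touching the rest.
offset, size = index['tensor']
reader = pa.BufferReader(chunk.slice(offset, size))
with pa.CompressedInputStream(reader, 'zstd') as src:
    tensor_back = ipc.read_tensor(src)

assert tensor_back.equals(tensor)

The trade-off is that you lose cross-object compression (every segment is compressed independently), which is essentially the cost that the seekable/segmented formats described in [1] accept in exchange for random access.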
Either way, I also think that slicing a compressed stream using purely byte-level parameters (e.g. tensor_offset and tensor_size) doesn't account for any of the structural information in the stream (schema, metadata, and the various data boundaries). So, aside from the choice of compression, I think the approach you mentioned may be too naive anyway. I don't know of a good way to do this, but you could either dig into how the Arrow library writes data to the compressed stream (I think around [2]) for inspiration, or look at possible solutions involving other compression algorithms (bzip2 should be splittable; snappy may or may not be). Something you can also try is writing in batches rather than the whole table, so that if nothing else you can "seek" to where a batch was written [3]. If you want to do your own book-keeping, you can note positions in the stream while writing to it [4] and then try to read from that position later (there's a second rough sketch of that idea at the very bottom of this mail, below your quoted message).

Doing any of this might require you to look into the C++ code being called from Python (in which case [2] will be useful), but I'm not sure how far down that rabbit hole you want to go.

[1]: https://github.com/circulosmeos/gztool?tab=readme-ov-file#background
[2]: https://github.com/apache/arrow/blob/main/cpp/src/arrow/io/compressed.cc#L42
[3]: https://arrow.apache.org/docs/python/generated/pyarrow.ipc.RecordBatchStreamWriter.html#pyarrow.ipc.RecordBatchStreamWriter.write_batch
[4]: https://arrow.apache.org/docs/python/generated/pyarrow.CompressedOutputStream.html#pyarrow.CompressedOutputStream.tell

# ------------------------------
# Aldrin

https://github.com/drin/
https://gitlab.com/octalene
https://keybase.io/octalene

On Wednesday, October 9th, 2024 at 15:12, Robert McLeod <robbmcl...@gmail.com> wrote:

> I am trying to write multiple tables/tensors into a single stream/file.
> Writing is straightforward, and I can read everything back out, but every
> effort I have tried to pick an individual element out of a compressed stream
> has failed. E.g. I would like to only extract Tensor #1 from the stream. I
> provide some sample code:
>
> import pyarrow as pa
> import pyarrow.ipc as ipc
> import numpy as np
>
> data = {
>     'column1': [1, 2, 3],
>     'column2': ['a', 'b', 'c']
> }
> data2 = {
>     'column1': [7, 8, 9],
>     'column2': ['d', 'e', 'f']
> }
>
> table = pa.table(data)
> table2 = pa.table(data2)
>
> tensor = pa.Tensor.from_numpy(np.ndarray([5,6,7]))
> tensor2 = pa.Tensor.from_numpy(np.ndarray([4,7,4,6,7,8]))
>
> sink = pa.BufferOutputStream()
>
> df_offset = sink.tell()
> # We are not using the context manager here because CompressedOutputStream
> # closes the sink stream when it exits.
> compressed_sink = pa.CompressedOutputStream(sink, 'zstd')
> with ipc.RecordBatchStreamWriter(compressed_sink, table.schema) as table_writer:
>     table_writer.write_table(table)
> compressed_sink.flush()
> df_size = sink.tell() - df_offset
>
> df2_offset = sink.tell()
> with ipc.RecordBatchStreamWriter(compressed_sink, table2.schema) as table_writer:
>     table_writer.write_table(table2)
> compressed_sink.flush()
> df2_size = sink.tell() - df2_offset
>
> # Write our tensors
> tensor_offset = sink.tell()
> ipc.write_tensor(tensor, compressed_sink)
> compressed_sink.flush()
> tensor_size = sink.tell() - tensor_offset
>
> tensor_offset2 = sink.tell()
> ipc.write_tensor(tensor2, compressed_sink)
> compressed_sink.flush()
> tensor_size2 = sink.tell() - tensor_offset2
>
> # Convert to bytes to finalize the sink buffer
> chunk = sink.getvalue()
>
> source = pa.BufferReader(chunk)
> # We can read out every element in the stream without any problems
> decompressed_source = pa.CompressedInputStream(source, 'zstd')
> with ipc.RecordBatchStreamReader(decompressed_source) as table_reader:
>     read_table = table_reader.read_all()
> with ipc.RecordBatchStreamReader(decompressed_source) as table_reader2:
>     read_table2 = table_reader2.read_all()
>
> read_tensor = ipc.read_tensor(decompressed_source)
> read_tensor2 = ipc.read_tensor(decompressed_source)
>
> # But, how can we go about reading only one object in the source buffer?
> block = chunk[tensor_offset:tensor_offset + tensor_size]
> decompressed2 = pa.CompressedInputStream(block, 'zstd')
> random_tensor = ipc.read_tensor(decompressed2)
> # Traceback (most recent call last):
> #   File "<stdin>", line 1, in <module>
> #   File "pyarrow\ipc.pxi", line 1257, in pyarrow.lib.read_tensor
> #   File "pyarrow\error.pxi", line 154, in pyarrow.lib.pyarrow_internal_check_status
> #   File "pyarrow\error.pxi", line 91, in pyarrow.lib.check_status
> # OSError: ZSTD decompress failed: Unknown frame descriptor
>
> Has anyone else tried this approach and found a workable solution to pull
> individual objects from a (compressed) stream?
>
> --
> Robert McLeod
> robbmcl...@gmail.com
> robert.mcl...@hitachi-hightech.com
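PS: here is the second sketch I mentioned above, for the "note positions while writing" idea ([3]/[4]). The key detail is that CompressedOutputStream.tell() reports positions in the *decompressed* stream, and on the read side CompressedInputStream isn't seekable, so the best you can do is decompress-and-discard up to the recorded position. That spares you re-parsing everything before the object you want, but not re-decompressing it. Again untested, just to show the shape of it:

import pyarrow as pa
import pyarrow.ipc as ipc
import numpy as np

tensor = pa.Tensor.from_numpy(np.arange(6.0).reshape(2, 3))
tensor2 = pa.Tensor.from_numpy(np.arange(12.0).reshape(3, 4))

sink = pa.BufferOutputStream()
compressed_sink = pa.CompressedOutputStream(sink, 'zstd')

# Record positions in the *decompressed* stream, not in `sink` [4].
tensor_pos = compressed_sink.tell()   # == 0 here; kept only for book-keeping
ipc.write_tensor(tensor, compressed_sink)
tensor2_pos = compressed_sink.tell()
ipc.write_tensor(tensor2, compressed_sink)

compressed_sink.close()   # finalizes the zstd frame (this also closes `sink`)
chunk = sink.getvalue()   # getvalue() should still work after the close

# Read side: skip ahead by decompressing and discarding, then read.
src = pa.CompressedInputStream(pa.BufferReader(chunk), 'zstd')
src.read(tensor2_pos)     # throw away everything before the second tensor
tensor2_back = ipc.read_tensor(src)

assert tensor2_back.equals(tensor2)

The same bookkeeping applies if you write the tables batch-by-batch with write_batch [3] and record a position per batch, although for record batches you would additionally need the schema in hand to interpret a message plucked from the middle of the stream.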