I am trying to write multiple tables/tensors into a single stream/file. Writing is straightforward, and I can read everything back out, but every attempt I have made to pick an individual element out of a compressed stream has failed. For example, I would like to extract only Tensor #1 from the stream. Here is some sample code:
import pyarrow as pa
import pyarrow.ipc as ipc
import numpy as np

# Sample payload: two small tables and two small tensors that are written,
# one after another, through a single zstd-compressed stream.
data = {
    'column1': [1, 2, 3],
    'column2': ['a', 'b', 'c'],
}
data2 = {
    'column1': [7, 8, 9],
    'column2': ['d', 'e', 'f'],
}

table = pa.table(data)
table2 = pa.table(data2)

# np.array() stores the listed values. np.ndarray(<list>) would instead treat
# the list as a *shape* and return uninitialized memory.
tensor = pa.Tensor.from_numpy(np.array([5, 6, 7]))
tensor2 = pa.Tensor.from_numpy(np.array([4, 7, 4, 6, 7, 8]))

sink = pa.BufferOutputStream()

# For every object, remember where its compressed bytes start in `sink` and
# how many bytes `sink` grew by while it was written.
df_offset = sink.tell()
# We are not using the context manager here because CompressedOutputStream
# closes the sink stream when it exits.
compressed_sink = pa.CompressedOutputStream(sink, 'zstd')
with ipc.RecordBatchStreamWriter(compressed_sink, table.schema) as table_writer:
    table_writer.write_table(table)
# Flush so the pending compressed bytes reach `sink` before sink.tell() is
# sampled.
# NOTE(review): flush() presumably does not terminate the zstd frame, so the
# offsets recorded here may not fall on frame boundaries -- confirm.
compressed_sink.flush()
df_size = sink.tell() - df_offset

df2_offset = sink.tell()
with ipc.RecordBatchStreamWriter(compressed_sink, table2.schema) as table_writer:
    table_writer.write_table(table2)
compressed_sink.flush()
df2_size = sink.tell() - df2_offset

# Write our tensors
tensor_offset = sink.tell()
ipc.write_tensor(tensor, compressed_sink)
compressed_sink.flush()
tensor_size = sink.tell() - tensor_offset

tensor_offset2 = sink.tell()
ipc.write_tensor(tensor2, compressed_sink)
compressed_sink.flush()
tensor_size2 = sink.tell() - tensor_offset2

# Convert to bytes to finalize the sink buffer
chunk = sink.getvalue()
source = pa.BufferReader(chunk)

# We can read out every element in the stream without any problems, as long
# as the objects are consumed in the same order in which they were written.
decompressed_source = pa.CompressedInputStream(source, 'zstd')
with ipc.RecordBatchStreamReader(decompressed_source) as table_reader:
    read_table = table_reader.read_all()
with ipc.RecordBatchStreamReader(decompressed_source) as table_reader2:
    read_table2 = table_reader2.read_all()
read_tensor = ipc.read_tensor(decompressed_source)
read_tensor2 = ipc.read_tensor(decompressed_source)

# But, how can we go about reading only one object in the source buffer?
# Attempt at random access: slice out only the bytes recorded for the first
# tensor and try to decompress that slice on its own.
block = chunk[tensor_offset:tensor_offset + tensor_size]
decompressed2 = pa.CompressedInputStream(block, 'zstd')
random_tensor = ipc.read_tensor(decompressed2)
# The read_tensor() call above fails with:
# Traceback (most recent call last):
#   File "<stdin>", line 1, in <module>
#   File "pyarrow\ipc.pxi", line 1257, in pyarrow.lib.read_tensor
#   File "pyarrow\error.pxi", line 154, in pyarrow.lib.pyarrow_internal_check_status
#   File "pyarrow\error.pxi", line 91, in pyarrow.lib.check_status
# OSError: ZSTD decompress failed: Unknown frame descriptor

Has anyone else tried this approach and found a workable solution to pull individual objects from a (compressed) stream?

--
Robert McLeod
robbmcl...@gmail.com
robert.mcl...@hitachi-hightech.com