I am trying to write multiple tables/tensors into a single stream/file.
Writing is straightforward, and I can read everything back out, but every
effort I have tried to pick an individual element out of a compressed
stream has failed. E.g. I would like to only extract Tensor #1 from the
stream. I provide some sample code:

import pyarrow as pa
import pyarrow.ipc as ipc
import numpy as np

data = {
    'column1': [1, 2, 3],
    'column2': ['a', 'b', 'c']
}
data2 = {
    'column1': [7, 8, 9],
    'column2': ['d', 'e', 'f']
}

table = pa.table(data)
table2 = pa.table(data2)

tensor = pa.Tensor.from_numpy(np.ndarray([5,6,7]))
tensor2 = pa.Tensor.from_numpy(np.ndarray([4,7,4,6,7,8]))

sink = pa.BufferOutputStream()

df_offset = sink.tell()
# We are not using the context manager here because CompressedOutputStream
closes
# the the sink stream when it exits.
compressed_sink = pa.CompressedOutputStream(sink, 'zstd')
with ipc.RecordBatchStreamWriter(compressed_sink, table.schema) as
table_writer:
    table_writer.write_table(table)
compressed_sink.flush()
df_size = sink.tell() - df_offset

df2_offset = sink.tell()
with ipc.RecordBatchStreamWriter(compressed_sink, table2.schema) as
table_writer:
    table_writer.write_table(table2)
compressed_sink.flush()
df2_size = sink.tell() - df2_offset

# Write our tensors
tensor_offset = sink.tell()
ipc.write_tensor(tensor, compressed_sink)
compressed_sink.flush()
tensor_size = sink.tell() - tensor_offset

tensor_offset2 = sink.tell()
ipc.write_tensor(tensor2, compressed_sink)
compressed_sink.flush()
tensor_size2 = sink.tell() - tensor_offset2

# Convert to bytes to finalize the sink buffer
chunk = sink.getvalue()

source = pa.BufferReader(chunk)
# We can read out every element in the stream without any problems
decompressed_source =  pa.CompressedInputStream(source, 'zstd')
with ipc.RecordBatchStreamReader(decompressed_source) as table_reader:
    read_table = table_reader.read_all()
with ipc.RecordBatchStreamReader(decompressed_source) as table_reader2:
    read_table2 = table_reader2.read_all()

read_tensor = ipc.read_tensor(decompressed_source)
read_tensor2 = ipc.read_tensor(decompressed_source)

# But, how can we go about reading only one object in the source buffer?
block = chunk[tensor_offset:tensor_offset + tensor_size]
decompressed2 = pa.CompressedInputStream(block, 'zstd')
random_tensor = ipc.read_tensor(decompressed2)
# Traceback (most recent call last):
#   File "<stdin>", line 1, in <module>
#   File "pyarrow\ipc.pxi", line 1257, in pyarrow.lib.read_tensor
#   File "pyarrow\error.pxi", line 154, in
pyarrow.lib.pyarrow_internal_check_status
#   File "pyarrow\error.pxi", line 91, in pyarrow.lib.check_status
# OSError: ZSTD decompress failed: Unknown frame descriptor

Has anyone else tried this approach and found a workable solution to pull
individual objects from a (compressed) stream?

-- 
Robert McLeod
robbmcl...@gmail.com
robert.mcl...@hitachi-hightech.com

Reply via email to