On Wed, Aug 14, 2019 at 7:19 AM Oliver Laslett <[email protected]> wrote:

> What is the correct way to implement a custom non-splittable file parser
> in python?
>
> My desired end-state is: 1) use Read to pass a file pattern (with wild
> cards) pointing to several XML files on remote storage (S3 or GCS). 2) each
> file is parsed as a single element (XML cannot be processed line-by-line)
> resulting in a PCollection. 3) combine all PCollections together.
>
> I've subclassed FileBasedSource, which seems to give me everything out of
> the box. However I have a problem with zipped files.
> The self.open_file(fname) method returns a file object. For non-compressed
> files I can call self.open_file(fname).read(). But for compressed files I
> have a missing argument error and must provide the number of bytes to read:
> self.open_file(fname).read(num_bytes).
>
> Is it possible to implement a FileBasedSource that works generically for
> compressed and non-compressed non-splittable files?
>

It should be possible, though I'm not sure what caused the error you saw.
self.open_file() should return a file-like object (a CompressedFile
object if you specified a compression type). In your read_records()
implementation, you are expected to read bytes from this file (not all
bytes have to be read in a single call) and produce an iterator over the
records.
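
As a minimal sketch of reading "not all bytes in a single call": the helper
below loops over bounded read(n) calls until the file returns no more data,
so it never relies on a no-argument read(). The name read_fully and the chunk
size are hypothetical, and the gzip and BytesIO streams are only stand-ins for
whatever self.open_file() returns. It assumes the file object returns empty
bytes at end of file.

```python
import gzip
import io


def read_fully(file_obj, chunk_size=64 * 1024):
    """Read a file-like object to EOF using bounded read(n) calls."""
    chunks = []
    while True:
        chunk = file_obj.read(chunk_size)
        if not chunk:  # empty bytes is taken to mean end of file
            break
        chunks.append(chunk)
    return b"".join(chunks)


# Stand-ins for an uncompressed and a compressed file object.
payload = b"<root><item>1</item></root>"
plain = io.BytesIO(payload)
zipped = gzip.GzipFile(fileobj=io.BytesIO(gzip.compress(payload)))

plain_bytes = read_fully(plain)
zipped_bytes = read_fully(zipped, chunk_size=8)  # small chunks force several reads
```

In a FileBasedSource subclass, read_records(file_name, offset_range_tracker)
could call such a helper on self.open_file(file_name), parse the result, and
yield it as a single record.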

If your files are non-splittable, though, FileBasedSource does not add much
value. I suggest also looking into the fileio.MatchAll transform and
implementing your source as a composite that uses fileio.MatchAll followed
by a ParDo that produces records. You can use Beam's filesystems
abstraction in your ParDo to get easy access to all filesystems supported
by Beam.
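
A rough sketch of that composite, under some assumptions: read_xml_files,
ParseWholeFile and parse_xml_bytes are hypothetical names, the summary dict is
only a placeholder for a real record, and FileSystems.open() is assumed to
pick the decompressor from the file extension by default. The Beam imports sit
inside the function so the parsing helper can be tried without Beam installed.

```python
import xml.etree.ElementTree as ET


def parse_xml_bytes(data):
    """Parse one whole XML document into a small placeholder record."""
    root = ET.fromstring(data)
    return {"root": root.tag, "children": len(root)}


def read_xml_files(pipeline, pattern, chunk_size=64 * 1024):
    """Match files by pattern and emit one parsed record per file."""
    # Imported lazily so parse_xml_bytes is usable without Beam installed.
    import apache_beam as beam
    from apache_beam.io import fileio
    from apache_beam.io.filesystems import FileSystems

    class ParseWholeFile(beam.DoFn):
        def process(self, file_metadata):
            # Open through Beam's filesystem abstraction (local, GCS, S3, ...).
            f = FileSystems.open(file_metadata.path)
            try:
                chunks = []
                while True:
                    chunk = f.read(chunk_size)  # bounded reads, no bare read()
                    if not chunk:
                        break
                    chunks.append(chunk)
            finally:
                f.close()
            yield parse_xml_bytes(b"".join(chunks))

    return (
        pipeline
        | "Patterns" >> beam.Create([pattern])
        | "Match" >> fileio.MatchAll()
        | "Parse" >> beam.ParDo(ParseWholeFile())
    )
```

Since each file is handled by a single process() call, nothing here depends
on the files being splittable.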

Thanks,
Cham
