Thanks everyone for your inputs here! Really helpful information!

From: Chamikara Jayalath <chamik...@google.com>
Reply-To: "user@beam.apache.org" <user@beam.apache.org>
Date: Thursday, January 28, 2021 at 10:54 AM
To: user <user@beam.apache.org>
Subject: Re: Overwrite support from ParquetIO



On Thu, Jan 28, 2021 at 9:14 AM Alexey Romanenko <aromanenko....@gmail.com> wrote:
1. Personally, I’d recommend purging the output directory (if it’s needed, of
course) before starting your pipeline, as a part of your driver program and not
in a DoFn, to avoid the potential side effects that Reuven mentioned before.
Another option is to write the files into a new directory with a unique name
and then, after your pipeline has finished, atomically rename it (see the
sketch below). Though, of course, the final solution depends on the internals
of your application and environment.
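
A rough sketch of that second option, using Beam’s FileSystems API in the
driver (the bucket, paths and runId below are purely illustrative, and error
handling is omitted). Note that on object stores like S3 a “rename” is
implemented as a per-file copy and delete, so it is only atomic per file,
not per directory:

  import java.util.ArrayList;
  import java.util.List;
  import org.apache.beam.sdk.io.FileSystems;
  import org.apache.beam.sdk.io.fs.MatchResult;
  import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
  import org.apache.beam.sdk.io.fs.ResourceId;

  // In the driver, after pipeline.run().waitUntilFinish():
  // move every file from the uniquely named staging directory to the final one.
  String stagingDir = "s3://my-bucket/output-" + runId + "/";  // runId: some unique token
  ResourceId finalDir =
      FileSystems.matchNewResource("s3://my-bucket/output/", true /* isDirectory */);

  List<ResourceId> sources = new ArrayList<>();
  List<ResourceId> destinations = new ArrayList<>();
  for (MatchResult.Metadata m : FileSystems.match(stagingDir + "*").metadata()) {
    sources.add(m.resourceId());
    destinations.add(
        finalDir.resolve(m.resourceId().getFilename(), StandardResolveOptions.RESOLVE_FILE));
  }
  FileSystems.rename(sources, destinations);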

IMHO, FS manipulations like this should be a part of the driver program and not
of the distributed data processing pipeline, where they can be quite tricky to
do reliably.
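
A minimal sketch of the first option (the purge), done in the driver before the
pipeline starts and using the same FileSystems API (the glob is illustrative;
error handling omitted):

  import java.util.ArrayList;
  import java.util.List;
  import org.apache.beam.sdk.io.FileSystems;
  import org.apache.beam.sdk.io.fs.MatchResult;
  import org.apache.beam.sdk.io.fs.ResourceId;
  import org.apache.beam.sdk.options.PipelineOptions;
  import org.apache.beam.sdk.options.PipelineOptionsFactory;

  PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
  // Register the file systems (S3, GCS, ...) so the s3:// scheme can be resolved.
  FileSystems.setDefaultPipelineOptions(options);

  // Delete everything matching the output prefix before the pipeline runs.
  MatchResult match = FileSystems.match("s3://my-bucket/output/*.parquet");
  if (match.status() == MatchResult.Status.OK) {
    List<ResourceId> toDelete = new ArrayList<>();
    for (MatchResult.Metadata m : match.metadata()) {
      toDelete.add(m.resourceId());
    }
    FileSystems.delete(toDelete);
  }

  // ... then build and run the pipeline that writes the new files.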

2. Yes, for sure we can’t rely on the old files being overwritten by the new
files. Even more, we need to make sure that they won’t be overwritten, to
guarantee that we won’t lose them unexpectedly.

+1. Also note that, due to dynamic work rebalancing, file names might not match
exactly; only the prefix will match. So two runs of the same pipeline, even on
the same input, might produce a different number of shards (and hence a
different set of file names with the same prefix).



On 27 Jan 2021, at 21:06, Tao Li <t...@zillow.com> wrote:

@Alexey Romanenko thanks for your response. Regarding your questions:


  1.  Yes, I can purge this directory (e.g. using the S3 client from the AWS
SDK) before using ParquetIO to save files. The caveat is that this deletion
operation is not part of the Beam pipeline, so it will kick off before the
pipeline starts. Ideally, this purge operation could be baked into the
ParquetIO write operation, so that the deletion happens right before the files
are written.
  2.  Regarding the naming strategy, yes, the old files will be overwritten by
the new files if they have the same file names. However, this does not always
guarantee that all the old files in the directory are wiped out (which is
actually my requirement). For example, we may change the shard count (through
the withNumShards() method) between pipeline runs, and there could be old files
from a previous run that won’t get overwritten in the current run, as the
sketch below shows.
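
To illustrate with a sketch (the bucket, schema and shard counts are
placeholders): with FileIO’s default naming, the shard count is part of every
file name, so two runs with different shard counts cannot produce colliding
names.

  import org.apache.avro.Schema;
  import org.apache.avro.generic.GenericRecord;
  import org.apache.beam.sdk.io.FileIO;
  import org.apache.beam.sdk.io.parquet.ParquetIO;
  import org.apache.beam.sdk.values.PCollection;

  static void writeParquet(PCollection<GenericRecord> records, Schema schema) {
    records.apply(
        FileIO.<GenericRecord>write()
            .via(ParquetIO.sink(schema))
            .to("s3://my-bucket/output/")
            // Run A, withNumShards(10): output-00000-of-00010.parquet ... output-00009-of-00010.parquet
            // Run B, withNumShards(5):  output-00000-of-00005.parquet ... output-00004-of-00005.parquet
            // No name from run B matches a name from run A, so all ten old files survive run B.
            .withNumShards(10)
            .withNaming(FileIO.Write.defaultNaming("output", ".parquet")));
  }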

Please let me know if this makes sense to you. Thanks!


From: Alexey Romanenko <aromanenko....@gmail.com>
Reply-To: "user@beam.apache.org" <user@beam.apache.org>
Date: Wednesday, January 27, 2021 at 9:10 AM
To: "user@beam.apache.org" <user@beam.apache.org>
Subject: Re: Overwrite support from ParquetIO

What do you mean by “wipe out all existing parquet files before a write
operation”? Are these all the files that already exist in the same output
directory? Can you purge this directory beforehand, or just use a new output
directory for every pipeline run?

To write Parquet files you need to use ParquetIO.sink() with FileIO.write(),
and I don’t think it will clean up the output directory before writing. Though,
if there are name collisions between existing and new output files (it depends
on the naming strategy used), then I think the old files will be overwritten by
the new ones.



On 25 Jan 2021, at 19:10, Tao Li <t...@zillow.com> wrote:

Hi Beam community,

Does ParquetIO support an overwrite behavior when saving files? More 
specifically, I would like to wipe out all existing parquet files before a 
write operation. Is there a ParquetIO API to support that? Thanks!
