Hi All! Thanks for your comments.
Muhammet and Hong, about the config options. > Could you please also add the configuration property for this? An example showing how users would set this parameter would be helpful. > 1/ Configure the implementation of PathsCopyingFileSystem used > 2/ Configure the location of the s5cmd binary (version control etc.) Ops, sorry I added the config options that I had in mind to the FLIP. I don't know why I have omitted this. Basically I suggest that in order to use native file copying: 1. `FileSystem` must support it via implementing `PathsCopyingFileSystem` interface 2. That `FileSystem` would have to be configured to actually use it. For example S3 file system would return `true` that it can copy paths only if `s3.s5cmd.path` has been specified. > Would this affect any filesystem connectors that use FileSystem[1][2] dependencies? Definitely not out of the box. Any place in Flink that is currently uploading/downloading files from a FileSystem could use this feature, but it would have to be implemented. The same way this FLIP will implement native files copying when downloading state during recovery, but the old code path will be still used for uploading state files during a checkpoint. > How adding a s5cmd will affect memory footprint? Since this is a native binary, memory consumption will not be controlled by JVM or Flink. As you mentioned the memory usage of `s5cmd` will not be controlled, so the memory footprint will grow. S5cmd integration with Flink has been tested quite extensively on our production environment already, and we haven't observed any issues so far despite the fact we are using quite small pods. But of course if your setup is working on the edge of OOM, this could tip you over that edge. Zakelly: > 1. What is the semantic of `canCopyPath`? Should it be associated with a > specific destination path? e.g. It can be copied to local, but not to the > remote FS. For the S3 (both for SDKv2 and s5cmd implementations), the copying direction (upload/download) doesn't matter. I don't know about other file systems, I haven't investigated anything besides S3. Nevertheless I wouldn't worry too much about it, since we can start with the simple `canCopyPath` that handles both directions. If this will become important in the future, adding directional `canDownloadPath` or `canUploadPath` would be a backward compatible change, so we can safely extend it in the future if needed. > 2. Is the existing interface `DuplicatingFileSystem` feasible/enough for this case? Good question. The intention and use case behind `DuplicatingFileSystem` is different. It marks if `FileSystem` can quickly copy/duplicate files in the remote `FileSystem`. For example an equivalent of a hard link or bumping a reference count in the remote system. That's a bit different to copy paths between remote and local file systems. However, it could arguably be unified under one interface where we would re-use or re-name `canFastDuplicate(Path, Path)` to `canFastCopy(Path, Path)` with the following use cases: - `canFastCopy(remoteA, remoteB)` returns true - current equivalent of `DuplicatingFileSystem` - quickly duplicate/hard link remote path - `canFastCopy(local, remote)` returns true - FS can natively upload local file to a remote location - `canFastCopy(remote, local)` returns true - FS can natively download local file from a remote location Maybe indeed that's a better solution vs having two separate interfaces for copying and duplicating? > 3. Will the interface extracting introduce a break change? No. The signature of the existing abstract `FileSystem` class would remain the same. Only most/all of the abstract methods would be pulled out to the interface and abstract `FileSystem` would implement that new interface. Best, Piotrek pon., 6 maj 2024 o 04:55 Zakelly Lan <zakelly....@gmail.com> napisał(a): > Hi Piotr, > > Thanks for the proposal. It's meaningful to speed up the state download. I > get into some questions: > > 1. What is the semantic of `canCopyPath`? Should it be associated with a > specific destination path? e.g. It can be copied to local, but not to the > remote FS. > 2. Is the existing interface `DuplicatingFileSystem` feasible/enough for > this case? > 3. Will the interface extracting introduce a break change? > > > Best, > Zakelly > > > On Thu, May 2, 2024 at 6:50 PM Aleksandr Pilipenko <z3d...@gmail.com> > wrote: > > > Hi Piotr, > > > > Thanks for the proposal. > > How adding a s5cmd will affect memory footprint? Since this is a native > > binary, memory consumption will not be controlled by JVM or Flink. > > > > Thanks, > > Aleksandr > > > > On Thu, 2 May 2024 at 11:12, Hong Liang <h...@apache.org> wrote: > > > > > Hi Piotr, > > > > > > Thanks for the FLIP! Nice to see work to improve the filesystem > > > performance. +1 to future work to improve the upload speed as well. > This > > > would be useful for jobs with large state and high Async checkpointing > > > times. > > > > > > Some thoughts on the configuration, it might be good for us to > introduce > > 2x > > > points of configurability for future proofing: > > > 1/ Configure the implementation of PathsCopyingFileSystem used, maybe > by > > > config, or by ServiceResources (this would allow us to use this for > > > alternative clouds/Implement S3 SDKv2 support if we want this in the > > > future). Also this could be used as a feature flag to determine if we > > > should be using this new native file copy support. > > > 2/ Configure the location of the s5cmd binary (version control etc.), > as > > > you have mentioned in the FLIP. > > > > > > Regards, > > > Hong > > > > > > > > > On Thu, May 2, 2024 at 9:40 AM Muhammet Orazov > > > <mor+fl...@morazow.com.invalid> wrote: > > > > > > > Hey Piotr, > > > > > > > > Thanks for the proposal! It would be great improvement! > > > > > > > > Some questions from my side: > > > > > > > > > In order to configure s5cmd Flink’s user would need > > > > > to specify path to the s5cmd binary. > > > > > > > > Could you please also add the configuration property > > > > for this? An example showing how users would set this > > > > parameter would be helpful. > > > > > > > > Would this affect any filesystem connectors that use > > > > FileSystem[1][2] dependencies? > > > > > > > > Best, > > > > Muhammet > > > > > > > > [1]: > > > > > > > > > > > > > > https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/filesystems/s3/ > > > > [2]: > > > > > > > > > > > > > > https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/filesystem/ > > > > > > > > On 2024-04-30 13:15, Piotr Nowojski wrote: > > > > > Hi all! > > > > > > > > > > I would like to put under discussion: > > > > > > > > > > FLIP-444: Native file copy support > > > > > https://cwiki.apache.org/confluence/x/rAn9EQ > > > > > > > > > > This proposal aims to speed up Flink recovery times, by speeding up > > > > > state > > > > > download times. However in the future, the same mechanism could be > > also > > > > > used to speed up state uploading (checkpointing/savepointing). > > > > > > > > > > I'm curious to hear your thoughts. > > > > > > > > > > Best, > > > > > Piotrek > > > > > > > > > >