> Note that for both recovery and checkpoints, there are no retrying mechanisms. If any part of downloading or uploading fails, the job fails over, so actually using such an interface extension would be out of scope of this FLIP. In that case, if this could be extended in the future without breaking compatibility, maybe we could leave it as a future improvement?
>
Thanks for the reply. It makes sense to consider it as a future optimization.

On Fri, May 10, 2024 at 4:31 PM Piotr Nowojski <piotr.nowoj...@gmail.com> wrote:

> Hi!
>
> Thanks for your suggestions!
>
> > I'd prefer a single unified interface
>
> I have updated the FLIP to take that into account. In this case, I would also propose to completely drop `DuplicatingFileSystem` in favour of a basically renamed version of it, `PathsCopyingFileSystem`. `DuplicatingFileSystem` was not marked as PublicEvolving/Experimental (probably by mistake), so technically we can do it. Even if not for that mistake, I would still vote to replace it to simplify the code, as any migration would be very easy. At the same time, to the best of my knowledge, no one has ever implemented it.
>
> > The proposal mentions that s5cmd utilises 100% of CPU similar to Flink 1.18. However, this will be a native process outside of the JVM. Is there a risk of a large/long state download starving the TM of CPU cycles, causing issues such as heartbeat or ask timeouts?
> >
> > Do you know if there is a way to limit the CPU utilisation of s5cmd? I see worker and concurrency configuration, but these do not map directly to a cap on CPU usage. The experience for users of this feature in this case will be one of trial and error.
>
> Those are good points. As a matter of fact, shortly after publishing this FLIP, we started experimenting with using `cpulimit` to achieve just that. If everything works out fine, we are planning to expose this as a configuration option for the S3 file system. I've added that to the FLIP.
>
> > 2. copyFiles is not an atomic operation. How could we handle the situation when some of the files fail?
> > Could we return the list of successful files, so that the caller could decide to retry or at least know about them?
>
> Do you have some suggestions on how that should be implemented in the interface and how it should be used?
>
> Note that for both recovery and checkpoints, there are no retrying mechanisms. If any part of downloading or uploading fails, the job fails over, so actually using such an interface extension would be out of scope of this FLIP. In that case, if this could be extended in the future without breaking compatibility, maybe we could leave it as a future improvement?
>
> Best,
> Piotrek
>
> On Fri, May 10, 2024 at 07:40 Hangxiang Yu <master...@gmail.com> wrote:
>
> > Hi Piotr,
> > Thanks for your proposal.
> >
> > I have some comments, PTAL:
> > 1. +1 for unifying the interface with DuplicatingFileSystem. IIUC, DuplicatingFileSystem also covers the logic from/to both local and remote paths. The implementations could define their own logic for how to fast copy/duplicate files, e.g. hard link or transfer manager.
> >
> > 2. copyFiles is not an atomic operation. How could we handle the situation when some of the files fail?
> > Could we return the list of successful files, so that the caller could decide to retry or at least know about them?
> >
> > On Thu, May 9, 2024 at 3:46 PM Keith Lee <leekeiabstract...@gmail.com> wrote:
> >
> > > Hi Piotr,
> > >
> > > Thank you for the proposal. Looks great.
> > >
> > > Along similar lines to Aleks' question on memory usage:
> > >
> > > The proposal mentions that s5cmd utilises 100% of CPU similar to Flink 1.18. However, this will be a native process outside of the JVM. Is there a risk of a large/long state download starving the TM of CPU cycles, causing issues such as heartbeat or ask timeouts?
> > >
> > > Do you know if there is a way to limit the CPU utilisation of s5cmd? I see worker and concurrency configuration, but these do not map directly to a cap on CPU usage. The experience for users of this feature in this case will be one of trial and error.
> > >
> > > Thanks
> > > Keith
> > >
> > > On Wed, May 8, 2024 at 12:47 PM Ahmed Hamdy <hamdy10...@gmail.com> wrote:
> > >
> > > > Hi Piotr,
> > > > +1 for the proposal, it seems to have a lot of gains.
> > > >
> > > > Best Regards
> > > > Ahmed Hamdy
> > > >
> > > > On Mon, 6 May 2024 at 12:06, Zakelly Lan <zakelly....@gmail.com> wrote:
> > > >
> > > > > Hi Piotrek,
> > > > >
> > > > > Thanks for your answers!
> > > > >
> > > > > > Good question. The intention and use case behind `DuplicatingFileSystem` is different. It marks if `FileSystem` can quickly copy/duplicate files in the remote `FileSystem`. For example, an equivalent of a hard link or bumping a reference count in the remote system. That's a bit different from copying paths between remote and local file systems.
> > > > > >
> > > > > > However, it could arguably be unified under one interface where we would re-use or re-name `canFastDuplicate(Path, Path)` to `canFastCopy(Path, Path)` with the following use cases:
> > > > > > - `canFastCopy(remoteA, remoteB)` returns true: the current equivalent of `DuplicatingFileSystem`, i.e. quickly duplicate/hard link a remote path
> > > > > > - `canFastCopy(local, remote)` returns true: the FS can natively upload a local file to a remote location
> > > > > > - `canFastCopy(remote, local)` returns true: the FS can natively download a remote file to a local location
> > > > > >
> > > > > > Maybe indeed that's a better solution vs having two separate interfaces for copying and duplicating?
> > > > >
> > > > > I'd prefer a single unified interface; `canFastCopy(Path, Path)` looks good to me. This also resolves my question 1 about the destination.
> > > > >
> > > > > Best,
> > > > > Zakelly
> > > > >
> > > > > On Mon, May 6, 2024 at 6:36 PM Piotr Nowojski <pnowoj...@apache.org> wrote:
> > > > >
> > > > > > Hi All!
> > > > > >
> > > > > > Thanks for your comments.
> > > > > >
> > > > > > Muhammet and Hong, about the config options.
> > > > > >
> > > > > > > Could you please also add the configuration property for this? An example showing how users would set this parameter would be helpful.
> > > > > >
> > > > > > > 1/ Configure the implementation of PathsCopyingFileSystem used
> > > > > > > 2/ Configure the location of the s5cmd binary (version control etc.)
> > > > > >
> > > > > > Oops, sorry, I have now added the config options that I had in mind to the FLIP. I don't know why I omitted this. Basically, I suggest that in order to use native file copying:
> > > > > > 1. The `FileSystem` must support it by implementing the `PathsCopyingFileSystem` interface.
> > > > > > 2. That `FileSystem` would have to be configured to actually use it. For example, the S3 file system would return `true` (that it can copy paths) only if `s3.s5cmd.path` has been specified.
> > > > > >
> > > > > > > Would this affect any filesystem connectors that use FileSystem[1][2] dependencies?
> > > > > >
> > > > > > Definitely not out of the box. Any place in Flink that is currently uploading/downloading files from a FileSystem could use this feature, but it would have to be implemented there. In the same way, this FLIP will implement native file copying when downloading state during recovery, but the old code path will still be used for uploading state files during a checkpoint.
> > > > > >
> > > > > > > How will adding s5cmd affect the memory footprint?
> > > > > > > Since this is a native binary, memory consumption will not be controlled by the JVM or Flink.
> > > > > >
> > > > > > As you mentioned, the memory usage of `s5cmd` will not be controlled, so the memory footprint will grow. The s5cmd integration with Flink has already been tested quite extensively in our production environment, and we haven't observed any issues so far, despite the fact that we are using quite small pods. But of course, if your setup is working on the edge of OOM, this could tip you over that edge.
> > > > > >
> > > > > > Zakelly:
> > > > > >
> > > > > > > 1. What is the semantic of `canCopyPath`? Should it be associated with a specific destination path? E.g., it can be copied to local, but not to the remote FS.
> > > > > >
> > > > > > For S3 (both the SDKv2 and s5cmd implementations), the copying direction (upload/download) doesn't matter. I don't know about other file systems, since I haven't investigated anything besides S3. Nevertheless, I wouldn't worry too much about it, since we can start with a simple `canCopyPath` that handles both directions. If this becomes important in the future, adding directional `canDownloadPath` or `canUploadPath` would be a backward-compatible change, so we can safely extend it later if needed.
> > > > > >
> > > > > > > 2. Is the existing interface `DuplicatingFileSystem` feasible/enough for this case?
> > > > > >
> > > > > > Good question. The intention and use case behind `DuplicatingFileSystem` is different. It marks if `FileSystem` can quickly copy/duplicate files in the remote `FileSystem`.
> > > > > > For example, an equivalent of a hard link or bumping a reference count in the remote system. That's a bit different from copying paths between remote and local file systems.
> > > > > >
> > > > > > However, it could arguably be unified under one interface where we would re-use or re-name `canFastDuplicate(Path, Path)` to `canFastCopy(Path, Path)` with the following use cases:
> > > > > > - `canFastCopy(remoteA, remoteB)` returns true: the current equivalent of `DuplicatingFileSystem`, i.e. quickly duplicate/hard link a remote path
> > > > > > - `canFastCopy(local, remote)` returns true: the FS can natively upload a local file to a remote location
> > > > > > - `canFastCopy(remote, local)` returns true: the FS can natively download a remote file to a local location
> > > > > >
> > > > > > Maybe indeed that's a better solution vs having two separate interfaces for copying and duplicating?
> > > > > >
> > > > > > > 3. Will extracting the interface introduce a breaking change?
> > > > > >
> > > > > > No. The signature of the existing abstract `FileSystem` class would remain the same. Only most/all of the abstract methods would be pulled out into the interface, and the abstract `FileSystem` would implement that new interface.
> > > > > >
> > > > > > Best,
> > > > > > Piotrek
> > > > > >
> > > > > > On Mon, May 6, 2024 at 04:55 Zakelly Lan <zakelly....@gmail.com> wrote:
> > > > > >
> > > > > > > Hi Piotr,
> > > > > > >
> > > > > > > Thanks for the proposal. It's meaningful to speed up the state download. I have some questions:
> > > > > > >
> > > > > > > 1. What is the semantic of `canCopyPath`? Should it be associated with a specific destination path? E.g., it can be copied to local, but not to the remote FS.
> > > > > > > 2. Is the existing interface `DuplicatingFileSystem` feasible/enough for this case?
> > > > > > > 3. Will extracting the interface introduce a breaking change?
> > > > > > >
> > > > > > > Best,
> > > > > > > Zakelly
> > > > > > >
> > > > > > > On Thu, May 2, 2024 at 6:50 PM Aleksandr Pilipenko <z3d...@gmail.com> wrote:
> > > > > > >
> > > > > > > > Hi Piotr,
> > > > > > > >
> > > > > > > > Thanks for the proposal.
> > > > > > > > How will adding s5cmd affect the memory footprint? Since this is a native binary, memory consumption will not be controlled by the JVM or Flink.
> > > > > > > >
> > > > > > > > Thanks,
> > > > > > > > Aleksandr
> > > > > > > >
> > > > > > > > On Thu, 2 May 2024 at 11:12, Hong Liang <h...@apache.org> wrote:
> > > > > > > >
> > > > > > > > > Hi Piotr,
> > > > > > > > >
> > > > > > > > > Thanks for the FLIP! Nice to see work to improve the filesystem performance. +1 to future work to improve the upload speed as well. This would be useful for jobs with large state and long async checkpointing times.
> > > > > > > > >
> > > > > > > > > Some thoughts on the configuration: it might be good for us to introduce two points of configurability for future-proofing:
> > > > > > > > > 1/ Configure the implementation of PathsCopyingFileSystem used, maybe by config or by ServiceResources (this would allow us to use this for alternative clouds or to implement S3 SDKv2 support if we want this in the future). This could also be used as a feature flag to determine whether we should be using this new native file copy support.
> > > > > > > > > 2/ Configure the location of the s5cmd binary (version control etc.), as you have mentioned in the FLIP.
> > > > > > > > >
> > > > > > > > > Regards,
> > > > > > > > > Hong
> > > > > > > > >
> > > > > > > > > On Thu, May 2, 2024 at 9:40 AM Muhammet Orazov <mor+fl...@morazow.com.invalid> wrote:
> > > > > > > > >
> > > > > > > > > > Hey Piotr,
> > > > > > > > > >
> > > > > > > > > > Thanks for the proposal! It would be a great improvement!
> > > > > > > > > >
> > > > > > > > > > Some questions from my side:
> > > > > > > > > >
> > > > > > > > > > > In order to configure s5cmd, Flink users would need to specify the path to the s5cmd binary.
> > > > > > > > > >
> > > > > > > > > > Could you please also add the configuration property for this? An example showing how users would set this parameter would be helpful.
> > > > > > > > > >
> > > > > > > > > > Would this affect any filesystem connectors that use FileSystem[1][2] dependencies?
> > > > > > > > > >
> > > > > > > > > > Best,
> > > > > > > > > > Muhammet
> > > > > > > > > >
> > > > > > > > > > [1]: https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/filesystems/s3/
> > > > > > > > > > [2]: https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/filesystem/
> > > > > > > > > >
> > > > > > > > > > On 2024-04-30 13:15, Piotr Nowojski wrote:
> > > > > > > > > > > Hi all!
> > > > > > > > > > >
> > > > > > > > > > > I would like to put under discussion:
> > > > > > > > > > >
> > > > > > > > > > > FLIP-444: Native file copy support
> > > > > > > > > > > https://cwiki.apache.org/confluence/x/rAn9EQ
> > > > > > > > > > >
> > > > > > > > > > > This proposal aims to speed up Flink recovery times by speeding up state download times. However, in the future, the same mechanism could also be used to speed up state uploading (checkpointing/savepointing).
> > > > > > > > > > >
> > > > > > > > > > > I'm curious to hear your thoughts.
> > > > > > > > > > >
> > > > > > > > > > > Best,
> > > > > > > > > > > Piotrek
> >
> > --
> > Best,
> > Hangxiang.

--
Best,
Hangxiang.
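The thread proposes unified `canFastCopy(Path, Path)` semantics. It also proposes that the S3 file system only reports native copy support once `s3.s5cmd.path` is set. Both ideas can be illustrated with a small, self-contained Java sketch.

This is not Flink's implementation or API. The following are all hypothetical illustrations:

* the interface name `PathsCopyingFileSystemSketch`;
* the `S3LikeFileSystem` class and the `copyStrategy` helper;
* the use of `java.net.URI` in place of Flink's `Path`;
* the made-up binary location `/opt/s5cmd/s5cmd`;
* the assumption that only the `s3` and `file` schemes matter, and that remote-to-remote duplication is supported.

Only the method name `canFastCopy` and the option key `s3.s5cmd.path` come from the discussion.

```java
import java.net.URI;
import java.util.Map;

public class FastCopyExample {

    /**
     * Hypothetical sketch of the unified interface discussed in the thread.
     * Not Flink's actual API; java.net.URI stands in for Flink's Path.
     */
    interface PathsCopyingFileSystemSketch {
        /** Returns true if this file system can natively copy source to destination. */
        boolean canFastCopy(URI source, URI destination);
    }

    /** Hypothetical S3-like file system that enables native copying only when s5cmd is configured. */
    static final class S3LikeFileSystem implements PathsCopyingFileSystemSketch {
        // Value of the "s3.s5cmd.path" option, or null when the option is absent.
        private final String s5cmdPath;

        S3LikeFileSystem(Map<String, String> config) {
            this.s5cmdPath = config.get("s3.s5cmd.path");
        }

        // Assumption: a missing scheme or the "file" scheme means a local path.
        private static boolean isLocal(URI uri) {
            return uri.getScheme() == null || "file".equals(uri.getScheme());
        }

        // Assumption: only the "s3" scheme counts as remote in this sketch.
        private static boolean isRemote(URI uri) {
            return "s3".equals(uri.getScheme());
        }

        @Override
        public boolean canFastCopy(URI source, URI destination) {
            if (s5cmdPath == null) {
                // Not configured: report no native support so callers keep the old code path.
                return false;
            }
            boolean duplicate = isRemote(source) && isRemote(destination); // remoteA -> remoteB (assumed)
            boolean upload = isLocal(source) && isRemote(destination);     // local -> remote
            boolean download = isRemote(source) && isLocal(destination);   // remote -> local
            return duplicate || upload || download;
        }
    }

    /** Hypothetical caller-side helper: use native copying when available, else stream-based copying. */
    static String copyStrategy(PathsCopyingFileSystemSketch fs, URI source, URI destination) {
        return fs.canFastCopy(source, destination) ? "native" : "stream";
    }

    public static void main(String[] args) {
        PathsCopyingFileSystemSketch configured =
                new S3LikeFileSystem(Map.of("s3.s5cmd.path", "/opt/s5cmd/s5cmd"));
        PathsCopyingFileSystemSketch unconfigured = new S3LikeFileSystem(Map.of());

        URI remote = URI.create("s3://bucket/checkpoints/state-file");
        URI local = URI.create("file:///tmp/flink/state-file");

        System.out.println(copyStrategy(configured, remote, local));   // prints "native" (download)
        System.out.println(copyStrategy(configured, local, remote));   // prints "native" (upload)
        System.out.println(copyStrategy(unconfigured, remote, local)); // prints "stream" (s5cmd not configured)
    }
}
```

In this sketch, an unconfigured file system simply answers `false`. Callers therefore fall back to the existing stream-based copy, which mirrors the requirement in the thread that a `FileSystem` must both support and be configured for native copying. Directional variants such as `canDownloadPath`/`canUploadPath` would just be narrower versions of the same check.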