Hi!

Thanks for your suggestions!

> I'd prefer a unified one interface

I have updated the FLIP to take that into account. In this case, I would
also propose to completely drop `DuplicatingFileSystem` in favour of a
basically renamed version of it `PathsCopyingFileSystem`.
`DuplicatingFileSystem` was not marked as PublicEvolving/Experimental
(probably by mistake), so technically we can do it. Even if not for that
mistake, I would still vote to replace it to simplify the code, as any
migration would be very easy. At the same time to the best of my knowledge,
no one has ever implemented it.

> The proposal mentions that s5cmd utilises 100% of CPU similar to Flink
> 1.18. However, this will be a native process outside of the JVM. Are there
> risk of large/long state download starving the TM of CPU cycle causing
> issues such as heartbeat or ask timeout?
>
> Do you know if there is a way to limit the CPU utilisation of s5cmd? I see
> worker and concurrency configuration but these do not map directly to cap
> in CPU usage. The experience for feature user in this case will be one of
> trial and error.

Those are good points. As a matter of fact, shortly after publishing this
FLIP, we started experimenting with using `cpulimit` to achieve just that.
If everything will work out fine, we are planning to expose this as a
configuration option for the S3 file system. I've added that to the FLIP.

> 2. copyFiles is not an atomic operation, How could we handle the situation
> when some partial files fail ?
> Could we return the list of successful files then the caller could decide
> to retry or just know them ?

Do you have some suggestions on how that should be implemented in the
interface and how should it be used?

Note that for both recovery and checkpoints,  there are no retring
mechanisms. If any part of downloading or
uploading fails, the job fails over, so actually using such interface
extension would be out of scope of this FLIP. In
that case, maybe if this could be extended in the future without breaking
compatibility we could leave it as a
future improvement?

Best,
Piotrek


pt., 10 maj 2024 o 07:40 Hangxiang Yu <master...@gmail.com> napisał(a):

> Hi Piotr.
> Thanks for your proposal.
>
> I have some comments, PTAL:
> 1. +1 about unifying the interface with DuplicatingFileSystem.
> IIUC, DuplicatingFileSystem also covers the logic from/to both local and
> remote paths.
> The implementations could define their own logic about how to fast
> copy/duplicate files, e.g. hard link or transfer manager.
>
> 2. copyFiles is not an atomic operation, How could we handle the situation
> when some partial files fail ?
> Could we return the list of successful files then the caller could decide
> to retry or just know them ?
>
> On Thu, May 9, 2024 at 3:46 PM Keith Lee <leekeiabstract...@gmail.com>
> wrote:
>
> > Hi Piotr,
> >
> > Thank you for the proposal. Looks great.
> >
> > Along similar line of Aleks' question on memory usage.
> >
> > The proposal mentions that s5cmd utilises 100% of CPU similar to Flink
> > 1.18. However, this will be a native process outside of the JVM. Are
> there
> > risk of large/long state download starving the TM of CPU cycle causing
> > issues such as heartbeat or ask timeout?
> >
> > Do you know if there is a way to limit the CPU utilisation of s5cmd? I
> see
> > worker and concurrency configuration but these do not map directly to cap
> > in CPU usage. The experience for feature user in this case will be one of
> > trial and error.
> >
> > Thanks
> > Keith
> >
> > On Wed, May 8, 2024 at 12:47 PM Ahmed Hamdy <hamdy10...@gmail.com>
> wrote:
> >
> > > Hi Piotr
> > > +1 for the proposal, it seems to have a lot of gains.
> > >
> > > Best Regards
> > > Ahmed Hamdy
> > >
> > >
> > > On Mon, 6 May 2024 at 12:06, Zakelly Lan <zakelly....@gmail.com>
> wrote:
> > >
> > > > Hi Piotrek,
> > > >
> > > > Thanks for your answers!
> > > >
> > > > Good question. The intention and use case behind
> > `DuplicatingFileSystem`
> > > is
> > > > > different. It marks if `FileSystem` can quickly copy/duplicate
> files
> > > > > in the remote `FileSystem`. For example an equivalent of a hard
> link
> > or
> > > > > bumping a reference count in the remote system. That's a bit
> > different
> > > > > to copy paths between remote and local file systems.
> > > > >
> > > > > However, it could arguably be unified under one interface where we
> > > would
> > > > > re-use or re-name `canFastDuplicate(Path, Path)` to
> > > > > `canFastCopy(Path, Path)` with the following use cases:
> > > > > - `canFastCopy(remoteA, remoteB)` returns true - current equivalent
> > of
> > > > > `DuplicatingFileSystem` - quickly duplicate/hard link remote path
> > > > > - `canFastCopy(local, remote)` returns true - FS can natively
> upload
> > > > local
> > > > > file to a remote location
> > > > > - `canFastCopy(remote, local)` returns true - FS can natively
> > download
> > > > > local file from a remote location
> > > > >
> > > > > Maybe indeed that's a better solution vs having two separate
> > interfaces
> > > > for
> > > > > copying and duplicating?
> > > > >
> > > >
> > > > I'd prefer a unified one interface, `canFastCopy(Path, Path)` looks
> > good
> > > to
> > > > me. This also resolves my question 1 about the destination.
> > > >
> > > >
> > > > Best,
> > > > Zakelly
> > > >
> > > > On Mon, May 6, 2024 at 6:36 PM Piotr Nowojski <pnowoj...@apache.org>
> > > > wrote:
> > > >
> > > > > Hi All!
> > > > >
> > > > > Thanks for your comments.
> > > > >
> > > > > Muhammet and Hong, about the config options.
> > > > >
> > > > > > Could you please also add the configuration property for this? An
> > > > example
> > > > > showing how users would set this parameter would be helpful.
> > > > >
> > > > > > 1/ Configure the implementation of PathsCopyingFileSystem used
> > > > > > 2/ Configure the location of the s5cmd binary (version control
> > etc.)
> > > > >
> > > > > Ops, sorry I added the config options that I had in mind to the
> > FLIP. I
> > > > > don't know why I have omitted this. Basically I suggest that in
> order
> > > to
> > > > > use native file copying:
> > > > > 1. `FileSystem` must support it via implementing
> > > `PathsCopyingFileSystem`
> > > > > interface
> > > > > 2. That `FileSystem` would have to be configured to actually use
> it.
> > > For
> > > > > example S3 file system would return `true` that it can copy paths
> > > > >     only if `s3.s5cmd.path` has been specified.
> > > > >
> > > > > > Would this affect any filesystem connectors that use
> > FileSystem[1][2]
> > > > > dependencies?
> > > > >
> > > > > Definitely not out of the box. Any place in Flink that is currently
> > > > > uploading/downloading files from a FileSystem could use this
> feature,
> > > but
> > > > > it
> > > > > would have to be implemented. The same way this FLIP will implement
> > > > native
> > > > > files copying when downloading state during recovery,
> > > > > but the old code path will be still used for uploading state files
> > > > during a
> > > > > checkpoint.
> > > > >
> > > > > > How adding a s5cmd will affect memory footprint? Since this is a
> > > native
> > > > > binary, memory consumption will not be controlled by JVM or Flink.
> > > > >
> > > > > As you mentioned the memory usage of `s5cmd` will not be
> controlled,
> > so
> > > > the
> > > > > memory footprint will grow. S5cmd integration with Flink
> > > > > has been tested quite extensively on our production environment
> > > already,
> > > > > and we haven't observed any issues so far despite the fact we
> > > > > are using quite small pods. But of course if your setup is working
> on
> > > the
> > > > > edge of OOM, this could tip you over that edge.
> > > > >
> > > > > Zakelly:
> > > > >
> > > > > > 1. What is the semantic of `canCopyPath`? Should it be associated
> > > with
> > > > a
> > > > > > specific destination path? e.g. It can be copied to local, but
> not
> > to
> > > > the
> > > > > > remote FS.
> > > > >
> > > > > For the S3 (both for SDKv2 and s5cmd implementations), the copying
> > > > > direction (upload/download) doesn't matter. I don't know about
> other
> > > > > file systems, I haven't investigated anything besides S3.
> > Nevertheless
> > > I
> > > > > wouldn't worry too much about it, since we can start with the
> simple
> > > > > `canCopyPath` that handles both directions. If this will become
> > > important
> > > > > in the future, adding directional `canDownloadPath` or
> > `canUploadPath`
> > > > > would be a backward compatible change, so we can safely extend it
> in
> > > the
> > > > > future if needed.
> > > > >
> > > > > > 2. Is the existing interface `DuplicatingFileSystem`
> > feasible/enough
> > > > for
> > > > > this case?
> > > > >
> > > > > Good question. The intention and use case behind
> > > `DuplicatingFileSystem`
> > > > is
> > > > > different. It marks if `FileSystem` can quickly copy/duplicate
> files
> > > > > in the remote `FileSystem`. For example an equivalent of a hard
> link
> > or
> > > > > bumping a reference count in the remote system. That's a bit
> > different
> > > > > to copy paths between remote and local file systems.
> > > > >
> > > > > However, it could arguably be unified under one interface where we
> > > would
> > > > > re-use or re-name `canFastDuplicate(Path, Path)` to
> > > > > `canFastCopy(Path, Path)` with the following use cases:
> > > > > - `canFastCopy(remoteA, remoteB)` returns true - current equivalent
> > of
> > > > > `DuplicatingFileSystem` - quickly duplicate/hard link remote path
> > > > > - `canFastCopy(local, remote)` returns true - FS can natively
> upload
> > > > local
> > > > > file to a remote location
> > > > > - `canFastCopy(remote, local)` returns true - FS can natively
> > download
> > > > > local file from a remote location
> > > > >
> > > > > Maybe indeed that's a better solution vs having two separate
> > interfaces
> > > > for
> > > > > copying and duplicating?
> > > > >
> > > > > > 3. Will the interface extracting introduce a break change?
> > > > >
> > > > > No. The signature of the existing abstract `FileSystem` class would
> > > > remain
> > > > > the same. Only most/all of the abstract methods would be
> > > > > pulled out to the interface and abstract `FileSystem` would
> implement
> > > > that
> > > > > new interface.
> > > > >
> > > > > Best,
> > > > > Piotrek
> > > > >
> > > > > pon., 6 maj 2024 o 04:55 Zakelly Lan <zakelly....@gmail.com>
> > > napisał(a):
> > > > >
> > > > > > Hi Piotr,
> > > > > >
> > > > > > Thanks for the proposal. It's meaningful to speed up the state
> > > > download.
> > > > > I
> > > > > > get into some questions:
> > > > > >
> > > > > > 1. What is the semantic of `canCopyPath`? Should it be associated
> > > with
> > > > a
> > > > > > specific destination path? e.g. It can be copied to local, but
> not
> > to
> > > > the
> > > > > > remote FS.
> > > > > > 2. Is the existing interface `DuplicatingFileSystem`
> > feasible/enough
> > > > for
> > > > > > this case?
> > > > > > 3. Will the interface extracting introduce a break change?
> > > > > >
> > > > > >
> > > > > > Best,
> > > > > > Zakelly
> > > > > >
> > > > > >
> > > > > > On Thu, May 2, 2024 at 6:50 PM Aleksandr Pilipenko <
> > z3d...@gmail.com
> > > >
> > > > > > wrote:
> > > > > >
> > > > > > > Hi Piotr,
> > > > > > >
> > > > > > > Thanks for the proposal.
> > > > > > > How adding a s5cmd will affect memory footprint? Since this is
> a
> > > > native
> > > > > > > binary, memory consumption will not be controlled by JVM or
> > Flink.
> > > > > > >
> > > > > > > Thanks,
> > > > > > > Aleksandr
> > > > > > >
> > > > > > > On Thu, 2 May 2024 at 11:12, Hong Liang <h...@apache.org>
> wrote:
> > > > > > >
> > > > > > > > Hi Piotr,
> > > > > > > >
> > > > > > > > Thanks for the FLIP! Nice to see work to improve the
> filesystem
> > > > > > > > performance. +1 to future work to improve the upload speed as
> > > well.
> > > > > > This
> > > > > > > > would be useful for jobs with large state and high Async
> > > > > checkpointing
> > > > > > > > times.
> > > > > > > >
> > > > > > > > Some thoughts on the configuration, it might be good for us
> to
> > > > > > introduce
> > > > > > > 2x
> > > > > > > > points of configurability for future proofing:
> > > > > > > > 1/ Configure the implementation of PathsCopyingFileSystem
> used,
> > > > maybe
> > > > > > by
> > > > > > > > config, or by ServiceResources (this would allow us to use
> this
> > > for
> > > > > > > > alternative clouds/Implement S3 SDKv2 support if we want this
> > in
> > > > the
> > > > > > > > future). Also this could be used as a feature flag to
> determine
> > > if
> > > > we
> > > > > > > > should be using this new native file copy support.
> > > > > > > > 2/ Configure the location of the s5cmd binary (version
> control
> > > > etc.),
> > > > > > as
> > > > > > > > you have mentioned in the FLIP.
> > > > > > > >
> > > > > > > > Regards,
> > > > > > > > Hong
> > > > > > > >
> > > > > > > >
> > > > > > > > On Thu, May 2, 2024 at 9:40 AM Muhammet Orazov
> > > > > > > > <mor+fl...@morazow.com.invalid> wrote:
> > > > > > > >
> > > > > > > > > Hey Piotr,
> > > > > > > > >
> > > > > > > > > Thanks for the proposal! It would be great improvement!
> > > > > > > > >
> > > > > > > > > Some questions from my side:
> > > > > > > > >
> > > > > > > > > > In order to configure s5cmd Flink’s user would need
> > > > > > > > > > to specify path to the s5cmd binary.
> > > > > > > > >
> > > > > > > > > Could you please also add the configuration property
> > > > > > > > > for this? An example showing how users would set this
> > > > > > > > > parameter would be helpful.
> > > > > > > > >
> > > > > > > > > Would this affect any filesystem connectors that use
> > > > > > > > > FileSystem[1][2] dependencies?
> > > > > > > > >
> > > > > > > > > Best,
> > > > > > > > > Muhammet
> > > > > > > > >
> > > > > > > > > [1]:
> > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/filesystems/s3/
> > > > > > > > > [2]:
> > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/filesystem/
> > > > > > > > >
> > > > > > > > > On 2024-04-30 13:15, Piotr Nowojski wrote:
> > > > > > > > > > Hi all!
> > > > > > > > > >
> > > > > > > > > > I would like to put under discussion:
> > > > > > > > > >
> > > > > > > > > > FLIP-444: Native file copy support
> > > > > > > > > > https://cwiki.apache.org/confluence/x/rAn9EQ
> > > > > > > > > >
> > > > > > > > > > This proposal aims to speed up Flink recovery times, by
> > > > speeding
> > > > > up
> > > > > > > > > > state
> > > > > > > > > > download times. However in the future, the same mechanism
> > > could
> > > > > be
> > > > > > > also
> > > > > > > > > > used to speed up state uploading
> > > (checkpointing/savepointing).
> > > > > > > > > >
> > > > > > > > > > I'm curious to hear your thoughts.
> > > > > > > > > >
> > > > > > > > > > Best,
> > > > > > > > > > Piotrek
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>
>
> --
> Best,
> Hangxiang.
>

Reply via email to