>
> Note that for both recovery and checkpoints, there are no retry
> mechanisms. If any part of downloading or uploading fails, the job fails
> over, so actually using such an interface extension would be out of scope
> of this FLIP. In that case, if this could be extended in the future
> without breaking compatibility, maybe we could leave it as a future
> improvement?
>

Thanks for the reply.
It makes sense to consider this as a future optimization.
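
For reference, here is a minimal sketch of what such a future extension could look like, assuming a batch copy that reports per-file outcomes instead of failing fast. None of the names below (`CopyTask`, `CopyResult`, `SingleFileCopier`) exist in Flink or in FLIP-444; they are hypothetical and only illustrate the idea of returning the list of successful files.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch; none of these types exist in Flink or in FLIP-444. */
final class PartialCopySketch {

    /** One source -> destination pair (paths kept as plain strings for brevity). */
    static final class CopyTask {
        final String source;
        final String destination;

        CopyTask(String source, String destination) {
            this.source = source;
            this.destination = destination;
        }
    }

    /** Outcome of a non-atomic batch copy: which tasks finished and which failed. */
    static final class CopyResult {
        final List<CopyTask> succeeded = new ArrayList<>();
        final Map<CopyTask, IOException> failed = new LinkedHashMap<>();

        boolean allSucceeded() {
            return failed.isEmpty();
        }
    }

    /** Stand-in for whatever actually moves the bytes (SDK call, native binary, ...). */
    interface SingleFileCopier {
        void copy(CopyTask task) throws IOException;
    }

    /** Attempts every task and records the per-task outcome instead of failing fast. */
    static CopyResult copyFiles(List<CopyTask> tasks, SingleFileCopier copier) {
        CopyResult result = new CopyResult();
        for (CopyTask task : tasks) {
            try {
                copier.copy(task);
                result.succeeded.add(task);
            } catch (IOException e) {
                result.failed.put(task, e);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<CopyTask> tasks = Arrays.asList(
                new CopyTask("s3://bucket/chk-1/a", "/tmp/a"),
                new CopyTask("s3://bucket/chk-1/b", "/tmp/b"));
        // Simulated copier: the second file always fails.
        CopyResult result = copyFiles(tasks, task -> {
            if (task.source.endsWith("/b")) {
                throw new IOException("simulated failure for " + task.source);
            }
        });
        System.out.println("succeeded=" + result.succeeded.size()
                + " failed=" + result.failed.size());
    }
}
```

As long as recovery and checkpointing have no retry mechanism, a caller would still just fail over whenever `allSucceeded()` is false, so a result type like this only pays off once retries exist.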

On Fri, May 10, 2024 at 4:31 PM Piotr Nowojski <piotr.nowoj...@gmail.com>
wrote:

> Hi!
>
> Thanks for your suggestions!
>
> > I'd prefer a unified one interface
>
> I have updated the FLIP to take that into account. In this case, I would
> also propose to completely drop `DuplicatingFileSystem` in favour of a
> basically renamed version of it `PathsCopyingFileSystem`.
> `DuplicatingFileSystem` was not marked as PublicEvolving/Experimental
> (probably by mistake), so technically we can do it. Even if not for that
> mistake, I would still vote to replace it to simplify the code, as any
> migration would be very easy. At the same time to the best of my knowledge,
> no one has ever implemented it.
>
> > The proposal mentions that s5cmd utilises 100% of CPU similar to Flink
> > 1.18. However, this will be a native process outside of the JVM. Is
> > there a risk of a large/long state download starving the TM of CPU
> > cycles, causing issues such as heartbeat or ask timeouts?
> >
> > Do you know if there is a way to limit the CPU utilisation of s5cmd? I
> > see worker and concurrency configuration, but these do not map directly
> > to a cap on CPU usage. The experience for users of this feature will
> > then be one of trial and error.
>
> Those are good points. As a matter of fact, shortly after publishing this
> FLIP, we started experimenting with using `cpulimit` to achieve just that.
> If everything works out fine, we are planning to expose this as a
> configuration option for the S3 file system. I've added that to the FLIP.
>
> > 2. copyFiles is not an atomic operation. How could we handle the
> > situation when some of the files fail?
> > Could we return the list of successfully copied files, so that the caller
> > could decide whether to retry, or at least know which ones succeeded?
>
> Do you have some suggestions on how that should be implemented in the
> interface and how it should be used?
>
> Note that for both recovery and checkpoints, there are no retry
> mechanisms. If any part of downloading or uploading fails, the job fails
> over, so actually using such an interface extension would be out of scope
> of this FLIP. In that case, if this could be extended in the future
> without breaking compatibility, maybe we could leave it as a future
> improvement?
>
> Best,
> Piotrek
>
>
> On Fri, May 10, 2024 at 07:40 Hangxiang Yu <master...@gmail.com> wrote:
>
> > Hi Piotr.
> > Thanks for your proposal.
> >
> > I have some comments, PTAL:
> > 1. +1 about unifying the interface with DuplicatingFileSystem.
> > IIUC, DuplicatingFileSystem also covers the logic from/to both local and
> > remote paths.
> > The implementations could define their own logic about how to fast
> > copy/duplicate files, e.g. hard link or transfer manager.
> >
> > 2. copyFiles is not an atomic operation. How could we handle the
> > situation when some of the files fail?
> > Could we return the list of successfully copied files, so that the caller
> > could decide whether to retry, or at least know which ones succeeded?
> >
> > On Thu, May 9, 2024 at 3:46 PM Keith Lee <leekeiabstract...@gmail.com>
> > wrote:
> >
> > > Hi Piotr,
> > >
> > > Thank you for the proposal. Looks great.
> > >
> > > Along similar lines to Aleks' question on memory usage:
> > >
> > > The proposal mentions that s5cmd utilises 100% of CPU similar to Flink
> > > 1.18. However, this will be a native process outside of the JVM. Is
> > > there a risk of a large/long state download starving the TM of CPU
> > > cycles, causing issues such as heartbeat or ask timeouts?
> > >
> > > Do you know if there is a way to limit the CPU utilisation of s5cmd? I
> > > see worker and concurrency configuration, but these do not map directly
> > > to a cap on CPU usage. The experience for users of this feature will
> > > then be one of trial and error.
> > >
> > > Thanks
> > > Keith
> > >
> > > On Wed, May 8, 2024 at 12:47 PM Ahmed Hamdy <hamdy10...@gmail.com>
> > > wrote:
> > >
> > > > Hi Piotr
> > > > +1 for the proposal, it seems to have a lot of gains.
> > > >
> > > > Best Regards
> > > > Ahmed Hamdy
> > > >
> > > >
> > > > On Mon, 6 May 2024 at 12:06, Zakelly Lan <zakelly....@gmail.com>
> > > > wrote:
> > > >
> > > > > Hi Piotrek,
> > > > >
> > > > > Thanks for your answers!
> > > > >
> > > > > > > Good question. The intention and use case behind
> > > > > > > `DuplicatingFileSystem` is different. It marks if `FileSystem`
> > > > > > > can quickly copy/duplicate files in the remote `FileSystem`.
> > > > > > > For example an equivalent of a hard link or bumping a
> > > > > > > reference count in the remote system. That's a bit different
> > > > > > > to copying paths between remote and local file systems.
> > > > > > >
> > > > > > > However, it could arguably be unified under one interface,
> > > > > > > where we would re-use or rename `canFastDuplicate(Path, Path)`
> > > > > > > to `canFastCopy(Path, Path)` with the following use cases:
> > > > > > > - `canFastCopy(remoteA, remoteB)` returns true - current
> > > > > > >   equivalent of `DuplicatingFileSystem` - quickly
> > > > > > >   duplicate/hard link a remote path
> > > > > > > - `canFastCopy(local, remote)` returns true - FS can natively
> > > > > > >   upload a local file to a remote location
> > > > > > > - `canFastCopy(remote, local)` returns true - FS can natively
> > > > > > >   download a remote file to a local location
> > > > > > >
> > > > > > > Maybe indeed that's a better solution vs having two separate
> > > > > > > interfaces for copying and duplicating?
> > > > > >
> > > > >
> > > > > > I'd prefer one unified interface; `canFastCopy(Path, Path)` looks
> > > > > > good to me. This also resolves my question 1 about the
> > > > > > destination.
> > > > >
> > > > >
> > > > > Best,
> > > > > Zakelly
> > > > >
> > > > > > On Mon, May 6, 2024 at 6:36 PM Piotr Nowojski
> > > > > > <pnowoj...@apache.org> wrote:
> > > > >
> > > > > > Hi All!
> > > > > >
> > > > > > Thanks for your comments.
> > > > > >
> > > > > > Muhammet and Hong, about the config options.
> > > > > >
> > > > > > > > Could you please also add the configuration property for
> > > > > > > > this? An example showing how users would set this parameter
> > > > > > > > would be helpful.
> > > > > > >
> > > > > > > > 1/ Configure the implementation of PathsCopyingFileSystem used
> > > > > > > > 2/ Configure the location of the s5cmd binary (version control
> > > > > > > > etc.)
> > > > > > >
> > > > > > > Oops, sorry, I have now added the config options that I had in
> > > > > > > mind to the FLIP. I don't know why I omitted them. Basically, I
> > > > > > > suggest that in order to use native file copying:
> > > > > > > 1. `FileSystem` must support it by implementing the
> > > > > > >    `PathsCopyingFileSystem` interface
> > > > > > > 2. That `FileSystem` would have to be configured to actually use
> > > > > > >    it. For example, the S3 file system would report that it can
> > > > > > >    copy paths only if `s3.s5cmd.path` has been specified.
> > > > > >
> > > > > > > > Would this affect any filesystem connectors that use
> > > > > > > > FileSystem[1][2] dependencies?
> > > > > > >
> > > > > > > Definitely not out of the box. Any place in Flink that is
> > > > > > > currently uploading/downloading files from a FileSystem could
> > > > > > > use this feature, but it would have to be implemented. In the
> > > > > > > same way, this FLIP will implement native file copying when
> > > > > > > downloading state during recovery, but the old code path will
> > > > > > > still be used for uploading state files during a checkpoint.
> > > > > >
> > > > > > > > How will adding s5cmd affect the memory footprint? Since this
> > > > > > > > is a native binary, memory consumption will not be controlled
> > > > > > > > by the JVM or Flink.
> > > > > > >
> > > > > > > As you mentioned, the memory usage of `s5cmd` will not be
> > > > > > > controlled, so the memory footprint will grow. The s5cmd
> > > > > > > integration with Flink has already been tested quite
> > > > > > > extensively in our production environment, and we haven't
> > > > > > > observed any issues so far, despite the fact that we are using
> > > > > > > quite small pods. But of course, if your setup is working on
> > > > > > > the edge of OOM, this could tip you over that edge.
> > > > > >
> > > > > > Zakelly:
> > > > > >
> > > > > > > > 1. What are the semantics of `canCopyPath`? Should it be
> > > > > > > > associated with a specific destination path? e.g. It can be
> > > > > > > > copied to local, but not to the remote FS.
> > > > > > >
> > > > > > > For S3 (both for the SDKv2 and s5cmd implementations), the
> > > > > > > copying direction (upload/download) doesn't matter. I don't
> > > > > > > know about other file systems; I haven't investigated anything
> > > > > > > besides S3. Nevertheless, I wouldn't worry too much about it,
> > > > > > > since we can start with the simple `canCopyPath` that handles
> > > > > > > both directions. If this becomes important in the future,
> > > > > > > adding a directional `canDownloadPath` or `canUploadPath` would
> > > > > > > be a backward compatible change, so we can safely extend it in
> > > > > > > the future if needed.
> > > > > >
> > > > > > > > 2. Is the existing interface `DuplicatingFileSystem`
> > > > > > > > feasible/enough for this case?
> > > > > > >
> > > > > > > Good question. The intention and use case behind
> > > > > > > `DuplicatingFileSystem` is different. It marks if `FileSystem`
> > > > > > > can quickly copy/duplicate files in the remote `FileSystem`.
> > > > > > > For example an equivalent of a hard link or bumping a
> > > > > > > reference count in the remote system. That's a bit different
> > > > > > > to copying paths between remote and local file systems.
> > > > > > >
> > > > > > > However, it could arguably be unified under one interface,
> > > > > > > where we would re-use or rename `canFastDuplicate(Path, Path)`
> > > > > > > to `canFastCopy(Path, Path)` with the following use cases:
> > > > > > > - `canFastCopy(remoteA, remoteB)` returns true - current
> > > > > > >   equivalent of `DuplicatingFileSystem` - quickly
> > > > > > >   duplicate/hard link a remote path
> > > > > > > - `canFastCopy(local, remote)` returns true - FS can natively
> > > > > > >   upload a local file to a remote location
> > > > > > > - `canFastCopy(remote, local)` returns true - FS can natively
> > > > > > >   download a remote file to a local location
> > > > > > >
> > > > > > > Maybe indeed that's a better solution vs having two separate
> > > > > > > interfaces for copying and duplicating?
> > > > > >
> > > > > > > > 3. Will extracting the interface introduce a breaking change?
> > > > > > >
> > > > > > > No. The signature of the existing abstract `FileSystem` class
> > > > > > > would remain the same. Only most/all of the abstract methods
> > > > > > > would be pulled out to the interface, and the abstract
> > > > > > > `FileSystem` would implement that new interface.
> > > > > >
> > > > > > Best,
> > > > > > Piotrek
> > > > > >
> > > > > > > On Mon, May 6, 2024 at 04:55 Zakelly Lan <zakelly....@gmail.com>
> > > > > > > wrote:
> > > > > >
> > > > > > > Hi Piotr,
> > > > > > >
> > > > > > > > Thanks for the proposal. It's meaningful to speed up the state
> > > > > > > > download. I have some questions:
> > > > > > > >
> > > > > > > > 1. What are the semantics of `canCopyPath`? Should it be
> > > > > > > > associated with a specific destination path? e.g. It can be
> > > > > > > > copied to local, but not to the remote FS.
> > > > > > > > 2. Is the existing interface `DuplicatingFileSystem`
> > > > > > > > feasible/enough for this case?
> > > > > > > > 3. Will extracting the interface introduce a breaking change?
> > > > > > >
> > > > > > >
> > > > > > > Best,
> > > > > > > Zakelly
> > > > > > >
> > > > > > >
> > > > > > > > On Thu, May 2, 2024 at 6:50 PM Aleksandr Pilipenko
> > > > > > > > <z3d...@gmail.com> wrote:
> > > > > > >
> > > > > > > > Hi Piotr,
> > > > > > > >
> > > > > > > > Thanks for the proposal.
> > > > > > > > > How will adding s5cmd affect the memory footprint? Since
> > > > > > > > > this is a native binary, memory consumption will not be
> > > > > > > > > controlled by the JVM or Flink.
> > > > > > > >
> > > > > > > > Thanks,
> > > > > > > > Aleksandr
> > > > > > > >
> > > > > > > > On Thu, 2 May 2024 at 11:12, Hong Liang <h...@apache.org>
> > > > > > > > > wrote:
> > > > > > > >
> > > > > > > > > Hi Piotr,
> > > > > > > > >
> > > > > > > > > > Thanks for the FLIP! Nice to see work to improve the
> > > > > > > > > > filesystem performance. +1 to future work to improve the
> > > > > > > > > > upload speed as well. This would be useful for jobs with
> > > > > > > > > > large state and high async checkpointing times.
> > > > > > > > > >
> > > > > > > > > > Some thoughts on the configuration: it might be good for
> > > > > > > > > > us to introduce two points of configurability for future
> > > > > > > > > > proofing:
> > > > > > > > > > 1/ Configure the implementation of PathsCopyingFileSystem
> > > > > > > > > > used, maybe by config, or by ServiceResources (this would
> > > > > > > > > > allow us to use this for alternative clouds/implement S3
> > > > > > > > > > SDKv2 support if we want this in the future). Also, this
> > > > > > > > > > could be used as a feature flag to determine if we should
> > > > > > > > > > be using this new native file copy support.
> > > > > > > > > > 2/ Configure the location of the s5cmd binary (version
> > > > > > > > > > control etc.), as you have mentioned in the FLIP.
> > > > > > > > >
> > > > > > > > > Regards,
> > > > > > > > > Hong
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > On Thu, May 2, 2024 at 9:40 AM Muhammet Orazov
> > > > > > > > > <mor+fl...@morazow.com.invalid> wrote:
> > > > > > > > >
> > > > > > > > > > Hey Piotr,
> > > > > > > > > >
> > > > > > > > > > Thanks for the proposal! It would be great improvement!
> > > > > > > > > >
> > > > > > > > > > Some questions from my side:
> > > > > > > > > >
> > > > > > > > > > > In order to configure s5cmd Flink’s user would need
> > > > > > > > > > > to specify path to the s5cmd binary.
> > > > > > > > > >
> > > > > > > > > > Could you please also add the configuration property
> > > > > > > > > > for this? An example showing how users would set this
> > > > > > > > > > parameter would be helpful.
> > > > > > > > > >
> > > > > > > > > > Would this affect any filesystem connectors that use
> > > > > > > > > > FileSystem[1][2] dependencies?
> > > > > > > > > >
> > > > > > > > > > Best,
> > > > > > > > > > Muhammet
> > > > > > > > > >
> > > > > > > > > > [1]:
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/filesystems/s3/
> > > > > > > > > > [2]:
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/filesystem/
> > > > > > > > > >
> > > > > > > > > > On 2024-04-30 13:15, Piotr Nowojski wrote:
> > > > > > > > > > > Hi all!
> > > > > > > > > > >
> > > > > > > > > > > I would like to put under discussion:
> > > > > > > > > > >
> > > > > > > > > > > FLIP-444: Native file copy support
> > > > > > > > > > > https://cwiki.apache.org/confluence/x/rAn9EQ
> > > > > > > > > > >
> > > > > > > > > > > > This proposal aims to speed up Flink recovery times
> > > > > > > > > > > > by speeding up state download times. However, in the
> > > > > > > > > > > > future the same mechanism could also be used to speed
> > > > > > > > > > > > up state uploading (checkpointing/savepointing).
> > > > > > > > > > >
> > > > > > > > > > > I'm curious to hear your thoughts.
> > > > > > > > > > >
> > > > > > > > > > > Best,
> > > > > > > > > > > Piotrek
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> >
> > --
> > Best,
> > Hangxiang.
> >
>


-- 
Best,
Hangxiang.
