Hi all!

>> Do you know if there is a way to limit the CPU utilisation of s5cmd? I
see
>> worker and concurrency configuration but these do not map directly to cap
>> in CPU usage. The experience for feature user in this case will be one of
>> trial and error.
>
> Those are good points. As a matter of fact, shortly after publishing this
FLIP, we started experimenting with using `cpulimit` to achieve just that.
If everything will work out fine, we are planning to expose this as a
configuration option for the S3 file system. I've added that to the FLIP.

Before finally putting this FLIP under vote, I would like to revisit/share
with you our production experience when it comes to preventing s5cmd from
overloading the machines.

When trying out `cpulimit`, we had some problems on our particular setup to
make `cpulimit` behave the way we want. I think in general that's still a
viable option, but we have internally settled upon just limiting the number
of workers via:

    "s3.s5cmd.args": "-r 0 --numworkers 5"

That seems to work reliably with a combination of FLINK-35501 [1], without
affecting the actual performance. But I will be curious to learn what will
eventually work best for others.

Hence, I would propose also to modify the FLIP slightly, and remove
"native" support for cpulimit from this FLIP. In other words, I would
propose to drop the config option:

    public static final ConfigOption<String> S5CMD_CPULIMIT =
            ConfigOptions.key("s3.s5cmd.cpulimit")
                    .doubleType()
                    .withDescription(
                            "Optional cpulimit value to set for the s5cmd
to prevent TaskManager from overloading");

The thing is, that it seems a bit platform specific, and there is actually
no need for this dedicated config option. Instead I would propose to
document in some best practices section two solutions to limit the resource
usage of the s5cmd.

1.  Adding ` --numworkers X` to the `s3.s5cmd.args` ConfigOption and/or
adjusting IO Thread Pool size (thanks to [1])
2. To use `s5cmd` with `cpulimit` via some bash script wrapping the
`s5cmd`. Users instead of passing a path to the `s5cmd` via the
`s3.s5cmd.path` config option, can pass a path to that wrapper bash script.
That wrapper could then apply `cpulimit` to the `s5cmd` call (or use a
completely different mechanism, like CGroups if desired).

As I mentioned above, in the end I think hardcoding in Flink support for
the `cpulimit` doesn't seem appropriate and at the same time it is not
actually necessary.

WDYT?

Best,
Piotrek


[1] https://issues.apache.org/jira/browse/FLINK-35501

pt., 17 maj 2024 o 15:27 <lorenzo.affe...@ververica.com.invalid> napisał(a):

> Perfectly agree with all your considerations.
> Wee said.
>
> Thank you!
> On May 16, 2024 at 10:53 +0200, Piotr Nowojski <pnowoj...@apache.org>,
> wrote:
> > Hi Lorenzo,
> >
> > > • concerns about memory and CPU used out of Flink's control
> >
> > Please note that using AWS SDKv2 would have the same concerns. In both
> > cases Flink can control only to a certain extent what either the SDKv2
> > library does under the hood or the s5cmd process, via configuration
> > parameters. SDKv2, if configured improperly, could also overload TMs CPU,
> > and it would also use extra memory. To an extent that also applies to the
> > current way we are downloading/uploading files from the S3.
> >
> > > • deployment concerns from the usability perspective (would need to
> > install s5cmd on all TMs prior to the job deploy)
> >
> > Yes, that's the downside.
> >
> > > Also, invoking an external binary would incur in some performance
> > degradation possibly. Might it be that using AWS SDK would not, given its
> > Java implementation, and performance could be similar?
> >
> > We haven't run the benchmarks for very fast checkpointing and we haven't
> > measured latency impact, but please note `s5cmd` is very fast to startup.
> > It's not Java after all ;) It's at least an order of magnitude below 1s,
> so
> > the impact on Flink should be negligible, as Flink doesn't support
> > checkpointing under < 1s. Especially not with uploading files to S3. I
> > wouldn't be actually surprised that `s5cmd` (or AWS
> > SDKv2's `TransferManager`) would actually both improve minimal e2e
> > checkpointing times.
> >
> > > Wrapping up, I see using AWS SDK having PROs that could be traded with
> > the CON of slightly worse perf than s5cmd:
> > >
> > > • no hurdles for the user, as the SDK would be a Flink dependency
> > > • less config on the Flink side
> > >
> > > Do you agree?
> >
> > To an extent, yes. AWS SDKv2's TransferManager would also have to be
> > configured properly. But I agree that the largest hurdle with `s5cmd` is
> > the added operational complexity of having to supply 3rd party binary.
> >
> > Please also keep in mind that a lot of work for that FLIP will be in
> > defining, creating and using the interfaces for batch files copy. The
> > actual implementation of the fast copying file system interface and using
> > `s5cmd` is not the dominant factor. So all in all, I wouldn't object if
> > someone would like to take over my AWS SDKv2 PoC, and finish it off in
> the
> > future, as an alternative for the `s5cmd`. Indeed AWS SDKv2 could at some
> > point become the default setting, while `s5cmd` could remain as a faster
> > alternative.
> >
> > Best,
> > Piotrek
> >
> > czw., 16 maj 2024 o 09:03 <lorenzo.affe...@ververica.com.invalid>
> > napisał(a):
> >
> > > Hello Piotr and thanks for this proposal!
> > > The idea sounds smart and very well grounded thank you!
> > >
> > > Also here, as others, I have some doubts about invoking an external
> binary
> > > (namely s5cmd):
> > >
> > > • concerns about memory and CPU used out of Flink's control
> > > • deployment concerns from the usability perspective (would need to
> > > install s5cmd on all TMs prior to the job deploy)
> > >
> > >
> > > Also, invoking an external binary would incur in some performance
> > > degradation possibly. Might it be that using AWS SDK would not, given
> its
> > > Java implementation, and performance could be similar?
> > >
> > > Wrapping up, I see using AWS SDK having PROs that could be traded with
> the
> > > CON of slightly worse perf than s5cmd:
> > >
> > > • no hurdles for the user, as the SDK would be a Flink dependency
> > > • less config on the Flink side
> > >
> > >
> > > Do you agree?
> > > On May 13, 2024 at 07:42 +0200, Hangxiang Yu <master...@gmail.com>,
> wrote:
> > > > > > >
> > > > > > > Note that for both recovery and checkpoints, there are no
> retring
> > > > > > > mechanisms. If any part of downloading or
> > > > > > > uploading fails, the job fails over, so actually using such
> interface
> > > > > > > extension would be out of scope of this FLIP. In
> > > > > > > that case, maybe if this could be extended in the future
> without
> > > breaking
> > > > > > > compatibility we could leave it as a
> > > > > > > future improvement?
> > > > > > >
> > > > >
> > > > > Thanks for the reply.
> > > > > It makes sense to consider as a future optimization.
> > > > >
> > > > > On Fri, May 10, 2024 at 4:31 PM Piotr Nowojski <
> piotr.nowoj...@gmail.com
> > > > >
> > > > > wrote:
> > > > >
> > > > > > > Hi!
> > > > > > >
> > > > > > > Thanks for your suggestions!
> > > > > > >
> > > > > > > > > > > I'd prefer a unified one interface
> > > > > > >
> > > > > > > I have updated the FLIP to take that into account. In this
> case, I
> > > would
> > > > > > > also propose to completely drop `DuplicatingFileSystem` in
> favour of a
> > > > > > > basically renamed version of it `PathsCopyingFileSystem`.
> > > > > > > `DuplicatingFileSystem` was not marked as
> PublicEvolving/Experimental
> > > > > > > (probably by mistake), so technically we can do it. Even if
> not for
> > > that
> > > > > > > mistake, I would still vote to replace it to simplify the
> code, as any
> > > > > > > migration would be very easy. At the same time to the best of
> my
> > > knowledge,
> > > > > > > no one has ever implemented it.
> > > > > > >
> > > > > > > > > > > The proposal mentions that s5cmd utilises 100% of CPU
> similar to
> > > Flink
> > > > > > > > > > > 1.18. However, this will be a native process outside
> of the JVM.
> > > Are
> > > > > > > there
> > > > > > > > > > > risk of large/long state download starving the TM of
> CPU cycle
> > > causing
> > > > > > > > > > > issues such as heartbeat or ask timeout?
> > > > > > > > > > >
> > > > > > > > > > > Do you know if there is a way to limit the CPU
> utilisation of
> > > s5cmd? I
> > > > > > > see
> > > > > > > > > > > worker and concurrency configuration but these do not
> map directly
> > > to cap
> > > > > > > > > > > in CPU usage. The experience for feature user in this
> case will be
> > > one of
> > > > > > > > > > > trial and error.
> > > > > > >
> > > > > > > Those are good points. As a matter of fact, shortly after
> publishing
> > > this
> > > > > > > FLIP, we started experimenting with using `cpulimit` to
> achieve just
> > > that.
> > > > > > > If everything will work out fine, we are planning to expose
> this as a
> > > > > > > configuration option for the S3 file system. I've added that
> to the
> > > FLIP.
> > > > > > >
> > > > > > > > > > > 2. copyFiles is not an atomic operation, How could we
> handle the
> > > > > > > situation
> > > > > > > > > > > when some partial files fail ?
> > > > > > > > > > > Could we return the list of successful files then the
> caller could
> > > decide
> > > > > > > > > > > to retry or just know them ?
> > > > > > >
> > > > > > > Do you have some suggestions on how that should be implemented
> in the
> > > > > > > interface and how should it be used?
> > > > > > >
> > > > > > > Note that for both recovery and checkpoints, there are no
> retring
> > > > > > > mechanisms. If any part of downloading or
> > > > > > > uploading fails, the job fails over, so actually using such
> interface
> > > > > > > extension would be out of scope of this FLIP. In
> > > > > > > that case, maybe if this could be extended in the future
> without
> > > breaking
> > > > > > > compatibility we could leave it as a
> > > > > > > future improvement?
> > > > > > >
> > > > > > > Best,
> > > > > > > Piotrek
> > > > > > >
> > > > > > >
> > > > > > > pt., 10 maj 2024 o 07:40 Hangxiang Yu <master...@gmail.com>
> > > napisał(a):
> > > > > > >
> > > > > > > > > > > Hi Piotr.
> > > > > > > > > > > Thanks for your proposal.
> > > > > > > > > > >
> > > > > > > > > > > I have some comments, PTAL:
> > > > > > > > > > > 1. +1 about unifying the interface with
> DuplicatingFileSystem.
> > > > > > > > > > > IIUC, DuplicatingFileSystem also covers the logic
> from/to both
> > > local and
> > > > > > > > > > > remote paths.
> > > > > > > > > > > The implementations could define their own logic about
> how to fast
> > > > > > > > > > > copy/duplicate files, e.g. hard link or transfer
> manager.
> > > > > > > > > > >
> > > > > > > > > > > 2. copyFiles is not an atomic operation, How could we
> handle the
> > > > > > > situation
> > > > > > > > > > > when some partial files fail ?
> > > > > > > > > > > Could we return the list of successful files then the
> caller could
> > > decide
> > > > > > > > > > > to retry or just know them ?
> > > > > > > > > > >
> > > > > > > > > > > On Thu, May 9, 2024 at 3:46 PM Keith Lee <
> > > leekeiabstract...@gmail.com>
> > > > > > > > > > > wrote:
> > > > > > > > > > >
> > > > > > > > > > > > > > > Hi Piotr,
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > Thank you for the proposal. Looks great.
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > Along similar line of Aleks' question on
> memory usage.
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > The proposal mentions that s5cmd utilises 100%
> of CPU similar
> > > to Flink
> > > > > > > > > > > > > > > 1.18. However, this will be a native process
> outside of the
> > > JVM. Are
> > > > > > > > > > > there
> > > > > > > > > > > > > > > risk of large/long state download starving the
> TM of CPU cycle
> > > causing
> > > > > > > > > > > > > > > issues such as heartbeat or ask timeout?
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > Do you know if there is a way to limit the CPU
> utilisation of
> > > s5cmd? I
> > > > > > > > > > > see
> > > > > > > > > > > > > > > worker and concurrency configuration but these
> do not map
> > > directly to
> > > > > > > cap
> > > > > > > > > > > > > > > in CPU usage. The experience for feature user
> in this case
> > > will be one
> > > > > > > of
> > > > > > > > > > > > > > > trial and error.
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > Thanks
> > > > > > > > > > > > > > > Keith
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > On Wed, May 8, 2024 at 12:47 PM Ahmed Hamdy <
> > > hamdy10...@gmail.com>
> > > > > > > > > > > wrote:
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > Hi Piotr
> > > > > > > > > > > > > > > > > > > +1 for the proposal, it seems to have
> a lot of gains.
> > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > Best Regards
> > > > > > > > > > > > > > > > > > > Ahmed Hamdy
> > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > On Mon, 6 May 2024 at 12:06, Zakelly
> Lan <
> > > zakelly....@gmail.com>
> > > > > > > > > > > wrote:
> > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > Hi Piotrek,
> > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > Thanks for your answers!
> > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > Good question. The intention
> and use case behind
> > > > > > > > > > > > > > > `DuplicatingFileSystem`
> > > > > > > > > > > > > > > > > > > is
> > > > > > > > > > > > > > > > > > > > > > > > > > > different. It marks if
> `FileSystem` can quickly
> > > copy/duplicate
> > > > > > > > > > > files
> > > > > > > > > > > > > > > > > > > > > > > > > > > in the remote
> `FileSystem`. For example an
> > > equivalent of a hard
> > > > > > > > > > > link
> > > > > > > > > > > > > > > or
> > > > > > > > > > > > > > > > > > > > > > > > > > > bumping a reference
> count in the remote system.
> > > That's a bit
> > > > > > > > > > > > > > > different
> > > > > > > > > > > > > > > > > > > > > > > > > > > to copy paths between
> remote and local file
> > > systems.
> > > > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > > > However, it could
> arguably be unified under one
> > > interface where
> > > > > > > we
> > > > > > > > > > > > > > > > > > > would
> > > > > > > > > > > > > > > > > > > > > > > > > > > re-use or re-name
> `canFastDuplicate(Path, Path)` to
> > > > > > > > > > > > > > > > > > > > > > > > > > > `canFastCopy(Path,
> Path)` with the following use
> > > cases:
> > > > > > > > > > > > > > > > > > > > > > > > > > > -
> `canFastCopy(remoteA, remoteB)` returns true -
> > > current
> > > > > > > equivalent
> > > > > > > > > > > > > > > of
> > > > > > > > > > > > > > > > > > > > > > > > > > >
> `DuplicatingFileSystem` - quickly duplicate/hard
> > > link remote path
> > > > > > > > > > > > > > > > > > > > > > > > > > > - `canFastCopy(local,
> remote)` returns true - FS
> > > can natively
> > > > > > > > > > > upload
> > > > > > > > > > > > > > > > > > > > > > > local
> > > > > > > > > > > > > > > > > > > > > > > > > > > file to a remote
> location
> > > > > > > > > > > > > > > > > > > > > > > > > > > - `canFastCopy(remote,
> local)` returns true - FS
> > > can natively
> > > > > > > > > > > > > > > download
> > > > > > > > > > > > > > > > > > > > > > > > > > > local file from a
> remote location
> > > > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > > > Maybe indeed that's a
> better solution vs having
> > > two separate
> > > > > > > > > > > > > > > interfaces
> > > > > > > > > > > > > > > > > > > > > > > for
> > > > > > > > > > > > > > > > > > > > > > > > > > > copying and
> duplicating?
> > > > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > I'd prefer a unified one
> interface, `canFastCopy(Path,
> > > Path)` looks
> > > > > > > > > > > > > > > good
> > > > > > > > > > > > > > > > > > > to
> > > > > > > > > > > > > > > > > > > > > > > me. This also resolves my
> question 1 about the
> > > destination.
> > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > Best,
> > > > > > > > > > > > > > > > > > > > > > > Zakelly
> > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > On Mon, May 6, 2024 at 6:36 PM
> Piotr Nowojski <
> > > > > > > pnowoj...@apache.org>
> > > > > > > > > > > > > > > > > > > > > > > wrote:
> > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > > > Hi All!
> > > > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > > > Thanks for your
> comments.
> > > > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > > > Muhammet and Hong,
> about the config options.
> > > > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > Could you
> please also add the configuration
> > > property for this?
> > > > > > > An
> > > > > > > > > > > > > > > > > > > > > > > example
> > > > > > > > > > > > > > > > > > > > > > > > > > > showing how users
> would set this parameter would
> > > be helpful.
> > > > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > 1/ Configure
> the implementation of
> > > PathsCopyingFileSystem used
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > 2/ Configure
> the location of the s5cmd binary
> > > (version control
> > > > > > > > > > > > > > > etc.)
> > > > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > > > Ops, sorry I added the
> config options that I had
> > > in mind to the
> > > > > > > > > > > > > > > FLIP. I
> > > > > > > > > > > > > > > > > > > > > > > > > > > don't know why I have
> omitted this. Basically I
> > > suggest that in
> > > > > > > > > > > order
> > > > > > > > > > > > > > > > > > > to
> > > > > > > > > > > > > > > > > > > > > > > > > > > use native file
> copying:
> > > > > > > > > > > > > > > > > > > > > > > > > > > 1. `FileSystem` must
> support it via implementing
> > > > > > > > > > > > > > > > > > > `PathsCopyingFileSystem`
> > > > > > > > > > > > > > > > > > > > > > > > > > > interface
> > > > > > > > > > > > > > > > > > > > > > > > > > > 2. That `FileSystem`
> would have to be configured
> > > to actually use
> > > > > > > > > > > it.
> > > > > > > > > > > > > > > > > > > For
> > > > > > > > > > > > > > > > > > > > > > > > > > > example S3 file system
> would return `true` that it
> > > can copy paths
> > > > > > > > > > > > > > > > > > > > > > > > > > > only if
> `s3.s5cmd.path` has been specified.
> > > > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > Would this
> affect any filesystem connectors
> > > that use
> > > > > > > > > > > > > > > FileSystem[1][2]
> > > > > > > > > > > > > > > > > > > > > > > > > > > dependencies?
> > > > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > > > Definitely not out of
> the box. Any place in Flink
> > > that is
> > > > > > > currently
> > > > > > > > > > > > > > > > > > > > > > > > > > > uploading/downloading
> files from a FileSystem
> > > could use this
> > > > > > > > > > > feature,
> > > > > > > > > > > > > > > > > > > but
> > > > > > > > > > > > > > > > > > > > > > > > > > > it
> > > > > > > > > > > > > > > > > > > > > > > > > > > would have to be
> implemented. The same way this
> > > FLIP will
> > > > > > > implement
> > > > > > > > > > > > > > > > > > > > > > > native
> > > > > > > > > > > > > > > > > > > > > > > > > > > files copying when
> downloading state during
> > > recovery,
> > > > > > > > > > > > > > > > > > > > > > > > > > > but the old code path
> will be still used for
> > > uploading state
> > > > > > > files
> > > > > > > > > > > > > > > > > > > > > > > during a
> > > > > > > > > > > > > > > > > > > > > > > > > > > checkpoint.
> > > > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > How adding a
> s5cmd will affect memory
> > > footprint? Since this is
> > > > > > > a
> > > > > > > > > > > > > > > > > > > native
> > > > > > > > > > > > > > > > > > > > > > > > > > > binary, memory
> consumption will not be controlled
> > > by JVM or
> > > > > > > Flink.
> > > > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > > > As you mentioned the
> memory usage of `s5cmd` will
> > > not be
> > > > > > > > > > > controlled,
> > > > > > > > > > > > > > > so
> > > > > > > > > > > > > > > > > > > > > > > the
> > > > > > > > > > > > > > > > > > > > > > > > > > > memory footprint will
> grow. S5cmd integration with
> > > Flink
> > > > > > > > > > > > > > > > > > > > > > > > > > > has been tested quite
> extensively on our
> > > production environment
> > > > > > > > > > > > > > > > > > > already,
> > > > > > > > > > > > > > > > > > > > > > > > > > > and we haven't
> observed any issues so far despite
> > > the fact we
> > > > > > > > > > > > > > > > > > > > > > > > > > > are using quite small
> pods. But of course if your
> > > setup is
> > > > > > > working
> > > > > > > > > > > on
> > > > > > > > > > > > > > > > > > > the
> > > > > > > > > > > > > > > > > > > > > > > > > > > edge of OOM, this
> could tip you over that edge.
> > > > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > > > Zakelly:
> > > > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > 1. What is the
> semantic of `canCopyPath`?
> > > Should it be
> > > > > > > associated
> > > > > > > > > > > > > > > > > > > with
> > > > > > > > > > > > > > > > > > > > > > > a
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > specific
> destination path? e.g. It can be
> > > copied to local, but
> > > > > > > > > > > not
> > > > > > > > > > > > > > > to
> > > > > > > > > > > > > > > > > > > > > > > the
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > remote FS.
> > > > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > > > For the S3 (both for
> SDKv2 and s5cmd
> > > implementations), the
> > > > > > > copying
> > > > > > > > > > > > > > > > > > > > > > > > > > > direction
> (upload/download) doesn't matter. I
> > > don't know about
> > > > > > > > > > > other
> > > > > > > > > > > > > > > > > > > > > > > > > > > file systems, I
> haven't investigated anything
> > > besides S3.
> > > > > > > > > > > > > > > Nevertheless
> > > > > > > > > > > > > > > > > > > I
> > > > > > > > > > > > > > > > > > > > > > > > > > > wouldn't worry too
> much about it, since we can
> > > start with the
> > > > > > > > > > > simple
> > > > > > > > > > > > > > > > > > > > > > > > > > > `canCopyPath` that
> handles both directions. If
> > > this will become
> > > > > > > > > > > > > > > > > > > important
> > > > > > > > > > > > > > > > > > > > > > > > > > > in the future, adding
> directional
> > > `canDownloadPath` or
> > > > > > > > > > > > > > > `canUploadPath`
> > > > > > > > > > > > > > > > > > > > > > > > > > > would be a backward
> compatible change, so we can
> > > safely extend it
> > > > > > > > > > > in
> > > > > > > > > > > > > > > > > > > the
> > > > > > > > > > > > > > > > > > > > > > > > > > > future if needed.
> > > > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > 2. Is the
> existing interface
> > > `DuplicatingFileSystem`
> > > > > > > > > > > > > > > feasible/enough
> > > > > > > > > > > > > > > > > > > > > > > for
> > > > > > > > > > > > > > > > > > > > > > > > > > > this case?
> > > > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > > > Good question. The
> intention and use case behind
> > > > > > > > > > > > > > > > > > > `DuplicatingFileSystem`
> > > > > > > > > > > > > > > > > > > > > > > is
> > > > > > > > > > > > > > > > > > > > > > > > > > > different. It marks if
> `FileSystem` can quickly
> > > copy/duplicate
> > > > > > > > > > > files
> > > > > > > > > > > > > > > > > > > > > > > > > > > in the remote
> `FileSystem`. For example an
> > > equivalent of a hard
> > > > > > > > > > > link
> > > > > > > > > > > > > > > or
> > > > > > > > > > > > > > > > > > > > > > > > > > > bumping a reference
> count in the remote system.
> > > That's a bit
> > > > > > > > > > > > > > > different
> > > > > > > > > > > > > > > > > > > > > > > > > > > to copy paths between
> remote and local file
> > > systems.
> > > > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > > > However, it could
> arguably be unified under one
> > > interface where
> > > > > > > we
> > > > > > > > > > > > > > > > > > > would
> > > > > > > > > > > > > > > > > > > > > > > > > > > re-use or re-name
> `canFastDuplicate(Path, Path)` to
> > > > > > > > > > > > > > > > > > > > > > > > > > > `canFastCopy(Path,
> Path)` with the following use
> > > cases:
> > > > > > > > > > > > > > > > > > > > > > > > > > > -
> `canFastCopy(remoteA, remoteB)` returns true -
> > > current
> > > > > > > equivalent
> > > > > > > > > > > > > > > of
> > > > > > > > > > > > > > > > > > > > > > > > > > >
> `DuplicatingFileSystem` - quickly duplicate/hard
> > > link remote path
> > > > > > > > > > > > > > > > > > > > > > > > > > > - `canFastCopy(local,
> remote)` returns true - FS
> > > can natively
> > > > > > > > > > > upload
> > > > > > > > > > > > > > > > > > > > > > > local
> > > > > > > > > > > > > > > > > > > > > > > > > > > file to a remote
> location
> > > > > > > > > > > > > > > > > > > > > > > > > > > - `canFastCopy(remote,
> local)` returns true - FS
> > > can natively
> > > > > > > > > > > > > > > download
> > > > > > > > > > > > > > > > > > > > > > > > > > > local file from a
> remote location
> > > > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > > > Maybe indeed that's a
> better solution vs having
> > > two separate
> > > > > > > > > > > > > > > interfaces
> > > > > > > > > > > > > > > > > > > > > > > for
> > > > > > > > > > > > > > > > > > > > > > > > > > > copying and
> duplicating?
> > > > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > 3. Will the
> interface extracting introduce a
> > > break change?
> > > > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > > > No. The signature of
> the existing abstract
> > > `FileSystem` class
> > > > > > > would
> > > > > > > > > > > > > > > > > > > > > > > remain
> > > > > > > > > > > > > > > > > > > > > > > > > > > the same. Only
> most/all of the abstract methods
> > > would be
> > > > > > > > > > > > > > > > > > > > > > > > > > > pulled out to the
> interface and abstract
> > > `FileSystem` would
> > > > > > > > > > > implement
> > > > > > > > > > > > > > > > > > > > > > > that
> > > > > > > > > > > > > > > > > > > > > > > > > > > new interface.
> > > > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > > > Best,
> > > > > > > > > > > > > > > > > > > > > > > > > > > Piotrek
> > > > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > > > pon., 6 maj 2024 o
> 04:55 Zakelly Lan <
> > > zakelly....@gmail.com>
> > > > > > > > > > > > > > > > > > > napisał(a):
> > > > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > Hi Piotr,
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > Thanks for the
> proposal. It's meaningful to
> > > speed up the state
> > > > > > > > > > > > > > > > > > > > > > > download.
> > > > > > > > > > > > > > > > > > > > > > > > > > > I
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > get into some
> questions:
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > 1. What is the
> semantic of `canCopyPath`?
> > > Should it be
> > > > > > > associated
> > > > > > > > > > > > > > > > > > > with
> > > > > > > > > > > > > > > > > > > > > > > a
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > specific
> destination path? e.g. It can be
> > > copied to local, but
> > > > > > > > > > > not
> > > > > > > > > > > > > > > to
> > > > > > > > > > > > > > > > > > > > > > > the
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > remote FS.
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > 2. Is the
> existing interface
> > > `DuplicatingFileSystem`
> > > > > > > > > > > > > > > feasible/enough
> > > > > > > > > > > > > > > > > > > > > > > for
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > this case?
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > 3. Will the
> interface extracting introduce a
> > > break change?
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > Best,
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > Zakelly
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > On Thu, May 2,
> 2024 at 6:50 PM Aleksandr
> > > Pilipenko <
> > > > > > > > > > > > > > > z3d...@gmail.com
> > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > wrote:
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > Hi
> Piotr,
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > Thanks
> for the proposal.
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > How
> adding a s5cmd will affect memory
> > > footprint? Since this
> > > > > > > is
> > > > > > > > > > > a
> > > > > > > > > > > > > > > > > > > > > > > native
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > >
> binary, memory consumption will not be
> > > controlled by JVM or
> > > > > > > > > > > > > > > Flink.
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > Thanks,
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > >
> Aleksandr
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > On
> Thu, 2 May 2024 at 11:12, Hong Liang <
> > > h...@apache.org>
> > > > > > > > > > > wrote:
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > >
> > Hi Piotr,
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > >
> > Thanks for the FLIP! Nice to see work
> > > to improve the
> > > > > > > > > > > filesystem
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > >
> > performance. +1 to future work to
> > > improve the upload speed
> > > > > > > as
> > > > > > > > > > > > > > > > > > > well.
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > This
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > >
> > would be useful for jobs with large
> > > state and high Async
> > > > > > > > > > > > > > > > > > > > > > > > > > > checkpointing
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > >
> > times.
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > >
> > Some thoughts on the configuration, it
> > > might be good for us
> > > > > > > > > > > to
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > introduce
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > 2x
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > >
> > points of configurability for future
> > > proofing:
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > >
> > 1/ Configure the implementation of
> > > PathsCopyingFileSystem
> > > > > > > > > > > used,
> > > > > > > > > > > > > > > > > > > > > > > maybe
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > by
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > >
> > config, or by ServiceResources (this
> > > would allow us to use
> > > > > > > > > > > this
> > > > > > > > > > > > > > > > > > > for
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > >
> > alternative clouds/Implement S3 SDKv2
> > > support if we want
> > > > > > > this
> > > > > > > > > > > > > > > in
> > > > > > > > > > > > > > > > > > > > > > > the
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > >
> > future). Also this could be used as a
> > > feature flag to
> > > > > > > > > > > determine
> > > > > > > > > > > > > > > > > > > if
> > > > > > > > > > > > > > > > > > > > > > > we
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > >
> > should be using this new native file
> > > copy support.
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > >
> > 2/ Configure the location of the s5cmd
> > > binary (version
> > > > > > > > > > > control
> > > > > > > > > > > > > > > > > > > > > > > etc.),
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > as
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > >
> > you have mentioned in the FLIP.
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > >
> > Regards,
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > >
> > Hong
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > >
> > On Thu, May 2, 2024 at 9:40 AM
> > > Muhammet Orazov
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > >
> > <mor+fl...@morazow.com.invalid> wrote:
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > Hey Piotr,
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > Thanks for the proposal! It would
> > > be great improvement!
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > Some questions from my side:
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > In order to configure s5cmd
> > > Flink’s user would need
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > to specify path to the s5cmd
> > > binary.
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > Could you please also add the
> > > configuration property
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > for this? An example showing how
> > > users would set this
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > parameter would be helpful.
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > Would this affect any filesystem
> > > connectors that use
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > FileSystem[1][2] dependencies?
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > Best,
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > Muhammet
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > [1]:
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > >
> > > > > > > > > > >
> > > > > > >
> > >
> https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/filesystems/s3/
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > [2]:
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > >
> > > > > > > > > > >
> > > > > > >
> > >
> https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/filesystem/
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > On 2024-04-30 13:15, Piotr
> > > Nowojski wrote:
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > Hi all!
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > I would like to put under
> > > discussion:
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > FLIP-444: Native file copy
> > > support
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > >
> > > https://cwiki.apache.org/confluence/x/rAn9EQ
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > This proposal aims to speed up
> > > Flink recovery times, by
> > > > > > > > > > > > > > > > > > > > > > > speeding
> > > > > > > > > > > > > > > > > > > > > > > > > > > up
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > state
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > download times. However in the
> > > future, the same
> > > > > > > mechanism
> > > > > > > > > > > > > > > > > > > could
> > > > > > > > > > > > > > > > > > > > > > > > > > > be
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > also
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > used to speed up state
> > > uploading
> > > > > > > > > > > > > > > > > > > (checkpointing/savepointing).
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > I'm curious to hear your
> > > thoughts.
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > Best,
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > Piotrek
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > > --
> > > > > > > > > > > Best,
> > > > > > > > > > > Hangxiang.
> > > > > > > > > > >
> > > > > > >
> > > > >
> > > > >
> > > > > --
> > > > > Best,
> > > > > Hangxiang.
> > >
>

Reply via email to