Hi Piotr,

Thank you for the proposal. Looks great.

Along similar lines to Aleks' question on memory usage, I have a question on CPU usage.

The proposal mentions that s5cmd utilises 100% of CPU, similar to Flink
1.18. However, this will be a native process outside of the JVM. Is there
a risk of a large/long state download starving the TM of CPU cycles, causing
issues such as heartbeat or ask timeouts?

Do you know if there is a way to limit the CPU utilisation of s5cmd? I see
worker and concurrency configuration, but these do not map directly to a cap
on CPU usage. For users of this feature, tuning will be a matter of
trial and error.
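
To make the knobs I am referring to concrete, here is a minimal sketch. It
assumes the file system would launch s5cmd as a child process from Java; the
class and method names are hypothetical and not taken from the FLIP.
`--numworkers` and `cp --concurrency` are existing s5cmd options, while
wrapping the call in `nice` is only my assumption of one way to lower the
scheduling priority on Linux. None of these is a hard CPU cap.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for illustration only; not part of Flink or the FLIP.
class S5cmdCommandSketch {

    /**
     * Builds the argument list for a single s5cmd copy.
     * A niceness of 0 or less means the command is not wrapped in `nice`.
     */
    static List<String> buildCopyCommand(
            String s5cmdPath,
            int numWorkers,
            int concurrency,
            int niceness,
            String source,
            String destination) {
        List<String> cmd = new ArrayList<>();
        if (niceness > 0) {
            // Assumption: lower the scheduling priority of the native process.
            cmd.add("nice");
            cmd.add("-n");
            cmd.add(Integer.toString(niceness));
        }
        cmd.add(s5cmdPath);
        // Global s5cmd option: size of the worker pool.
        cmd.add("--numworkers");
        cmd.add(Integer.toString(numWorkers));
        cmd.add("cp");
        // Per-copy option: number of parts transferred concurrently.
        cmd.add("--concurrency");
        cmd.add(Integer.toString(concurrency));
        cmd.add(source);
        cmd.add(destination);
        return cmd;
    }

    public static void main(String[] args) {
        // Placeholder paths; a real caller would pass the list to ProcessBuilder.
        List<String> cmd = buildCopyCommand(
                "/usr/local/bin/s5cmd", 4, 2, 10,
                "s3://my-bucket/checkpoints/*", "/tmp/flink-state/");
        System.out.println(String.join(" ", cmd));
    }
}
```

Even with a small worker pool and a high niceness, how much CPU the download
takes still depends on the machine and on what else is running, which is why
I expect tuning to be trial and error.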

Thanks
Keith

On Wed, May 8, 2024 at 12:47 PM Ahmed Hamdy <hamdy10...@gmail.com> wrote:

> Hi Piotr
> +1 for the proposal, it seems to have a lot of gains.
>
> Best Regards
> Ahmed Hamdy
>
>
> On Mon, 6 May 2024 at 12:06, Zakelly Lan <zakelly....@gmail.com> wrote:
>
> > Hi Piotrek,
> >
> > Thanks for your answers!
> >
> > > Good question. The intention and use case behind `DuplicatingFileSystem`
> > > is different. It marks whether a `FileSystem` can quickly copy/duplicate
> > > files in the remote `FileSystem`. For example an equivalent of a hard link
> > > or bumping a reference count in the remote system. That's a bit different
> > > from copying paths between remote and local file systems.
> > >
> > > However, it could arguably be unified under one interface where we would
> > > re-use or re-name `canFastDuplicate(Path, Path)` to
> > > `canFastCopy(Path, Path)` with the following use cases:
> > > - `canFastCopy(remoteA, remoteB)` returns true - current equivalent of
> > > `DuplicatingFileSystem` - quickly duplicate/hard link remote path
> > > - `canFastCopy(local, remote)` returns true - FS can natively upload
> > > a local file to a remote location
> > > - `canFastCopy(remote, local)` returns true - FS can natively download
> > > a remote file to a local location
> > >
> > > Maybe indeed that's a better solution vs having two separate interfaces
> > > for copying and duplicating?
> > >
> >
> > I'd prefer one unified interface; `canFastCopy(Path, Path)` looks good to
> > me. This also resolves my question 1 about the destination.
> >
> >
> > Best,
> > Zakelly
> >
> > On Mon, May 6, 2024 at 6:36 PM Piotr Nowojski <pnowoj...@apache.org>
> > wrote:
> >
> > > Hi All!
> > >
> > > Thanks for your comments.
> > >
> > > Muhammet and Hong, about the config options.
> > >
> > > > Could you please also add the configuration property for this? An
> > > > example showing how users would set this parameter would be helpful.
> > >
> > > > 1/ Configure the implementation of PathsCopyingFileSystem used
> > > > 2/ Configure the location of the s5cmd binary (version control etc.)
> > >
> > > Oops, sorry, I have added the config options that I had in mind to the
> > > FLIP. I don't know why I had omitted them. Basically, I suggest that in
> > > order to use native file copying:
> > > 1. `FileSystem` must support it by implementing the
> > >    `PathsCopyingFileSystem` interface.
> > > 2. That `FileSystem` would have to be configured to actually use it. For
> > >    example, the S3 file system would report that it can copy paths
> > >    only if `s3.s5cmd.path` has been specified.
> > >
> > > > Would this affect any filesystem connectors that use FileSystem[1][2]
> > > > dependencies?
> > >
> > > Definitely not out of the box. Any place in Flink that currently
> > > uploads/downloads files from a FileSystem could use this feature, but it
> > > would have to be implemented. In the same way, this FLIP will implement
> > > native file copying when downloading state during recovery,
> > > but the old code path will still be used for uploading state files
> > > during a checkpoint.
> > >
> > > > How will adding s5cmd affect the memory footprint? Since this is a
> > > > native binary, memory consumption will not be controlled by the JVM or
> > > > Flink.
> > >
> > > As you mentioned, the memory usage of `s5cmd` will not be controlled, so
> > > the memory footprint will grow. The s5cmd integration with Flink
> > > has already been tested quite extensively in our production environment,
> > > and we haven't observed any issues so far, despite the fact that we
> > > are using quite small pods. But of course, if your setup is working on
> > > the edge of OOM, this could tip you over that edge.
> > >
> > > Zakelly:
> > >
> > > > 1. What are the semantics of `canCopyPath`? Should it be associated with
> > > > a specific destination path? E.g. it can be copied to local, but not to
> > > > the remote FS.
> > >
> > > For S3 (both the SDKv2 and s5cmd implementations), the copying
> > > direction (upload/download) doesn't matter. I don't know about other
> > > file systems; I haven't investigated anything besides S3. Nevertheless, I
> > > wouldn't worry too much about it, since we can start with the simple
> > > `canCopyPath` that handles both directions. If this becomes important
> > > in the future, adding directional `canDownloadPath` or `canUploadPath`
> > > would be a backward compatible change, so we can safely extend it
> > > later if needed.
> > >
> > > > 2. Is the existing interface `DuplicatingFileSystem` feasible/enough for
> > > > this case?
> > >
> > > Good question. The intention and use case behind `DuplicatingFileSystem`
> > > is different. It marks whether a `FileSystem` can quickly copy/duplicate
> > > files in the remote `FileSystem`. For example an equivalent of a hard link
> > > or bumping a reference count in the remote system. That's a bit different
> > > from copying paths between remote and local file systems.
> > >
> > > However, it could arguably be unified under one interface where we would
> > > re-use or re-name `canFastDuplicate(Path, Path)` to
> > > `canFastCopy(Path, Path)` with the following use cases:
> > > - `canFastCopy(remoteA, remoteB)` returns true - current equivalent of
> > > `DuplicatingFileSystem` - quickly duplicate/hard link remote path
> > > - `canFastCopy(local, remote)` returns true - FS can natively upload
> > > a local file to a remote location
> > > - `canFastCopy(remote, local)` returns true - FS can natively download
> > > a remote file to a local location
> > >
> > > Maybe indeed that's a better solution vs having two separate interfaces
> > > for copying and duplicating?
> > >
> > > > 3. Will extracting the interface introduce a breaking change?
> > >
> > > No. The signature of the existing abstract `FileSystem` class would
> > > remain the same. Only most/all of the abstract methods would be
> > > pulled out to the interface, and the abstract `FileSystem` would implement
> > > that new interface.
> > >
> > > Best,
> > > Piotrek
> > >
> > > On Mon, 6 May 2024 at 04:55, Zakelly Lan <zakelly....@gmail.com> wrote:
> > >
> > > > Hi Piotr,
> > > >
> > > > Thanks for the proposal. It's meaningful to speed up the state download.
> > > > I have some questions:
> > > >
> > > > 1. What are the semantics of `canCopyPath`? Should it be associated with
> > > > a specific destination path? E.g. it can be copied to local, but not to
> > > > the remote FS.
> > > > 2. Is the existing interface `DuplicatingFileSystem` feasible/enough for
> > > > this case?
> > > > 3. Will extracting the interface introduce a breaking change?
> > > >
> > > >
> > > > Best,
> > > > Zakelly
> > > >
> > > >
> > > > On Thu, May 2, 2024 at 6:50 PM Aleksandr Pilipenko <z3d...@gmail.com>
> > > > wrote:
> > > >
> > > > > Hi Piotr,
> > > > >
> > > > > Thanks for the proposal.
> > > > > How will adding s5cmd affect the memory footprint? Since this is a
> > > > > native binary, memory consumption will not be controlled by the JVM or
> > > > > Flink.
> > > > >
> > > > > Thanks,
> > > > > Aleksandr
> > > > >
> > > > > On Thu, 2 May 2024 at 11:12, Hong Liang <h...@apache.org> wrote:
> > > > >
> > > > > > Hi Piotr,
> > > > > >
> > > > > > Thanks for the FLIP! Nice to see work to improve the filesystem
> > > > > > performance. +1 to future work to improve the upload speed as well.
> > > > > > This would be useful for jobs with large state and high async
> > > > > > checkpointing times.
> > > > > >
> > > > > > Some thoughts on the configuration: it might be good for us to
> > > > > > introduce two points of configurability for future-proofing:
> > > > > > 1/ Configure the implementation of PathsCopyingFileSystem used, maybe
> > > > > > by config, or by ServiceResources (this would allow us to use this for
> > > > > > alternative clouds or implement S3 SDKv2 support if we want this in
> > > > > > the future). This could also be used as a feature flag to determine
> > > > > > whether we should be using this new native file copy support.
> > > > > > 2/ Configure the location of the s5cmd binary (version control etc.),
> > > > > > as you have mentioned in the FLIP.
> > > > > >
> > > > > > Regards,
> > > > > > Hong
> > > > > >
> > > > > >
> > > > > > On Thu, May 2, 2024 at 9:40 AM Muhammet Orazov
> > > > > > <mor+fl...@morazow.com.invalid> wrote:
> > > > > >
> > > > > > > Hey Piotr,
> > > > > > >
> > > > > > > Thanks for the proposal! It would be a great improvement!
> > > > > > >
> > > > > > > Some questions from my side:
> > > > > > >
> > > > > > > > In order to configure s5cmd Flink’s user would need
> > > > > > > > to specify path to the s5cmd binary.
> > > > > > >
> > > > > > > Could you please also add the configuration property
> > > > > > > for this? An example showing how users would set this
> > > > > > > parameter would be helpful.
> > > > > > >
> > > > > > > Would this affect any filesystem connectors that use
> > > > > > > FileSystem[1][2] dependencies?
> > > > > > >
> > > > > > > Best,
> > > > > > > Muhammet
> > > > > > >
> > > > > > > [1]:
> > > > > > > https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/filesystems/s3/
> > > > > > > [2]:
> > > > > > > https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/filesystem/
> > > > > > >
> > > > > > > On 2024-04-30 13:15, Piotr Nowojski wrote:
> > > > > > > > Hi all!
> > > > > > > >
> > > > > > > > I would like to put under discussion:
> > > > > > > >
> > > > > > > > FLIP-444: Native file copy support
> > > > > > > > https://cwiki.apache.org/confluence/x/rAn9EQ
> > > > > > > >
> > > > > > > > This proposal aims to speed up Flink recovery times by
> > > > > > > > speeding up state download times. However, in the future the
> > > > > > > > same mechanism could also be used to speed up state uploading
> > > > > > > > (checkpointing/savepointing).
> > > > > > > >
> > > > > > > > I'm curious to hear your thoughts.
> > > > > > > >
> > > > > > > > Best,
> > > > > > > > Piotrek
