Hi Yun,

Thanks for your suggestions!

I have read FLINK-23342 and the design doc you provided. First
of all, the goal of this FLIP and that of the doc are similar, and the
design of this FLIP is very close to option 3. The main difference is
that we encode the concept of 'epoch' implicitly in the folder path for
each granularity. For shared state, the folder for each subtask looks like
"${checkpointBaseDir}/shared/subtask-{index}-{parallelism}", so if
${checkpointBaseDir} changes (when the user restarts a job manually) or
${parallelism} changes (when rescaling), the files are re-uploaded
and the JM takes care of the old artifacts. The folder path for
private state is in the form of
"${checkpointBaseDir}/tm-owned/${tmResourceId}" and the division of
responsibilities between JM and TM is similar. The design of this FLIP
inherits all the advantages of option 3 in that doc, and
also avoids extra communication for epoch maintenance. As for the code
complexity, you may check the POC commit[1]: the
implementation is quite clean and is an entirely new code path with
almost no impact on the existing one. Comparing the lines of
code changed with what was done for merging channel state[2]
(5200 vs. 2500 additions), I think it is acceptable considering we are
providing a unified file merging framework, which would save a lot of
effort in the future. WDYT?
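
To make the folder layout concrete, here is a minimal sketch of how the
two path forms above could be built and how a changed path acts as an
implicit epoch. This is not the POC code: the class MergedFileDirs, its
method names and the example base directory are hypothetical and only
illustrate the idea.

```java
import java.util.Objects;

/** Hypothetical helper, not part of Flink or the POC; it only illustrates the directory layout. */
public final class MergedFileDirs {

    private MergedFileDirs() {}

    /** Shared state: one directory per subtask, keyed by subtask index and parallelism. */
    public static String sharedDir(String checkpointBaseDir, int subtaskIndex, int parallelism) {
        return checkpointBaseDir + "/shared/subtask-" + subtaskIndex + "-" + parallelism;
    }

    /** Private state: one directory per TM, keyed by the TM resource id. */
    public static String privateDir(String checkpointBaseDir, String tmResourceId) {
        return checkpointBaseDir + "/tm-owned/" + tmResourceId;
    }

    /** A different directory plays the role of a new epoch: earlier files are not reused. */
    public static boolean needsReupload(String previousDir, String currentDir) {
        return !Objects.equals(previousDir, currentDir);
    }

    public static void main(String[] args) {
        // The base directory below is an assumed example value.
        String before = sharedDir("hdfs:///cp/job-a", 0, 4);
        String afterRescale = sharedDir("hdfs:///cp/job-a", 0, 8);
        System.out.println(before);
        System.out.println(afterRescale);
        System.out.println(needsReupload(before, afterRescale));
    }
}
```

With this layout, a rescale from parallelism 4 to 8 yields a different
shared folder for subtask 0, so no extra epoch counter has to be
exchanged between JM and TM.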

Thanks.

Best regards,
Zakelly

[1] POC of this FLIP:
https://github.com/Zakelly/flink/commit/98538185182e33739828ee36ab96dcf2aebad80c
[2] Commit for FLINK-26803 (Merge the channel state files):
https://github.com/apache/flink/commit/8be94e6663d8ac6e3d74bf4cd5f540cc96c8289e


On Fri, Apr 7, 2023 at 7:22 PM Yanfei Lei <fredia...@gmail.com> wrote:
>
> Thanks for your explanation Zakelly.
> (1) Keeping these merging granularities for different types of files
> as presets that are not configurable is a good idea to prevent
> performance degradation.
>
> (2)
> > For the third option, 64MB is an acceptable target size. The RocksDB state 
> > backend in Flink also chooses 64MB as the default target file size.
>
> Does this result in a larger space amplification? Maybe a more
> suitable value can be determined through some experimental statistics
> after we implement this feature.
>
> Best,
> Yanfei
>
> On Fri, Apr 7, 2023 at 5:09 PM Jingsong Li <jingsongl...@gmail.com> wrote:
> >
> > Hi Yun,
> >
> > It looks like this doc needs permission to read? [1]
> >
> > [1] 
> > https://docs.google.com/document/d/1NJJQ30P27BmUvD7oa4FChvkYxMEgjRPTVdO1dHLl_9I/edit#
> >
> > Best,
> > Jingsong
> >
> > On Fri, Apr 7, 2023 at 4:34 PM Piotr Nowojski <pnowoj...@apache.org> wrote:
> > >
> > > Hi,
> > >
> > > +1 To what Yun Tang wrote. We don't seem to have access to the design doc.
> > > Could you make it publicly visible or copy out its content to another
> > > document?
> > >
> > > Thanks for your answers Zakelly.
> > >
> > > (1)
> > > Yes, the current mechanism introduced in FLINK-24611 allows checkpoint N
> > > to re-use only shared state handles that have already been referenced by
> > > checkpoint N-1. But why do we need to break this assumption? In your step
> > > "d.", the TM could adhere to that assumption, and instead of reusing
> > > File-2 it could either re-use File-1 or File-3, or create a new file.
> > >
> > > (2)
> > > Can you elaborate a bit more on this? As far as I recall, the purpose of
> > > the `RecoverableWriter` is to support exactly the things described in this
> > > FLIP, so what's the difference? If you are saying that for this FLIP you
> > > can implement something more efficiently for a given FileSystem, then why
> > > can it not be done the same way for the `RecoverableWriter`?
> > >
> > > Best,
> > > Piotrek
> > >
> > > > On Thu, Apr 6, 2023 at 5:24 PM Yun Tang <myas...@live.com> wrote:
> > >
> > > > Hi Zakelly,
> > > >
> > > > Thanks for driving this work!
> > > >
> > > > > I'm not sure whether you have read the discussion between Stephan, Roman,
> > > > > Piotr, Yuan and me in the design doc [1] from nearly two years ago.
> > > >
> > > > > From my understanding, your proposal is also a mixed state ownership:
> > > > > some states are owned by the TM while some are owned by the JM. If my
> > > > > memory is correct, we did not take option-3 or option-5 in the design
> > > > > doc [1] because of the code complexity when implementing the first
> > > > > version of the changelog state-backend.
> > > >
> > > > > Could you also compare the current FLIP with the proposals in the design
> > > > > doc [1]? From my understanding, we should at least consider comparing it
> > > > > with option-3 and option-5, as they are all mixed solutions.
> > > >
> > > >
> > > > [1]
> > > > https://docs.google.com/document/d/1NJJQ30P27BmUvD7oa4FChvkYxMEgjRPTVdO1dHLl_9I/edit#
> > > >
> > > > Best
> > > > Yun Tang
> > > >
> > > > ------------------------------
> > > > *From:* Zakelly Lan <zakelly....@gmail.com>
> > > > *Sent:* Thursday, April 6, 2023 16:38
> > > > *To:* dev@flink.apache.org <dev@flink.apache.org>
> > > > *Subject:* Re: [DISCUSS] FLIP-306: Unified File Merging Mechanism for
> > > > Checkpoints
> > > >
> > > > Hi Piotr,
> > > >
> > > > Thanks for all the feedback.
> > > >
> > > > > (1) Thanks for the reminder. I have just looked at FLINK-24611. The
> > > > > delayed deletion by the JM resolves some sync problems between JM and TM,
> > > > > but I'm afraid it is still not feasible for the file sharing in this FLIP.
> > > > > Consider a concurrent checkpoint scenario as follows:
> > > > >    a. Checkpoint 1 finishes. 1.sst, 2.sst and 3.sst are written in file 1,
> > > > > and 4.sst is written in file 2.
> > > >    b. Checkpoint 2 starts based on checkpoint 1, including 1.sst, 2.sst
> > > > and 5.sst.
> > > >    c. Checkpoint 3 starts based on checkpoint 1, including 1.sst, 2.sst
> > > > and 5.sst as well.
> > > >    d. Checkpoint 3 reuses the file 2, TM writes 5.sst on it.
> > > >    e. Checkpoint 2 creates a new file 3, TM writes 5.sst on it.
> > > >    f. Checkpoint 2 finishes, checkpoint 1 is subsumed and the file 2 is
> > > > deleted, while checkpoint 3 still needs file 2.
> > > >
> > > > I attached a diagram to describe the scenario.
> > > > [image: concurrent cp.jpg]
> > > > > The core issue is that this FLIP introduces a mechanism that allows
> > > > > physical files to be potentially used by the next several checkpoints.
> > > > > The JM cannot be certain whether a TM will continue writing to a
> > > > > specific file. So in this FLIP, TMs take the responsibility for deleting
> > > > > the physical files.
> > > >
> > > > > (2) IIUC, the RecoverableWriter is introduced to persist data in the "in
> > > > > progress" files after each checkpoint, and the implementation may be
> > > > > based on the file sync in some file systems. However, since sync is a
> > > > > heavy operation for DFS, this FLIP wants to use flush instead of sync on
> > > > > a best-effort basis. This only fits the case where the DFS is considered
> > > > > reliable. The problems they want to solve are different.
> > > >
> > > > > (3) Yes, if files are managed by the JM via the shared registry, this
> > > > > problem is solved. And as I mentioned in (1), there are some other corner
> > > > > cases that are hard to resolve via the shared registry.
> > > >
> > > > > The goal of this FLIP is to have a common way of merging files in all
> > > > > use cases. For shared state it merges at subtask level, while for private
> > > > > state (and changelog files, as I replied to Yanfei), files are merged at
> > > > > TM level. So it is not contrary to the current plan for the unaligned
> > > > > checkpoint state (FLINK-26803). You are right that the unaligned
> > > > > checkpoint state would be merged with the operator's state file, so
> > > > > overall, it is slightly better than what's currently done.
> > > >
> > > >
> > > > Thanks again for the valuable comments!
> > > >
> > > > Best regards,
> > > > Zakelly
> > > >
> > > >
> > > >
> > > > > On Wed, Apr 5, 2023 at 8:43 PM Piotr Nowojski <pnowoj...@apache.org> wrote:
> > > >
> > > > Hi,
> > > >
> > > > > Thanks for coming up with the proposal, it's definitely valuable. I'm
> > > > > still reading and trying to understand the proposal, but here are a
> > > > > couple of comments from my side.
> > > >
> > > > (1)
> > > > > > Ownership of a single checkpoint file is transferred to TM, while JM
> > > > > > manages the parent directory of these files.
> > > >
> > > > > Have you seen https://issues.apache.org/jira/browse/FLINK-24611 before?
> > > > > I don't fully remember why, but we have rejected the idea of moving the
> > > > > file ownership to TM and instead reworked the shared file registry in a
> > > > > way that I think should be sufficient for file sharing. Could you
> > > > > elaborate why we need to move the file ownership to TM, and why the
> > > > > current mechanism is not sufficient?
> > > >
> > > > (2)
> > > > > > File visibility is needed when a Flink job recovers after a
> > > > > > checkpoint is materialized. In some DFS, such as most object storages,
> > > > > > a file is only visible after it is closed
> > > >
> > > > Is that really the case?
> > > > > `org.apache.flink.core.fs.FileSystem#createRecoverableWriter` seems to
> > > > > be addressing exactly this issue, and the most frequently used
> > > > > FileSystem (S3) AFAIK supports it with no problems?
> > > >
> > > > (3)
> > > > > > 4.1.2 Merge files within a subtask or a TM
> > > > > > Given that TMs are reassigned after restoration, it is difficult to
> > > > > > manage physical files that contain data from multiple subtasks scattered
> > > > > > across different TMs (as depicted in Fig.3). There is no synchronization
> > > > > > mechanism between TMs, making file management in this scenario
> > > > > > challenging.
> > > >
> > > > > I think this is solved in many places already via the shared state
> > > > > managed by the JM, as I mentioned in (1).
> > > >
> > > >
> > > > > If I understand it correctly you are proposing to have a common
> > > > > interface/way of merging small files, in all use cases, that would work
> > > > > only across a single subtask? That's contrary to what's currently done
> > > > > for unaligned checkpoints, right? But if this generic mechanism was to be
> > > > > used for unaligned checkpoints, unaligned checkpoint state would have
> > > > > been merged with the operator's state file, so all in all there would be
> > > > > no regression visible to a user? The limit is that we always have at
> > > > > least a single file per subtask, but in exchange we are getting a simpler
> > > > > threading model?
> > > >
> > > > > Best,
> > > > Piotrek
> > > >
> > > > > On Tue, Apr 4, 2023 at 8:51 AM Zakelly Lan <zakelly....@gmail.com> wrote:
> > > >
> > > > > Hi Yanfei,
> > > > >
> > > > > Thank you for your prompt response.
> > > > >
> > > > > I agree that managing (deleting) only some folders with JM can greatly
> > > > > relieve JM's burden. Thanks for pointing this out.
> > > > >
> > > > > In general, merging at the TM level is more effective since there are
> > > > > usually more files to merge. Therefore, I believe it is better to
> > > > > merge files per TM as much as possible.  However, for shared state,
> > > > > merging at the subtask level is the best choice to prevent significant
> > > > > data transfer over the network after restoring. I think it is better
> > > > > to keep these merging granularities for different types of files as
> > > > > presets that are not configurable. WDYT?
> > > > >
> > > > > As for the DSTL files, they are merged per TM and placed in the
> > > > > task-owned folder. These files can be classified as shared state since
> > > > > they are shared across checkpoints. However, the DSTL file is a
> > > > > special case that will be subsumed by the first checkpoint of the
> > > > > newly restored job. Therefore, there is no need for new TMs to keep
> > > > > these files after the old checkpoint is subsumed, just like the
> > > > > private state files. Thus, it is feasible to merge DSTL files per TM
> > > > > without introducing complex file management across job attempts. So
> > > > > the possible performance degradation is avoided.
> > > > >
> > > > > The three newly introduced options have recommended defaults. For
> > > > > upcoming versions, this feature is turned off by default. For the
> > > > > second option, SEGMENTED_ACROSS_CP_BOUNDARY is the recommended default
> > > > > as it is more effective. Of course, if encountering some DFS that does
> > > > > not support file visibility until the file is closed, it is possible
> > > > > to fall back to another option automatically. For the third option,
> > > > > 64MB is an acceptable target size. The RocksDB state backend in Flink
> > > > > also chooses 64MB as the default target file size.
> > > > >
> > > > >
> > > > > Thank you again for your quick response.
> > > > >
> > > > >
> > > > > Best regards,
> > > > > Zakelly
> > > > >
> > > > >
> > > > > > On Mon, Apr 3, 2023 at 11:27 PM Yanfei Lei <fredia...@gmail.com> wrote:
> > > > > >
> > > > > > Hi Zakelly,
> > > > > >
> > > > > > > Thanks for driving this. This proposal enables the file merging of
> > > > > > > different types of state to be grouped under a unified framework. I
> > > > > > > think it has the added benefit of lightening the load on JM. As
> > > > > > > FLINK-26590[1] described, triggered checkpoints can be delayed by
> > > > > > > discarding shared state when JM manages a large number of files.
> > > > > > > After this FLIP, JM only needs to manage some folders, which greatly
> > > > > > > reduces the burden on JM.
> > > > > >
> > > > > > > In Section 4.1, two merging granularities (per subtask and per task
> > > > > > > manager) are proposed. The shared state is managed at per-subtask
> > > > > > > granularity, but for the changelog state backend, its DSTL files are
> > > > > > > shared between checkpoints and are currently merged in batches at the
> > > > > > > task manager level. When merging with the SEGMENTED_WITHIN_CP_BOUNDARY
> > > > > > > mode, I'm concerned about the performance degradation of its merging,
> > > > > > > so I wonder whether the merge granularities are configurable. Further,
> > > > > > > from a user perspective, three new options are introduced in this
> > > > > > > FLIP; do they have recommended defaults?
> > > > > >
> > > > > >
> > > > > > [1] https://issues.apache.org/jira/browse/FLINK-26590
> > > > > >
> > > > > > Best,
> > > > > > Yanfei
> > > > > >
> > > > > > > On Mon, Apr 3, 2023 at 6:36 PM Zakelly Lan <zakelly....@gmail.com> wrote:
> > > > > >
> > > > > > >
> > > > > > > Hi everyone,
> > > > > > >
> > > > > > > > I would like to open a discussion on providing a unified file
> > > > > > > > merging mechanism for checkpoints[1].
> > > > > > > >
> > > > > > > > Currently, many files are uploaded to the DFS during checkpoints,
> > > > > > > > leading to the 'file flood' problem when running intensive workloads
> > > > > > > > in a cluster. To tackle this problem, various solutions have been
> > > > > > > > proposed for different types of state files. Although these methods
> > > > > > > > are similar, they lack a systematic view and approach. We believe
> > > > > > > > that it is better to consider this problem as a whole and introduce
> > > > > > > > a unified framework to address the file flood problem for all types
> > > > > > > > of state files. A POC has been implemented based on the current FLIP
> > > > > > > > design, and the test results are promising.
> > > > > > >
> > > > > > >
> > > > > > > Looking forward to your comments or feedback.
> > > > > > >
> > > > > > > Best regards,
> > > > > > > Zakelly
> > > > > > >
> > > > > > > [1]
> > > > >
> > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-306%3A+Unified+File+Merging+Mechanism+for+Checkpoints
> > > > >
> > > >
> > > >
