Hi everyone, It seems there are no unresolved questions left. So I would like to start a vote on May 8th at 10:00 AM GMT.
Please let me know if you have any concerns, thanks! Best, Zakelly On Fri, May 5, 2023 at 4:03 PM Rui Fan <1996fan...@gmail.com> wrote: > > Hi Zakelly, > > Thanks for the clarification! > > Currently, I understand what you mean, and LGTM. > > Best, > Rui Fan > > On Fri, May 5, 2023 at 12:27 PM Zakelly Lan <zakelly....@gmail.com> wrote: > > > Hi all, > > > > @Yun Tang and I have an offline discussion, and we agreed that: > > > > 1. The design of this FLIP is pretty much like the option 3 in design > > doc[1] for FLINK-23342, and it is almost the best solution in general. > > Based on our production experience, this FLIP can solve the file flood > > problem very well. > > 2. There is a corner case that the directory may be left over when the > > job stops, so I added some content in section 4.8. > > > > Best, > > Zakelly > > > > > > [1] > > https://docs.google.com/document/d/1NJJQ30P27BmUvD7oa4FChvkYxMEgjRPTVdO1dHLl_9I/edit# > > > > On Fri, May 5, 2023 at 11:19 AM Zakelly Lan <zakelly....@gmail.com> wrote: > > > > > > Hi Rui Fan, > > > > > > Thanks for your reply. > > > > > > > Maybe re-uploaded in the next checkpoint is also a general solution > > > > for shared state? If yes, could we consider it as an optimization? > > > > And we can do it after the FLIP is done. > > > > > > Yes, it is a general solution for shared states. Maybe in the first > > > version we can let the shared states not re-use any previous state > > > handle after restoring, thus the state backend will do a full snapshot > > > and re-uploading the files it needs. This could cover the scenario > > > that rocksdb only uploads the base DB files. And later we could > > > consider performing fast copy in DFS to optimize the re-uploading. > > > WDYT? > > > > > > > > > > I'm not sure why we need 2 configurations, or whether 1 configuration > > > > is enough here. > > > > > > > The `max-file-pool-size` is hard to give a default value. For jobs with > > > > many tasks in a TM, it may be useful for file merging. However, > > > > it doesn't work well for jobs with a small number of tasks in a TM. > > > > > > > I prefer just adding the `max-file-pool-size`, and > > > > the `pool size = number of tasks / max-file-pool-size`. WDYT? > > > > > > > > > Sorry for not explaining clearly. The value of pool size is calculated > > by: > > > > > > 1. pool size = number of tasks / max-subtasks-per-file > > > 2. if pool size > max-file-pool-size then pool size = max-file-pool-size > > > > > > The `max-subtasks-per-file` addresses the issue of sequential file > > > writing, while the `max-file-pool-size` acts as a safeguard to prevent > > > an excessively large file pool. WDYT? > > > > > > > > > Thanks again for your thoughts. > > > > > > Best, > > > Zakelly > > > > > > On Thu, May 4, 2023 at 3:52 PM Rui Fan <1996fan...@gmail.com> wrote: > > > > > > > > Hi Zakelly, > > > > > > > > Sorry for the late reply, I still have some minor questions. > > > > > > > > >> (3) When rescaling, do all shared files need to be copied? > > > > > > > > > > I agree with you that only sst files of the base DB need to be copied > > > > > (or re-uploaded in the next checkpoint). However, section 4.2 > > > > > simplifies file copying issues (copying all files), following the > > > > > concept of shared state. > > > > > > > > Maybe re-uploaded in the next checkpoint is also a general solution > > > > for shared state? If yes, could we consider it as an optimization? > > > > And we can do it after the FLIP is done. 
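(For readers following along: the pool sizing rule described above amounts to the following sketch. The class, method and rounding behavior are illustrative assumptions, not the actual FLIP API.)

    // Sketch of the proposed pool sizing: max-subtasks-per-file drives the size,
    // while max-file-pool-size caps it. Rounding up is an assumption; the thread
    // does not spell it out.
    public final class FilePoolSizing {

        static int poolSize(int numberOfTasks, int maxSubtasksPerFile, int maxFilePoolSize) {
            int size = (numberOfTasks + maxSubtasksPerFile - 1) / maxSubtasksPerFile;
            return Math.min(size, maxFilePoolSize); // safeguard against an excessively large pool
        }

        public static void main(String[] args) {
            System.out.println(poolSize(40, 4, 8)); // 40 tasks / 4 per file = 10, capped to 8
            System.out.println(poolSize(12, 4, 8)); // 3: a job with few tasks gets a small pool
        }
    }

Either way, the cap is the part that matters: `max-subtasks-per-file` addresses sequential writing, and `max-file-pool-size` bounds the pool.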
> > > > > > > > >> (5) How many physical files can a TM write at the same checkpoint > > at the > > > > same time? > > > > > > > > > > This is a very good point. Actually, there is a file reuse pool as > > > > > section 4.6 described. There could be multiple files within this > > pool, > > > > > supporting concurrent writing by multiple writers. I suggest > > providing > > > > > two configurations to control the file number: > > > > > > > > > > state.checkpoints.file-merging.max-file-pool-size: Specifies the > > > > > upper limit of the file pool size. > > > > > state.checkpoints.file-merging.max-subtasks-per-file: Specifies the > > > > > lower limit of the file pool size based on the number of subtasks > > > > > within each TM. > > > > > > > > > > The number of simultaneously open files is controlled by these two > > > > > options, and the first option takes precedence over the second. > > > > > > > > I'm not sure why we need 2 configurations, or whether 1 configuration > > > > is enough here. > > > > > > > > The `max-file-pool-size` is hard to give a default value. For jobs with > > > > many tasks in a TM, it may be useful for file merging. However, > > > > it doesn't work well for jobs with a small number of tasks in a TM. > > > > > > > > I prefer just adding the `max-file-pool-size`, and > > > > the `pool size = number of tasks / max-file-pool-size`. WDYT? > > > > > > > > Maybe I missed some information. Please correct me if I'm wrong, > > thanks. > > > > > > > > Best, > > > > Rui Fan > > > > > > > > On Fri, Apr 28, 2023 at 12:10 AM Zakelly Lan <zakelly....@gmail.com> > > wrote: > > > > > > > > > Hi all, > > > > > > > > > > Thanks for all the feedback so far. > > > > > > > > > > The discussion has been going on for some time, and all the comments > > > > > and suggestions are addressed. So I would like to start a vote on > > this > > > > > FLIP, which begins a week later (May. 5th at 10:00 AM GMT). > > > > > > > > > > If you have any concerns, please don't hesitate to follow up on this > > > > > discussion. > > > > > > > > > > > > > > > Best regards, > > > > > Zakelly > > > > > > > > > > On Fri, Apr 28, 2023 at 12:03 AM Zakelly Lan <zakelly....@gmail.com> > > > > > wrote: > > > > > > > > > > > > Hi Yuan, > > > > > > > > > > > > Thanks for sharing your thoughts. Like you said, the code changes > > and > > > > > > complexities are shaded in the newly introduced file management in > > TM, > > > > > > while the old file management remains the same. It is safe for us > > to > > > > > > take a small step towards decentralized file management in this > > way. I > > > > > > put the POC branch here[1] so everyone can check the code change. > > > > > > > > > > > > Best regards, > > > > > > Zakelly > > > > > > > > > > > > [1] https://github.com/Zakelly/flink/tree/flip306_poc > > > > > > > > > > > > On Thu, Apr 27, 2023 at 8:13 PM Yuan Mei <yuanmei.w...@gmail.com> > > wrote: > > > > > > > > > > > > > > Hey all, > > > > > > > > > > > > > > Thanks @Zakelly for driving this effort and thanks everyone for > > the > > > > > warm > > > > > > > discussion. Sorry for the late response. > > > > > > > > > > > > > > As I and Zakelly have already discussed and reviewed the design > > > > > carefully > > > > > > > when drafting this FLIP, I do not have additional inputs here. > > But I > > > > > want > > > > > > > to highlight several points that I've been quoted and explain > > why I > > > > > think > > > > > > > the current design is a reasonable and clean one. 
> > > > > > > *Why this FLIP is proposed*
> > > > > > > File flooding is a problem for Flink that I've seen many people bring up throughout the years, especially for large clusters. Unfortunately, there are not yet accepted solutions for the most commonly used state backends like RocksDB. This FLIP was originally targeted at merging SST (KeyedState) checkpoint files.
> > > > > > >
> > > > > > > While comparing different design choices, we found that different types of checkpoint files (OPState, unaligned CP channel state, Changelog incremental state) share similar considerations, for example file management, file merging granularity, etc. That's why we want to abstract a unified framework for merging these different types of checkpoint files and provide flexibility to choose between merging efficiency, rescaling/restoring cost, file system capabilities (affecting file visibility), etc.
> > > > > > >
> > > > > > > *File ownership moved from JM to TM, WHY*
> > > > > > > One of the major differences in the proposed design is moving file ownership from JM to TM. A lot of questions/concerns come from here, so let me answer them one by one:
> > > > > > >
> > > > > > > *1. Why is the current JM SharedRegistry not enough, and do we have to introduce more complexity?*
> > > > > > > SharedRegistry maintains the mapping *a file -> max CP ID using the file*.
> > > > > > > For merging files, we have to introduce another level of mapping: *a file -> checkpoint file segment (merged files)*.
> > > > > > > So yes, no matter what, the second level of mapping has to be managed somewhere, either JM or TM.
> > > > > > >
> > > > > > > *2. Why can the complexity (second level of mapping) not be maintained in JM?*
> > > > > > > - As a centralized service, the JM is already complicated and overloaded. As mentioned by @Yanfei Lei <fredia...@gmail.com>, "triggering checkpoints can be delayed by discarding shared state when JM manages a large number of files (FLINK-26590)". This ended up with the JM thread pool being set to 500!
> > > > > > > - As explained by @Zakelly in the previous thread, the contract "for checkpoint N, only re-use shared state handles that have already been referenced by checkpoint N-1" is not guaranteed for concurrent checkpoints in the current JM-owned design. This problem cannot be addressed without significant changes in how SharedRegistry and checkpoint subsumption work, which I do not think is worth it since "concurrent_CP>1" is not used that much in prod.
> > > > > > >
> > > > > > > *3. We have had similar discussions before about moving ownership from JM to TM; why was it not adopted at that time?*
> > > > > > > As mentioned by Yun and Piotr, we had similar discussions about moving ownership from JM to TM when designing the changelog state backend. The reason we stuck to JM ownership at that time was mainly engineering time/effort constraints.
> > > > > > > This time, since we need an extra level of mapping, which complicates the JM logic even further, we indeed need to hide the complexity within the TM to avoid more communication between JM and TM.
> > > > > > > Zakelly has already shared the code branch (about 2000 lines), and it is simple.
> > > > > > >
> > > > > > > *4. Cloud-Native Trend*
> > > > > > > The current centralized file management framework contradicts the cloud-native trend. That's also one of the reasons moving ownership from JM to TM was first proposed. The proposed design and implementation are a worthy try-out in this direction. I'd like to put more effort into this direction if it really turns out to work well.
> > > > > > >
> > > > > > > One more thing I want to mention is that the proposed design hides all the code changes and complexities in the newly introduced file management in the TM. That is to say, without enabling file merging, the code path of file management remains the same as before. So it is also a safe change in this sense.
> > > > > > >
> > > > > > > Best,
> > > > > > > Yuan
> > > > > > >
> > > > > > > On Wed, Apr 12, 2023 at 5:23 PM Zakelly Lan <zakelly....@gmail.com> wrote:
> > > > > > > >
> > > > > > > > Hi Yun,
> > > > > > > >
> > > > > > > > I reorganized our discussion and added a comparison table of state ownership with some previous designs. Please take a look at section "4.9. State ownership comparison with other designs".
> > > > > > > >
> > > > > > > > But I don't see them as alternatives, since the design of state ownership is integrated with this FLIP. That is to say, we are providing a file merging solution including file management for the new merged files; other ownership models are not feasible for the current merging plan. If the state ownership changes, the design of merging files at different granularities also needs to change accordingly. WDYT?
> > > > > > > >
> > > > > > > > Best regards,
> > > > > > > > Zakelly
> > > > > > > >
> > > > > > > > On Tue, Apr 11, 2023 at 10:18 PM Yun Tang <myas...@live.com> wrote:
> > > > > > > > >
> > > > > > > > > Hi Zakelly,
> > > > > > > > >
> > > > > > > > > Since we already had some discussions on this topic in the doc I mentioned, could you please describe the difference in your FLIP?
> > > > > > > > >
> > > > > > > > > I think we should better have a comparison table across different options, just like the doc did. And we could also list some of them in your Rejected Alternatives part.
> > > > > > > > > > > > > > > > > > > > > > > > > > > Best > > > > > > > > > Yun Tang > > > > > > > > > ________________________________ > > > > > > > > > From: Zakelly Lan <zakelly....@gmail.com> > > > > > > > > > Sent: Tuesday, April 11, 2023 17:57 > > > > > > > > > To: dev@flink.apache.org <dev@flink.apache.org> > > > > > > > > > Subject: Re: [DISCUSS] FLIP-306: Unified File Merging > > Mechanism for > > > > > > > > Checkpoints > > > > > > > > > > > > > > > > > > Hi Rui Fan, > > > > > > > > > > > > > > > > > > Thanks for your comments! > > > > > > > > > > > > > > > > > > > (1) The temporary segment will remain in the physical file > > for a > > > > > short > > > > > > > > time, right? > > > > > > > > > > > > > > > > > > Yes, any written segment will remain in the physical file > > until the > > > > > > > > > physical file is deleted. It is controlled by the reference > > > > > counting. > > > > > > > > > And as discussed in 4.7, this will result in a space > > amplification > > > > > > > > > problem. > > > > > > > > > > > > > > > > > > > > > > > > > > > > (2) Is subtask granularity confused with shared state? > > > > > > > > > > > > > > > > > > Merging files at granularity of subtask is a general > > solution for > > > > > > > > > shared states, considering the file may be reused by the > > following > > > > > > > > > checkpoint after job restore. This design is applicable to > > sst > > > > > files > > > > > > > > > and any other shared states that may arise in the future. > > However, > > > > > the > > > > > > > > > DSTL files are a special case of shared states, since these > > files > > > > > will > > > > > > > > > no longer be shared after job restore. Therefore, we may do > > an > > > > > > > > > optimization for these files and merge them at the TM level. > > > > > > > > > Currently, the DSTL files are not in the shared directory of > > > > > > > > > checkpoint storage, and I suggest we keep it as it is. I > > agree that > > > > > > > > > this may bring in some confusion, and I suggest the FLIP > > mainly > > > > > > > > > discuss the general situation and list the special situations > > > > > > > > > separately without bringing in new concepts. I will add > > another > > > > > > > > > paragraph describing the file merging for DSTL files. WDYT? > > > > > > > > > > > > > > > > > > > > > > > > > > > > (3) When rescaling, do all shared files need to be copied? > > > > > > > > > > > > > > > > > > I agree with you that only sst files of the base DB need to > > be > > > > > copied > > > > > > > > > (or re-uploaded in the next checkpoint). However, section 4.2 > > > > > > > > > simplifies file copying issues (copying all files), > > following the > > > > > > > > > concept of shared state. > > > > > > > > > > > > > > > > > > > > > > > > > > > > (4) Does the space magnification ratio need a configuration > > > > > option? > > > > > > > > > > > > > > > > > > Thanks for the reminder, I will add an option in this FLIP. > > > > > > > > > > > > > > > > > > > > > > > > > > > > (5) How many physical files can a TM write at the same > > > > > checkpoint at > > > > > > > > the same time? > > > > > > > > > > > > > > > > > > This is a very good point. Actually, there is a file reuse > > pool as > > > > > > > > > section 4.6 described. There could be multiple files within > > this > > > > > pool, > > > > > > > > > supporting concurrent writing by multiple writers. 
I suggest > > > > > providing > > > > > > > > > two configurations to control the file number: > > > > > > > > > > > > > > > > > > state.checkpoints.file-merging.max-file-pool-size: > > Specifies the > > > > > > > > > upper limit of the file pool size. > > > > > > > > > state.checkpoints.file-merging.max-subtasks-per-file: > > Specifies > > > > > the > > > > > > > > > lower limit of the file pool size based on the number of > > subtasks > > > > > > > > > within each TM. > > > > > > > > > > > > > > > > > > The number of simultaneously open files is controlled by > > these two > > > > > > > > > options, and the first option takes precedence over the > > second. > > > > > > > > > > > > > > > > > > WDYT? > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > Thanks a lot for your valuable insight. > > > > > > > > > > > > > > > > > > Best regards, > > > > > > > > > Zakelly > > > > > > > > > > > > > > > > > > > > > > > > > > > On Mon, Apr 10, 2023 at 7:08 PM Rui Fan < > > 1996fan...@gmail.com> > > > > > wrote: > > > > > > > > > > > > > > > > > > > > Hi all, > > > > > > > > > > > > > > > > > > > > Thanks Zakelly driving this proposal, and thank you all for > > > > > > > > > > the warm discussions. It's really a useful feature. > > > > > > > > > > > > > > > > > > > > I have a few questions about this FLIP. > > > > > > > > > > > > > > > > > > > > (1) The temporary segment will remain in the physical file > > for > > > > > > > > > > a short time, right? > > > > > > > > > > > > > > > > > > > > FLIP proposes to write segments instead of physical files. > > > > > > > > > > If the physical files are written directly, these > > temporary files > > > > > > > > > > will be deleted after the checkpoint is aborted. When > > writing > > > > > > > > > > a segment, how to delete the temporary segment? > > > > > > > > > > Decrement the reference count value by 1? > > > > > > > > > > > > > > > > > > > > (2) Is subtask granularity confused with shared state? > > > > > > > > > > > > > > > > > > > > From the "4.1.2 Merge files within a subtask or a TM" part, > > > > > > > > > > based on the principle of sst files, it is concluded that > > > > > > > > > > "For shared states, files are merged within each subtask." > > > > > > > > > > > > > > > > > > > > I'm not sure whether this conclusion is general or just > > for sst. > > > > > > > > > > As Yanfei mentioned before: > > > > > > > > > > > > > > > > > > > > > DSTL files are shared between checkpoints, and are > > > > > > > > > > > currently merged in batches at the task manager level. > > > > > > > > > > > > > > > > > > > > DSTL files as the shared state in FLIP-306, however, it > > > > > > > > > > would be better to merge at TM granularity. So, I'm not > > > > > > > > > > sure whether the subtask granularity confused with > > > > > > > > > > shared state? > > > > > > > > > > > > > > > > > > > > And I'm not familiar with DSTL file merging, should > > > > > > > > > > shared state be divided into shared subtask state > > > > > > > > > > and shared TM state? > > > > > > > > > > > > > > > > > > > > (3) When rescaling, do all shared files need to be copied? > > > > > > > > > > > > > > > > > > > > From the "4.2 Rescaling and Physical File Lifecycle" part, > > > > > > > > > > I see a lot of file copying. > > > > > > > > > > > > > > > > > > > > As I understand, only sst files of the baseDB need to be > > copied. 
> > > > > > > > > > From the restore code[1], when restoreWithRescaling, flink > > will > > > > > > > > > > init a base DB instance, read all contents from other > > temporary > > > > > > > > > > rocksdb instances, and write them into the base DB, and > > then > > > > > > > > > > the temporary rocksdb instance will be discarded. > > > > > > > > > > > > > > > > > > > > So, I think copying the files of the base rocksdb is > > enough, and > > > > > > > > > > the files of other rocksdb instances aren't used. > > > > > > > > > > > > > > > > > > > > Or do not copy any files during recovery, upload all sst > > files > > > > > > > > > > at the first checkpoint. > > > > > > > > > > > > > > > > > > > > (4) Does the space magnification ratio need a configuration > > > > > option? > > > > > > > > > > > > > > > > > > > > From the step1 of "4.7 Space amplification" part, I see: > > > > > > > > > > > > > > > > > > > > > Checking whether the space amplification of each file is > > > > > greater > > > > > > > > than a > > > > > > > > > > preset threshold and collecting files that exceed the > > threshold > > > > > for > > > > > > > > > > compaction. > > > > > > > > > > > > > > > > > > > > Should we add a configuration option about the compaction > > > > > threshold? > > > > > > > > > > I didn't see it at "5. Public interfaces and User Cases" > > part. > > > > > > > > > > > > > > > > > > > > (5) How many physical files can a TM write at the same > > > > > > > > > > checkpoint at the same time? > > > > > > > > > > > > > > > > > > > > From the "5. Public interfaces and User Cases" part, I see: > > > > > > > > > > > > > > > > > > > > > A configuration option that sets a maximum size limit for > > > > > physical > > > > > > > > files. > > > > > > > > > > > > > > > > > > > > I guess that each type of state(private or shared state) > > will > > > > > only > > > > > > > > > > write one file at the same time at the same checkpoint. > > > > > > > > > > When the file reaches the maximum size, flink will start > > writing > > > > > > > > > > the next file, right? > > > > > > > > > > > > > > > > > > > > If yes, for shared state, will > > > > > > > > > > "state.backend.rocksdb.checkpoint.transfer.thread.num" > > > > > > > > > > be invalid? > > > > > > > > > > > > > > > > > > > > For private state, a TM may have many tasks (because of > > slot > > > > > > > > > > sharing, more than 20 tasks may run in a slot), and the > > > > > > > > > > performance of all tasks serially writing files may be > > poor, > > > > > > > > > > eventually resulting in longer checkpoint time. > > > > > > > > > > > > > > > > > > > > That's why FLINK-26803[2] introduced a configuration > > option: > > > > > > > > > > > > > > > > > > > > > > > > > "execution.checkpointing.unaligned.max-subtasks-per-channel-state-file". > > > > > > > > > > Flink users can set the maximum number of subtasks that > > > > > > > > > > share the same channel state file. > > > > > > > > > > > > > > > > > > > > That's all my questions right now, please correct me if > > > > > > > > > > anything is wrong. > > > > > > > > > > > > > > > > > > > > Anyway, this FLIP is useful for the stability of > > large-scale > > > > > > > > > > flink production. Looking forward to its completion and > > > > > > > > > > eventual acceptance by the community. 
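(As an aside, the compaction-selection step of section 4.7 quoted above, "collecting files that exceed the threshold", could look roughly like the sketch below. The record type, field names and the example threshold are assumptions, not the FLIP's actual classes.)

    import java.util.List;
    import java.util.stream.Collectors;

    // Sketch of step 1 of section 4.7: pick physical files whose space
    // amplification (physical size / size still referenced by live checkpoints)
    // exceeds a preset threshold. Types and names are illustrative.
    public final class CompactionCandidates {

        record PhysicalFile(String path, long physicalSize, long referencedSize) {
            double spaceAmplification() {
                return referencedSize == 0 ? Double.POSITIVE_INFINITY
                                           : (double) physicalSize / referencedSize;
            }
        }

        static List<PhysicalFile> collect(List<PhysicalFile> files, double threshold) {
            return files.stream()
                    .filter(f -> f.spaceAmplification() > threshold)
                    .collect(Collectors.toList());
        }

        public static void main(String[] args) {
            var files = List.of(
                    new PhysicalFile("file-1", 64 << 20, 8 << 20),   // amplification 8.0
                    new PhysicalFile("file-2", 64 << 20, 60 << 20)); // amplification ~1.07
            System.out.println(collect(files, 2.0)); // only file-1 is a candidate
        }
    }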
> > > > > > > > > > > > > > > > > > > > [1] > > > > > > > > > > > > > > > > > > > > > > > > > https://github.com/apache/flink/blob/65710b437318364ec19c0369d038ac2222c10498/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java#L292 > > > > > > > > > > [2] https://issues.apache.org/jira/browse/FLINK-26803 > > > > > > > > > > > > > > > > > > > > Best, > > > > > > > > > > Rui Fan > > > > > > > > > > > > > > > > > > > > On Fri, Apr 7, 2023 at 8:42 PM Jing Ge > > > > > <j...@ververica.com.invalid> > > > > > > > > wrote: > > > > > > > > > > > > > > > > > > > > > Hi, > > > > > > > > > > > > > > > > > > > > > > Jingsong, Yanfei, please check, if you can view the doc. > > > > > Thanks. > > > > > > > > > > > > > > > > > > > > > > Best regards, > > > > > > > > > > > Jing > > > > > > > > > > > > > > > > > > > > > > On Fri, Apr 7, 2023 at 2:19 PM Zakelly Lan < > > > > > zakelly....@gmail.com> > > > > > > > > wrote: > > > > > > > > > > > > > > > > > > > > > > > Hi Yanfei, > > > > > > > > > > > > > > > > > > > > > > > > Thanks for your comments. > > > > > > > > > > > > > > > > > > > > > > > > > Does this result in a larger space amplification? > > Maybe a > > > > > more > > > > > > > > > > > > suitable value can be determined through some > > experimental > > > > > > > > statistics > > > > > > > > > > > > after we implement this feature. > > > > > > > > > > > > > > > > > > > > > > > > Yes, it results in larger space amplification for > > shared > > > > > states. I > > > > > > > > > > > > will do more tests and investigation. > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > Thanks. > > > > > > > > > > > > > > > > > > > > > > > > Best regards, > > > > > > > > > > > > Zakelly > > > > > > > > > > > > > > > > > > > > > > > > On Fri, Apr 7, 2023 at 8:15 PM Zakelly Lan < > > > > > zakelly....@gmail.com> > > > > > > > > > > > wrote: > > > > > > > > > > > > > > > > > > > > > > > > > > Hi @Piotr and @Jingsong Li > > > > > > > > > > > > > > > > > > > > > > > > > > I have read access to the document, but I'm not sure > > > > > whether the > > > > > > > > owner > > > > > > > > > > > > > of this document wants to make it public. Actually, > > the > > > > > doc is > > > > > > > > for > > > > > > > > > > > > > FLINK-23342 and there is a candidate design very > > similar > > > > > to this > > > > > > > > FLIP, > > > > > > > > > > > > > but only for the shared state. Like Yun said, the > > previous > > > > > > > > design is > > > > > > > > > > > > > not taken because of the code complexity, however I > > think > > > > > it is > > > > > > > > > > > > > acceptable after implementing the POC[1]. I think we > > could > > > > > focus > > > > > > > > on > > > > > > > > > > > > > the current plan, WDTY? > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > [1] POC of this FLIP: > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > https://github.com/Zakelly/flink/commit/98538185182e33739828ee36ab96dcf2aebad80c > > > > > > > > > > > > > > > > > > > > > > > > > > On Fri, Apr 7, 2023 at 8:13 PM Zakelly Lan < > > > > > > > > zakelly....@gmail.com> > > > > > > > > > > > > wrote: > > > > > > > > > > > > > > > > > > > > > > > > > > > > Hi Piotr, > > > > > > > > > > > > > > > > > > > > > > > > > > > > Thanks for your comments! > > > > > > > > > > > > > > > > > > > > > > > > > > > > (1) Sorry for the misleading, let me make it more > > clear. 
> > > > > > > > > > > > > > It is a concurrent checkpoint scenario. Yes, the assumption you mentioned needs to be followed, but the state handles here refer to the original SST files, not the underlying file. In this FLIP, when checkpoint N and N+1 are running concurrently, they reuse files from checkpoint N-1, and some of those files may be deleted when checkpoint N completes while checkpoint N+1 is still writing to them. There is no such problem for original shared states without file merging, because when a state handle (or sst file here) from checkpoint N-1 is not referenced by checkpoint N, it will not be referenced by checkpoint N+1. So the subsumption of sst files from checkpoint N-1 is safe.
> > > > > > > > > > > > > > For the above example, when reaching step "d.", File 1 has reached the size threshold and will not be used. Chk-2 and Chk-3 are running concurrently, and File 3 is being written by Chk-2, so it cannot be used by Chk-3 (as described in section 4.6). Here comes the problem.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > (2) Please correct me if I'm wrong. The purpose of the `RecoverableWriter` is to provide a reliable file writer, one that even tolerates job failure and recovery. The implementation varies among file systems, some of which involve writing into temporary files (such as HDFS). As a result, it may produce more RPC requests to the DFS.
> > > > > > > > > > > > > > The goal of this FLIP is to reduce the pressure on the DFS, especially the number of files and RPC requests. Currently, the TMs are NOT using the RecoverableWriter to persist/upload the state files, and a file closing is enough. Section 4.1.1 is trying to omit this file closing while still ensuring file visibility in some DFS, thus reducing pressure on the DFS. That's why I said the problems they want to solve are different. I'm not sure if I made myself clear.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > Thanks!
> > > > > > > > > > > > > > > > > > > > > > > > > > > > Best regards, > > > > > > > > > > > > > > Zakelly > > > > > > > > > > > > > > > > > > > > > > > > > > > > On Fri, Apr 7, 2023 at 8:08 PM Zakelly Lan < > > > > > > > > zakelly....@gmail.com> > > > > > > > > > > > > wrote: > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > Hi Yun, > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > Thanks for your suggestions! > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > I have read the FLINK-23342 and its design doc > > as you > > > > > > > > provided. > > > > > > > > > > > First > > > > > > > > > > > > > > > of all the goal of this FLIP and the doc are > > similar, > > > > > and the > > > > > > > > > > > design > > > > > > > > > > > > > > > of this FLIP is pretty much like option 3. The > > main > > > > > > > > difference is > > > > > > > > > > > > that > > > > > > > > > > > > > > > we imply the concept of 'epoch' in the folder > > path for > > > > > each > > > > > > > > > > > > > > > granularity. For shared state, the folder for > > each > > > > > subtask > > > > > > > > is like > > > > > > > > > > > > > > > > > > > > "${checkpointBaseDir}/shared/subtask-{index}-{parallelism}", > > > > > > > > so if > > > > > > > > > > > > the > > > > > > > > > > > > > > > ${checkpointBaseDir} changes (when user restart > > a job > > > > > > > > manually) or > > > > > > > > > > > > the > > > > > > > > > > > > > > > ${parallelism} changes (when rescaling), there > > will be > > > > > a > > > > > > > > > > > > re-uploading, > > > > > > > > > > > > > > > and the JM takes care of the old artifacts. The > > folder > > > > > path > > > > > > > > for > > > > > > > > > > > > > > > private state is in the form of > > > > > > > > > > > > > > > "${checkpointBaseDir}/tm-owned/${tmResourceId}" > > and the > > > > > > > > division of > > > > > > > > > > > > > > > responsibilities between JM and TM is similar. > > The > > > > > design of > > > > > > > > this > > > > > > > > > > > > FLIP > > > > > > > > > > > > > > > inherits all the advantages of the design of > > option 3 > > > > > in > > > > > > > > that doc, > > > > > > > > > > > > and > > > > > > > > > > > > > > > also avoids extra communication for epoch > > maintenance. > > > > > As > > > > > > > > for the > > > > > > > > > > > > code > > > > > > > > > > > > > > > complexity, you may check the POC commit[1] and > > find > > > > > that the > > > > > > > > > > > > > > > implementation is pretty clean and is a totally > > new > > > > > code path > > > > > > > > > > > making > > > > > > > > > > > > > > > nearly no influence on the old one. Comparing the > > > > > number of > > > > > > > > lines > > > > > > > > > > > of > > > > > > > > > > > > > > > code change with what's currently done for > > merging > > > > > channel > > > > > > > > state[2] > > > > > > > > > > > > > > > (5200 vs. 2500 additions), I think it is > > acceptable > > > > > > > > considering we > > > > > > > > > > > > are > > > > > > > > > > > > > > > providing a unified file merging framework, which > > > > > would save > > > > > > > > a lot > > > > > > > > > > > of > > > > > > > > > > > > > > > effort in future. WDYT? > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > Thanks. 
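(To picture the layout described above: the parallelism baked into the shared-state path is what makes the implicit 'epoch' work, since rescaling lands in a fresh directory. A small sketch follows; only the path templates come from the FLIP text, while the class and method names are made up.)

    // Sketch of the folder layout described above: the parallelism is part of
    // the shared-state path, so a rescale (or a manually changed base dir)
    // naturally lands in a fresh directory and triggers re-uploading.
    public final class MergedFileDirs {

        static String sharedDir(String checkpointBaseDir, int subtaskIndex, int parallelism) {
            return String.format("%s/shared/subtask-%d-%d", checkpointBaseDir, subtaskIndex, parallelism);
        }

        static String privateDir(String checkpointBaseDir, String tmResourceId) {
            return String.format("%s/tm-owned/%s", checkpointBaseDir, tmResourceId);
        }

        public static void main(String[] args) {
            // Rescaling 128 -> 256 changes the directory, so old files are never reused:
            System.out.println(sharedDir("hdfs:///ckpts/job-1", 3, 128));
            System.out.println(sharedDir("hdfs:///ckpts/job-1", 3, 256));
            System.out.println(privateDir("hdfs:///ckpts/job-1", "tm-resource-42"));
        }
    }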
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > Best regards, > > > > > > > > > > > > > > > Zakelly > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > [1] POC of this FLIP: > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > https://github.com/Zakelly/flink/commit/98538185182e33739828ee36ab96dcf2aebad80c > > > > > > > > > > > > > > > [2] Commit for FLINK-26803 (Merge the channel > > state > > > > > files) : > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > https://github.com/apache/flink/commit/8be94e6663d8ac6e3d74bf4cd5f540cc96c8289e > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > On Fri, Apr 7, 2023 at 7:22 PM Yanfei Lei < > > > > > > > > fredia...@gmail.com> > > > > > > > > > > > > wrote: > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > Thanks for your explanation Zakelly. > > > > > > > > > > > > > > > > (1) Keeping these merging granularities for > > different > > > > > > > > types of > > > > > > > > > > > > files > > > > > > > > > > > > > > > > as presets that are not configurable is a good > > idea > > > > > to > > > > > > > > prevent > > > > > > > > > > > > > > > > performance degradation. > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > (2) > > > > > > > > > > > > > > > > > For the third option, 64MB is an acceptable > > target > > > > > size. > > > > > > > > The > > > > > > > > > > > > RocksDB state backend in Flink also chooses 64MB as the > > > > > default > > > > > > > > target > > > > > > > > > > > file > > > > > > > > > > > > size. > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > Does this result in a larger space > > amplification? > > > > > Maybe a > > > > > > > > more > > > > > > > > > > > > > > > > suitable value can be determined through some > > > > > experimental > > > > > > > > > > > > statistics > > > > > > > > > > > > > > > > after we implement this feature. > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > Best, > > > > > > > > > > > > > > > > Yanfei > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > Jingsong Li <jingsongl...@gmail.com> > > 于2023年4月7日周五 > > > > > 17:09写道: > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > Hi Yun, > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > It looks like this doc needs permission to > > read? > > > > > [1] > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > [1] > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > https://docs.google.com/document/d/1NJJQ30P27BmUvD7oa4FChvkYxMEgjRPTVdO1dHLl_9I/edit# > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > Best, > > > > > > > > > > > > > > > > > Jingsong > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > On Fri, Apr 7, 2023 at 4:34 PM Piotr > > Nowojski < > > > > > > > > > > > > pnowoj...@apache.org> wrote: > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > Hi, > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > +1 To what Yun Tang wrote. We don't seem > > to have > > > > > > > > access to > > > > > > > > > > > the > > > > > > > > > > > > design doc. > > > > > > > > > > > > > > > > > > Could you make it publicly visible or copy > > out > > > > > its > > > > > > > > content to > > > > > > > > > > > > another > > > > > > > > > > > > > > > > > > document? 
> > > > > > > > > > > > > > > > > > Thanks for your answers Zakelly.
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > (1)
> > > > > > > > > > > > > > > > > > Yes, the current mechanism introduced in FLINK-24611 allows checkpoint N to only re-use shared state handles that have already been referenced by checkpoint N-1. But why do we need to break this assumption? In your step "d.", the TM could adhere to that assumption, and instead of reusing File-2, it could either re-use File-1 or File-3, or create a new file.
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > (2)
> > > > > > > > > > > > > > > > > > Can you elaborate a bit more on this? As far as I recall, the purpose of the `RecoverableWriter` is to support exactly the things described in this FLIP, so what's the difference? If you are saying that for this FLIP you can implement something more efficient for a given FileSystem, then why can it not be done the same way for the `RecoverableWriter`?
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > Best,
> > > > > > > > > > > > > > > > > > Piotrek
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > On Thu, Apr 6, 2023 at 17:24 Yun Tang <myas...@live.com> wrote:
> > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > Hi Zakelly,
> > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > Thanks for driving this work!
> > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > I'm not sure whether you ever read the discussion between Stephan, Roman, Piotr, Yuan and me in the design doc [1] from nearly two years ago.
> > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > From my understanding, your proposal is also a mixed state ownership: some states are owned by the TM while some are owned by the JM. If my memory is correct, we did not take option-3 or option-5 in the design doc [1] due to the code complexity when implementing the 1st version of the changelog state-backend.
> > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > Could you also compare the current FLIP with the proposals in the design doc [1]? From my understanding, we should at least consider comparing with option-3 and option-5, as they are both mixed solutions.
> > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > [1] https://docs.google.com/document/d/1NJJQ30P27BmUvD7oa4FChvkYxMEgjRPTVdO1dHLl_9I/edit#
> > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > Best
> > > > > > > > > > > > > > > > > > > Yun Tang
> > > > > > > > > > > > > > > > > > > ________________________________
> > > > > > > > > > > > > > > > > > > *From:* Zakelly Lan <zakelly....@gmail.com>
> > > > > > > > > > > > > > > > > > > *Sent:* Thursday, April 6, 2023 16:38
> > > > > > > > > > > > > > > > > > > *To:* dev@flink.apache.org <dev@flink.apache.org>
> > > > > > > > > > > > > > > > > > > *Subject:* Re: [DISCUSS] FLIP-306: Unified File Merging Mechanism for Checkpoints
> > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > Hi Piotr,
> > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > Thanks for all the feedback.
> > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > (1) Thanks for the reminder. I have just seen FLINK-24611; the delayed deletion by JM resolves some sync problems between JM and TM, but I'm afraid it is still not feasible for the file sharing in this FLIP. Consider a concurrent checkpoint scenario as follows:
> > > > > > > > > > > > > > > > > > >   a. Checkpoint 1 finishes. 1.sst, 2.sst and 3.sst are written in file 1, and 4.sst is written in file 2.
> > > > > > > > > > > > > > > > > > >   b. Checkpoint 2 starts based on checkpoint 1, including 1.sst, 2.sst and 5.sst.
> > > > > > > > > > > > > > > > > > >   c. Checkpoint 3 starts based on checkpoint 1, including 1.sst, 2.sst and 5.sst as well.
> > > > > > > > > > > > > > > > > > >   d. Checkpoint 3 reuses file 2, and the TM writes 5.sst to it.
> > > > > > > > > > > > > > > > > > >   e. Checkpoint 2 creates a new file 3, and the TM writes 5.sst to it.
> > > > > > > > > > > > > > > > > > >   f. Checkpoint 2 finishes, checkpoint 1 is subsumed, and file 2 is deleted, while checkpoint 3 still needs file 2.
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > I attached a diagram to describe the > > scenario. > > > > > > > > > > > > > > > > > > > [image: concurrent cp.jpg] > > > > > > > > > > > > > > > > > > > The core issue is that this FLIP > > introduces a > > > > > > > > mechanism > > > > > > > > > > > that > > > > > > > > > > > > allows > > > > > > > > > > > > > > > > > > > physical files to be potentially used by > > the > > > > > next > > > > > > > > several > > > > > > > > > > > > checkpoints. JM > > > > > > > > > > > > > > > > > > > is uncertain whether there will be a TM > > > > > continuing > > > > > > > > to write > > > > > > > > > > > > to a specific > > > > > > > > > > > > > > > > > > > file. So in this FLIP, TMs take the > > > > > responsibility to > > > > > > > > > > > delete > > > > > > > > > > > > the physical > > > > > > > > > > > > > > > > > > > files. > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > (2) IIUC, the RecoverableWriter is > > introduced > > > > > to > > > > > > > > persist > > > > > > > > > > > > data in the "in > > > > > > > > > > > > > > > > > > > progress" files after each checkpoint, > > and the > > > > > > > > > > > > implementation may be based > > > > > > > > > > > > > > > > > > > on the file sync in some file systems. > > However, > > > > > > > > since the > > > > > > > > > > > > sync is a heavy > > > > > > > > > > > > > > > > > > > operation for DFS, this FLIP wants to > > use flush > > > > > > > > instead of > > > > > > > > > > > > the sync with > > > > > > > > > > > > > > > > > > > the best effort. This only fits the case > > that > > > > > the > > > > > > > > DFS is > > > > > > > > > > > > considered > > > > > > > > > > > > > > > > > > > reliable. The problems they want to > > solve are > > > > > > > > different. > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > (3) Yes, if files are managed by JM via > > the > > > > > shared > > > > > > > > > > > registry, > > > > > > > > > > > > this problem > > > > > > > > > > > > > > > > > > > is solved. And as I mentioned in (1), > > there > > > > > are some > > > > > > > > other > > > > > > > > > > > > corner cases > > > > > > > > > > > > > > > > > > > hard to resolve via the shared registry. > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > The goal of this FLIP is to have a > > common way > > > > > of > > > > > > > > merging > > > > > > > > > > > > files in all use > > > > > > > > > > > > > > > > > > > cases. For shared state it merges at > > subtask > > > > > level, > > > > > > > > while > > > > > > > > > > > > for private state > > > > > > > > > > > > > > > > > > > (and changelog files, as I replied to > > Yanfei), > > > > > files > > > > > > > > are > > > > > > > > > > > > merged at TM > > > > > > > > > > > > > > > > > > > level. So it is not contrary to the > > current > > > > > plan for > > > > > > > > the > > > > > > > > > > > > unaligned > > > > > > > > > > > > > > > > > > > checkpoint state (FLINK-26803). You are > > right > > > > > that > > > > > > > > the > > > > > > > > > > > > unaligned checkpoint > > > > > > > > > > > > > > > > > > > state would be merged with the > > operator's state > > > > > > > > file, so > > > > > > > > > > > > overall, it is > > > > > > > > > > > > > > > > > > > slightly better than what's currently > > done. > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > Thanks again for the valuable comments! 
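(In code, the TM-side lifecycle described above boils down to reference counting per physical file, roughly as sketched below. The class name and the println stand-in for the actual delete call are illustrative, not the POC's classes.)

    import java.util.concurrent.atomic.AtomicInteger;

    // Sketch of TM-owned physical-file lifecycle: every logical segment written
    // into the file holds a reference; the TM (not the JM) deletes the file once
    // the last segment is discarded.
    public final class RefCountedPhysicalFile {

        private final String path;
        private final AtomicInteger refCount = new AtomicInteger();

        RefCountedPhysicalFile(String path) { this.path = path; }

        void retain() { refCount.incrementAndGet(); }

        void release() {
            // A subsumed checkpoint releases one reference per segment; the file
            // survives as long as any later checkpoint still points into it, which
            // is exactly the concurrent-checkpoint corner case discussed above.
            if (refCount.decrementAndGet() == 0) {
                System.out.println("deleting " + path); // stand-in for a FileSystem delete
            }
        }

        public static void main(String[] args) {
            RefCountedPhysicalFile file2 = new RefCountedPhysicalFile("file-2");
            file2.retain();  // 4.sst from checkpoint 1
            file2.retain();  // 5.sst from checkpoint 3, which reused the file
            file2.release(); // checkpoint 1 subsumed: file-2 must NOT be deleted yet
            file2.release(); // checkpoint 3 discarded: now it is safe to delete
        }
    }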
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > Best regards, > > > > > > > > > > > > > > > > > > > Zakelly > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > On Wed, Apr 5, 2023 at 8:43 PM Piotr > > Nowojski < > > > > > > > > > > > > pnowoj...@apache.org> > > > > > > > > > > > > > > > > > > > wrote: > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > Hi, > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > Thanks for coming up with the proposal, > > it's > > > > > > > > definitely > > > > > > > > > > > > valuable. I'm still > > > > > > > > > > > > > > > > > > > reading and trying to understand the > > proposal, > > > > > but a > > > > > > > > couple > > > > > > > > > > > > of comments > > > > > > > > > > > > > > > > > > > from my side. > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > (1) > > > > > > > > > > > > > > > > > > > > Ownership of a single checkpoint file > > is > > > > > > > > transferred to > > > > > > > > > > > > TM, while JM > > > > > > > > > > > > > > > > > > > manages the parent directory of these > > files. > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > Have you seen > > > > > > > > > > > > https://issues.apache.org/jira/browse/FLINK-24611 > > before? I > > > > > > > > > > > > > > > > > > > don't fully remember why, but we have > > rejected > > > > > the > > > > > > > > idea of > > > > > > > > > > > > moving the file > > > > > > > > > > > > > > > > > > > ownership to TM and instead reworked the > > > > > shared file > > > > > > > > > > > > registry in a way that > > > > > > > > > > > > > > > > > > > I think should be sufficient for file > > sharing. > > > > > Could > > > > > > > > you > > > > > > > > > > > > elaborate why we > > > > > > > > > > > > > > > > > > > need to move the file ownership to TM, > > and why > > > > > is the > > > > > > > > > > > > current mechanism not > > > > > > > > > > > > > > > > > > > sufficient? > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > (2) > > > > > > > > > > > > > > > > > > > > File visibility is needed when a Flink > > job > > > > > > > > recovers after > > > > > > > > > > > > a checkpoint is > > > > > > > > > > > > > > > > > > > materialized. In some DFS, such as most > > object > > > > > > > > storages, a > > > > > > > > > > > > file is only > > > > > > > > > > > > > > > > > > > visible after it is closed > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > Is that really the case? > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > `org.apache.flink.core.fs.FileSystem#createRecoverableWriter` > > > > > > > > seems to be > > > > > > > > > > > > > > > > > > > addressing exactly this issue, and the > > most > > > > > > > > frequently used > > > > > > > > > > > > FileSystem (S3) > > > > > > > > > > > > > > > > > > > AFAIK supports it with no problems? 
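(For context on point (2): Flink's org.apache.flink.core.fs.FSDataOutputStream does expose both flush() and sync(), and the flush-instead-of-sync idea of section 4.1.1, as Zakelly explains in his Apr 7 reply above, could be sketched as follows. The method and the dfsReliable flag are illustrative assumptions, not the POC code.)

    import org.apache.flink.core.fs.FSDataOutputStream;
    import java.io.IOException;

    // Sketch of the 4.1.1 idea: persist checkpoint data with a best-effort
    // flush() instead of a heavy sync()/close() per checkpoint, relying on the
    // DFS being reliable.
    public final class BestEffortPersist {

        static long persist(FSDataOutputStream out, byte[] segment, boolean dfsReliable)
                throws IOException {
            out.write(segment);
            if (dfsReliable) {
                out.flush(); // cheap: pushes data out without forcing durability
            } else {
                out.sync();  // expensive fallback: force durability on the DFS
            }
            return out.getPos(); // segment boundary to record in the metadata
        }
    }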
> > > > > > > > > > > > > > > > > > > (3)
> > > > > > > > > > > > > > > > > > > > 4.1.2 Merge files within a subtask or a TM
> > > > > > > > > > > > > > > > > > > > Given that TMs are reassigned after restoration, it is difficult to manage physical files that contain data from multiple subtasks scattered across different TMs (as depicted in Fig. 3). There is no synchronization mechanism between TMs, making file management in this scenario challenging.
> > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > I think this is solved in many places already via the shared state managed by the JM, as I mentioned in (1).
> > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > If I understand it correctly, you are proposing to have a common interface/way of merging small files, in all use cases, that would work only across a single subtask? That's contrary to what's currently done for unaligned checkpoints, right? But if this generic mechanism were to be used for unaligned checkpoints, unaligned checkpoint state would have been merged with the operator's state file, so all in all there would be no regression visible to a user? The limit is that we always have at least a single file per subtask, but in exchange we are getting a simpler threading model?
> > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > Best,
> > > > > > > > > > > > > > > > > > > Piotrek
> > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > On Tue, Apr 4, 2023 at 08:51 Zakelly Lan <zakelly....@gmail.com> wrote:
> > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > Hi Yanfei,
> > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > Thank you for your prompt response.
> > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > I agree that managing (deleting) only some folders with JM can greatly relieve JM's burden. Thanks for pointing this out.
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > In general, merging at the TM level is > > more > > > > > > > > effective > > > > > > > > > > > > since there are > > > > > > > > > > > > > > > > > > > > usually more files to merge. > > Therefore, I > > > > > believe > > > > > > > > it is > > > > > > > > > > > > better to > > > > > > > > > > > > > > > > > > > > merge files per TM as much as possible. > > > > > However, > > > > > > > > for > > > > > > > > > > > > shared state, > > > > > > > > > > > > > > > > > > > > merging at the subtask level is the > > best > > > > > choice to > > > > > > > > > > > prevent > > > > > > > > > > > > significant > > > > > > > > > > > > > > > > > > > > data transfer over the network after > > > > > restoring. I > > > > > > > > think > > > > > > > > > > > it > > > > > > > > > > > > is better > > > > > > > > > > > > > > > > > > > > to keep these merging granularities for > > > > > different > > > > > > > > types > > > > > > > > > > > of > > > > > > > > > > > > files as > > > > > > > > > > > > > > > > > > > > presets that are not configurable. > > WDYT? > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > As for the DSTL files, they are merged > > per > > > > > TM and > > > > > > > > placed > > > > > > > > > > > > in the > > > > > > > > > > > > > > > > > > > > task-owned folder. These files can be > > > > > classified as > > > > > > > > > > > shared > > > > > > > > > > > > state since > > > > > > > > > > > > > > > > > > > > they are shared across checkpoints. > > However, > > > > > the > > > > > > > > DSTL > > > > > > > > > > > file > > > > > > > > > > > > is a > > > > > > > > > > > > > > > > > > > > special case that will be subsumed by > > the > > > > > first > > > > > > > > > > > checkpoint > > > > > > > > > > > > of the > > > > > > > > > > > > > > > > > > > > newly restored job. Therefore, there > > is no > > > > > need > > > > > > > > for new > > > > > > > > > > > > TMs to keep > > > > > > > > > > > > > > > > > > > > these files after the old checkpoint is > > > > > subsumed, > > > > > > > > just > > > > > > > > > > > > like the > > > > > > > > > > > > > > > > > > > > private state files. Thus, it is > > feasible to > > > > > merge > > > > > > > > DSTL > > > > > > > > > > > > files per TM > > > > > > > > > > > > > > > > > > > > without introducing complex file > > management > > > > > across > > > > > > > > job > > > > > > > > > > > > attempts. So > > > > > > > > > > > > > > > > > > > > the possible performance degradation is > > > > > avoided. > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > The three newly introduced options have > > > > > recommended > > > > > > > > > > > > defaults. For > > > > > > > > > > > > > > > > > > > > upcoming versions, this feature is > > turned > > > > > off by > > > > > > > > default. > > > > > > > > > > > > For the > > > > > > > > > > > > > > > > > > > > second option, > > SEGMENTED_ACROSS_CP_BOUNDARY > > > > > is the > > > > > > > > > > > > recommended default > > > > > > > > > > > > > > > > > > > > as it is more effective. Of course, if > > > > > > > > encountering some > > > > > > > > > > > > DFS that does > > > > > > > > > > > > > > > > > > > > not support file visibility until the > > file is > > > > > > > > closed, it > > > > > > > > > > > > is possible > > > > > > > > > > > > > > > > > > > > to fall back to another option > > > > > automatically. 
> > > > > > On Mon, Apr 3, 2023 at 11:27 PM Yanfei Lei <fredia...@gmail.com> wrote:
> > > > > > >
> > > > > > > Hi Zakelly,
> > > > > > >
> > > > > > > Thanks for driving this. This proposal brings the file
> > > > > > > merging of different types of state under a unified
> > > > > > > framework, and I think it has the added benefit of
> > > > > > > lightening the load on the JM. As FLINK-26590[1] describes,
> > > > > > > triggered checkpoints can be delayed by discarding shared
> > > > > > > state when the JM manages a large number of files. After
> > > > > > > this FLIP, the JM only needs to manage some folders, which
> > > > > > > greatly reduces its burden.
> > > > > > >
> > > > > > > In Section 4.1, two merging granularities (per subtask and
> > > > > > > per task manager) are proposed: shared state is merged at
> > > > > > > per-subtask granularity, but for the changelog state
> > > > > > > backend, its DSTL files are shared between checkpoints and
> > > > > > > are currently merged in batches at the task manager level.
> > > > > > > When merging with the SEGMENTED_WITHIN_CP_BOUNDARY mode, I'm
> > > > > > > concerned about the performance degradation of its merging,
> > > > > > > hence I wonder if the merge granularities are configurable?
> > > > > > > Further, from a user perspective, three new options are
> > > > > > > introduced in this FLIP; do they have recommended defaults?
> > > > > > >
> > > > > > > [1] https://issues.apache.org/jira/browse/FLINK-26590
> > > > > > >
> > > > > > > Best,
> > > > > > > Yanfei
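To make the JM-burden argument above concrete: discarding state per file
costs roughly one DFS call per state handle, while folder-based ownership
amounts to one recursive delete per subtask or TM folder. A minimal sketch
with made-up names, emulating both on a local filesystem (on a real DFS
such as HDFS, the folder case is typically a single recursive-delete RPC):

    // Sketch only: contrasts per-file discarding with folder ownership.
    import java.io.IOException;
    import java.io.UncheckedIOException;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.util.Comparator;
    import java.util.List;
    import java.util.stream.Stream;

    public class DiscardCostSketch {

        // Per-file ownership: one delete per state handle, so dropping a
        // checkpoint scales with the number of files the JM tracks.
        static void discardPerFile(List<Path> stateHandles) throws IOException {
            for (Path handle : stateHandles) {
                Files.deleteIfExists(handle);
            }
        }

        // Folder ownership: the JM drops one directory per granularity
        // unit (subtask or TM). Walking the tree, deepest paths first,
        // just emulates a recursive delete on a local filesystem.
        static void discardFolder(Path ownedDir) throws IOException {
            try (Stream<Path> tree = Files.walk(ownedDir)) {
                tree.sorted(Comparator.reverseOrder()).forEach(p -> {
                    try {
                        Files.deleteIfExists(p);
                    } catch (IOException e) {
                        throw new UncheckedIOException(e);
                    }
                });
            }
        }
    }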
> > > > > > > On Mon, Apr 3, 2023 at 6:36 PM Zakelly Lan <zakelly....@gmail.com> wrote:
> > > > > > > >
> > > > > > > > Hi everyone,
> > > > > > > >
> > > > > > > > I would like to open a discussion on providing a unified
> > > > > > > > file merging mechanism for checkpoints[1].
> > > > > > > >
> > > > > > > > Currently, many files are uploaded to the DFS during
> > > > > > > > checkpoints, leading to the 'file flood' problem when
> > > > > > > > running intensive workloads in a cluster. To tackle this
> > > > > > > > problem, various solutions have been proposed for different
> > > > > > > > types of state files. Although these methods are similar,
> > > > > > > > they lack a systematic view and approach. We believe it is
> > > > > > > > better to consider this problem as a whole and introduce a
> > > > > > > > unified framework to address the file flood problem for all
> > > > > > > > types of state files. A POC has been implemented based on
> > > > > > > > the current FLIP design, and the test results are promising.
> > > > > > > >
> > > > > > > > Looking forward to your comments or feedback.
> > > > > > > >
> > > > > > > > Best regards,
> > > > > > > > Zakelly
> > > > > > > >
> > > > > > > > [1]
> > > > > > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-306%3A+Unified+File+Merging+Mechanism+for+Checkpoints