Hi Xintong,

> However, it's probably not so good for users who don't need such
> retrieval and already used a ZooKeeper/Native-Kubernetes HA to specify
> another remote FS path for storing job results, even if they are
> automatically cleaned up on commit.

Users of ZK / k8s HA are forced to use a remote FS as well.
*DefaultJobGraphStore* and *DefaultCompletedCheckpointStore* are both
implemented using *StateHandleStore*. This means that they only store
pointers in the metadata store (ZK / ConfigMap) to the actual blobs that
are stored on DFS (these can be quite bulky). The reasoning behind this
dates back to the early days, when S3 was an eventually consistent FS
that didn't provide strong read-after-write consistency and we needed a
consistent view of the state. We've already started a discussion with
Till that we might want to get rid of the metadata store for both of
these in the near future, as it's no longer needed with modern
filesystems and is a source of various race conditions that are hard to
reason about.
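To make the pattern concrete, here is a minimal, self-contained sketch of
that "pointer in the metadata store" indirection. The class and method
names below are made up for illustration; this is not the actual
*StateHandleStore* API:

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Simplified illustration of the "pointer in the metadata store" pattern:
 * the bulky payload is written to a (distributed) file system first, and
 * only the resulting path is kept in the metadata store (a ZooKeeper node
 * or Kubernetes ConfigMap in the real implementations).
 */
public final class PointerStore {

    // Stand-in for ZooKeeper / ConfigMap: key -> pointer (file path).
    private final Map<String, Path> metadataStore = new ConcurrentHashMap<>();

    // Stand-in for the DFS path configured via "high-availability.storageDir".
    private final Path storageDir;

    public PointerStore(Path storageDir) {
        this.storageDir = storageDir;
    }

    public void put(String key, byte[] payload) throws IOException {
        // 1. Persist the (potentially bulky) payload on the file system.
        Path blob = storageDir.resolve(key + "-" + UUID.randomUUID());
        Files.write(blob, payload);
        // 2. Only the pointer goes into the metadata store.
        metadataStore.put(key, blob);
    }

    public byte[] get(String key) throws IOException {
        Path blob = metadataStore.get(key);
        return blob == null ? null : Files.readAllBytes(blob);
    }

    public static void main(String[] args) throws IOException {
        PointerStore store = new PointerStore(Files.createTempDirectory("ha-storage"));
        store.put("job-result", "{\"state\":\"FINISHED\"}".getBytes(StandardCharsets.UTF_8));
        System.out.println(new String(store.get("job-result"), StandardCharsets.UTF_8));
    }
}

The important bit is the indirection: the metadata store only ever sees
the path, so a DFS has to be configured for ZK / k8s HA anyway.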
We can reuse the already configured "high-availability.storageDir" for
storing the results.

Best,
D.

On Tue, Nov 30, 2021 at 11:14 AM Xintong Song <tonysong...@gmail.com> wrote:

> Thanks for the explanations, Matthias.
>
> Including JobResultStore in HighAvailabilityServices as a replacement of
> RunningJobRegistry makes sense to me. And initializing the JobResultStore
> in the same way as the JobGraphStore also sounds good.
>
> I have another question concerning where to persist the job results. The
> FLIP proposes two implementations of JobResultStore: In-Memory and
> FileSystem. I'm wondering if we should by default persist the results in
> the HA services (i.e., ZooKeeper or Kubernetes ConfigMap) when enabled.
> This is how RunningJobRegistry and JobGraphStore persist things currently.
>
> IIUC, the reason we want a FileSystemJobResultStore is that it allows not
> only the dispatcher but also a 3rd party to retrieve the result after the
> job finishes, making scenarios like multi-stage application mode possible.
> However, it's probably not so good for users who don't need such
> retrieval and already used a ZooKeeper/Native-Kubernetes HA to specify
> another remote FS path for storing job results, even if they are
> automatically cleaned up on commit.
>
> Maybe we can use the HA storage by default, and make the FileSystem
> opt-in. WDYT?
>
> Thank you~
>
> Xintong Song
>
>
> On Tue, Nov 30, 2021 at 5:35 PM Matthias Pohl <matth...@ververica.com>
> wrote:
>
> > Hi Xintong,
> > your observation is correct. We probably didn't address this in the FLIP
> > explicitly enough. We planned to include it in the
> > HighAvailabilityServices analogously to the RunningJobRegistry (and
> > replace the RunningJobRegistry with the JobResultStore in the end).
> >
> > One additional thing I want to point out: the JobResultStore and the
> > JobGraphStore have closely-related lifecycles. Both provide information
> > for the Dispatcher's initialization. Therefore, we're planning to
> > initialize the JobResultStore in the same way the JobGraphStore is
> > initialized: in the DispatcherLeaderProcessFactory, referenced
> > in DispatcherLeaderProcess. The Dispatcher will get the information
> > about recovered JobGraphs (as it's currently done on master) and the
> > JobResults of globally-terminated jobs that are still marked as "dirty"
> > by the JobResultStore. The cleanup based on the latter ones will then
> > happen in the Dispatcher.
> >
> > Matthias
> >
> > On Tue, Nov 30, 2021 at 3:28 AM Xintong Song <tonysong...@gmail.com>
> > wrote:
> >
> >> Thanks David, Matthias and Mika,
> >>
> >> I like this FLIP in the way it handles potential re-execution and
> >> resource leaks due to clean-up failures.
> >>
> >> I have one question: Why is this JobResultStore not part of the high
> >> availability services? Or asked differently, are there cases where we
> >> only need the HA services but not the JobResultStore, or vice versa?
> >>
> >> Thank you~
> >>
> >> Xintong Song
> >>
> >>
> >> On Tue, Nov 30, 2021 at 9:19 AM Kurt Young <ykt...@gmail.com> wrote:
> >>
> >>> Hi,
> >>>
> >>> I didn't fully read the FLIP, but the name somehow confused me. My
> >>> first impression on seeing this is that we are providing some storage
> >>> for job execution results, like the ones returned with accumulators
> >>> in batch mode. Would a name like "JobStatusStore" be more appropriate?
> >>>
> >>> Best,
> >>> Kurt
> >>>
> >>>
> >>> On Mon, Nov 29, 2021 at 8:22 PM Zhu Zhu <reed...@gmail.com> wrote:
> >>>
> >>> > Thanks for drafting this FLIP, Matthias, Mika and David.
> >>> >
> >>> > I like the proposed JobResultStore. Besides addressing the problem
> >>> > of re-executing finished jobs, it's also an important step towards
> >>> > HA of multi-job Flink applications.
> >>> >
> >>> > I have one question: in the "Cleanup" section, it shows that the
> >>> > JobMaster is responsible for cleaning up
> >>> > CheckpointCounter/CheckpointStore. Does this mean Flink will have to
> >>> > re-create JobMaster/Scheduler/ExecutionGraph for a terminated job to
> >>> > do the cleanup? If so, this can be heavy in certain cases because
> >>> > the ExecutionGraph creation may conduct connector initialization.
> >>> > So I'm thinking whether it's possible to make
> >>> > CheckpointCounter/CheckpointStore a component of the Dispatcher?
> >>> >
> >>> > Thanks,
> >>> > Zhu
> >>> >
> >>> > On Sat, Nov 27, 2021 at 1:29 AM, Till Rohrmann <trohrm...@apache.org>
> >>> > wrote:
> >>> >
> >>> > > Thanks for creating this FLIP, Matthias, Mika and David.
> >>> > >
> >>> > > I think the JobResultStore is an important piece for fixing
> >>> > > Flink's last high-availability problem (afaik). Once we have this
> >>> > > piece in place, users no longer risk re-executing a successfully
> >>> > > completed job.
> >>> > >
> >>> > > I have one comment concerning breaking interfaces:
> >>> > >
> >>> > > If we don't want to break interfaces, then we could keep the
> >>> > > HighAvailabilityServices.getRunningJobsRegistry() method and add a
> >>> > > default implementation for
> >>> > > HighAvailabilityServices.getJobResultStore(). We could then
> >>> > > deprecate the former method and remove it in the subsequent
> >>> > > release (1.16).
> >>> > >
> >>> > > Apart from that, +1 for the FLIP.
> >>> > >
> >>> > > Cheers,
> >>> > > Till
> >>> > >
> >>> > > On Wed, Nov 17, 2021 at 6:05 PM David Morávek <d...@apache.org>
> >>> > > wrote:
> >>> > >
> >>> > > > Hi everyone,
> >>> > > >
> >>> > > > Matthias, Mika and I want to start a discussion about the
> >>> > > > introduction of a new Flink component, the *JobResultStore*.
> >>> > > >
> >>> > > > The main motivation is to address shortcomings of the
> >>> > > > *RunningJobsRegistry* and to supersede it with the new
> >>> > > > component. These shortcomings were first described in
> >>> > > > FLINK-11813 [1].
> >>> > > >
> >>> > > > This change should improve the overall stability of the
> >>> > > > JobManager's components and address the race conditions in some
> >>> > > > of the failover scenarios during the job cleanup lifecycle.
> >>> > > >
> >>> > > > It should also help to ensure that Flink doesn't leave any
> >>> > > > uncleaned resources behind.
> >>> > > >
> >>> > > > We've prepared FLIP-194 [2], which outlines the design and
> >>> > > > reasoning behind this new component.
> >>> > > >
> >>> > > > [1] https://issues.apache.org/jira/browse/FLINK-11813
> >>> > > > [2]
> >>> > > > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=195726435
> >>> > > >
> >>> > > > We're looking forward to your feedback ;)
> >>> > > >
> >>> > > > Best,
> >>> > > > Matthias, Mika and David
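For reference, a minimal sketch of the non-breaking approach Till
describes above: keep the old accessor around (deprecated) and add the
new one with a default implementation, so existing implementations of the
interface keep compiling. Simplified, hypothetical signatures; not the
actual Flink interface:

/**
 * Sketch of the compatibility idea: the deprecated method stays until the
 * old API is removed in a later release, while the new method gets a
 * default implementation so the change doesn't break implementers.
 */
interface HighAvailabilityServices {

    /** @deprecated Use {@link #getJobResultStore()} instead. */
    @Deprecated
    RunningJobsRegistry getRunningJobsRegistry();

    /** New accessor; the default keeps the change non-breaking for implementers. */
    default JobResultStore getJobResultStore() {
        return new InMemoryJobResultStore();
    }
}

// Placeholder types so the sketch is self-contained.
interface RunningJobsRegistry {}
interface JobResultStore {}
class InMemoryJobResultStore implements JobResultStore {}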