[ https://issues.apache.org/jira/browse/BEAM-11104?focusedWorklogId=758515&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-758515 ]
ASF GitHub Bot logged work on BEAM-11104:
-----------------------------------------

    Author: ASF GitHub Bot
    Created on: 19/Apr/22 14:36
    Start Date: 19/Apr/22 14:36
    Worklog Time Spent: 10m
    Work Description: jrmccluskey commented on code in PR #17386:
URL: https://github.com/apache/beam/pull/17386#discussion_r853153735


##########
sdks/go/pkg/beam/core/runtime/exec/datasource.go:
##########
@@ -320,6 +321,72 @@ func (n *DataSource) Progress() ProgressReportSnapshot {
 	return ProgressReportSnapshot{ID: n.SID.PtransformID, Name: n.Name, Count: c, pcol: pcol}
 }
 
+func (n *DataSource) getProcessContinuation() sdf.ProcessContinuation {
+	if u, ok := n.Out.(*ProcessSizedElementsAndRestrictions); ok {
+		return u.continuation
+	}
+	return nil
+}
+
+// Checkpoint attempts to split an SDF that has self-checkpointed (e.g. returned a
+// ProcessContinuation) and needs to be resumed later. If the underlying DoFn is not
+// splittable or has not returned a resuming continuation, the function returns an empty
+// SplitResult, a negative resumption time, and a false boolean to indicate that no split
+// occurred.
+func (n *DataSource) Checkpoint() (SplitResult, time.Duration, bool, error) {
+	n.mu.Lock()
+	defer n.mu.Unlock()
+
+	pc := n.getProcessContinuation()
+	if pc == nil {
+		return SplitResult{}, -1 * time.Minute, false, nil
+	}
+	if !pc.ShouldResume() {
+		return SplitResult{}, -1 * time.Minute, false, nil
+	}
+
+	su := SplittableUnit(n.Out.(*ProcessSizedElementsAndRestrictions))
+
+	// Get the output watermark before splitting to avoid accidentally overestimating
+	ow := su.GetOutputWatermark()

Review Comment:
   Oh good, in that case I took the comment off. No need to add incorrect documentation here.
Issue Time Tracking
-------------------

    Worklog Id: (was: 758515)
    Time Spent: 14h (was: 13h 50m)

> [Go SDK] DoFn Self Checkpointing
> --------------------------------
>
>                 Key: BEAM-11104
>                 URL: https://issues.apache.org/jira/browse/BEAM-11104
>             Project: Beam
>          Issue Type: Sub-task
>          Components: sdk-go
>            Reporter: Robert Burke
>            Assignee: Jack McCluskey
>            Priority: P3
>          Time Spent: 14h
>  Remaining Estimate: 0h
>
> Allow SplittableDoFns to self checkpoint.
> Design doc:
> https://docs.google.com/document/d/1_JbzjY9JR07ZK5v7PcZevUfzHPsqwzfV7W6AouNpMPk/edit?usp=sharing

--
This message was sent by Atlassian Jira
(v8.20.7#820007)