[ https://issues.apache.org/jira/browse/BEAM-11104?focusedWorklogId=756519&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-756519 ]

ASF GitHub Bot logged work on BEAM-11104:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 13/Apr/22 16:37
            Start Date: 13/Apr/22 16:37
    Worklog Time Spent: 10m 
      Work Description: lostluck commented on code in PR #17334:
URL: https://github.com/apache/beam/pull/17334#discussion_r848647091


##########
sdks/go/pkg/beam/core/runtime/exec/fn.go:
##########
@@ -247,9 +253,16 @@ func (n *invoker) ret2(pn typex.PaneInfo, ws []typex.Window, ts typex.EventTime,
                if r1 != nil {
                        return nil, r1.(error)
                }
+               if n.outPcIdx >= 0 {
+                       n.ret = FullValue{Windows: ws, Timestamp: ts, Pane: pn, Continuation: r0.(sdf.ProcessContinuation)}
+                       return &n.ret, nil
+               }
                n.ret = FullValue{Windows: ws, Timestamp: ts, Elm: r0, Pane: pn}
                return &n.ret, nil
        case n.outEtIdx == 0:
+               if n.outPcIdx >= 0 {
+                       panic("invoker.ret2: cannot return event time without a value")
+               }
                n.ret = FullValue{Windows: ws, Timestamp: r0.(typex.EventTime), Elm: r1, Pane: pn}
                return &n.ret, nil
        default:

Review Comment:
   I posit that that's a flawed construction we shouldn't permit. How much simpler does the code become if we avoid that case? (Given the changes required, just guess, ignoring the sunk cost of "we already have this code".)
   
   Returning a single element and always choosing to stop or aborting the bundle feels awkward, and it would be a performance pit when used accidentally.
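The sketch below makes the proposal concrete. It shows a validation rule that would reject the construction up front, so that ret2 never sees a returned element or event time alongside a ProcessContinuation. The retKind type and the validateReturns function are hypothetical. They are only loosely modeled on the invoker's outErrIdx/outEtIdx/outPcIdx indices. A real check would presumably live with the existing DoFn signature validation rather than in the invoker.

```go
package main

import (
	"errors"
	"fmt"
)

// retKind is a hypothetical classification of a DoFn's return values.
type retKind int

const (
	retValue retKind = iota
	retEventTime
	retContinuation
	retError
)

// validateReturns sketches the stricter rule: a ProcessContinuation may be
// returned alone or with an error, but never with an element or event time
// (elements would go through an emitter instead).
func validateReturns(rets []retKind) error {
	hasPc := false
	for _, r := range rets {
		if r == retContinuation {
			hasPc = true
		}
	}
	if !hasPc {
		return nil // Non-SDF return shapes are unaffected by this rule.
	}
	for _, r := range rets {
		if r == retValue || r == retEventTime {
			return errors.New("ProcessContinuation cannot be returned with an element or event time; use an emitter")
		}
	}
	return nil
}

func main() {
	fmt.Println(validateReturns([]retKind{retContinuation, retError}))
	fmt.Println(validateReturns([]retKind{retValue, retContinuation}))
}
```

With this rule in place, the `n.outPcIdx >= 0` branches in ret2 that pair a continuation with an element or event time could be dropped entirely.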



##########
sdks/go/pkg/beam/core/runtime/exec/sdf.go:
##########
@@ -343,7 +347,11 @@ func (n *ProcessSizedElementsAndRestrictions) ProcessElement(_ context.Context,
                defer func() {
                        <-n.SU
                }()
-               return n.PDo.processSingleWindow(mainIn)
+               continuation, processResult := n.PDo.processSingleWindow(mainIn)
+               if continuation != nil {
+                       n.continuation = continuation
+               }

Review Comment:
   The invoker code already ensures that the continuation is not nil, so why check here? Also, the continuation field would likely *always* be nil to start, so overriding it with a nil doesn't change anything.
   
   I'd rather it be overridden every time, since that avoids cross-bundle contamination to begin with.
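The difference between the two approaches is easiest to see side by side. The sketch below shows how a guarded assignment can leave a stale continuation behind, while an unconditional one cannot. The sdfNode and continuation types are hypothetical, trimmed-down stand-ins for ProcessSizedElementsAndRestrictions and sdf.ProcessContinuation.

```go
package main

import "fmt"

// continuation is a hypothetical stand-in for sdf.ProcessContinuation.
type continuation struct{ resume bool }

// sdfNode is a hypothetical stand-in for ProcessSizedElementsAndRestrictions,
// keeping only the continuation field.
type sdfNode struct {
	continuation *continuation
}

// storeGuarded mirrors the code under review: nil results are skipped, so a
// continuation from an earlier element or bundle can linger.
func (n *sdfNode) storeGuarded(pc *continuation) {
	if pc != nil {
		n.continuation = pc
	}
}

// storeAlways overwrites the field on every call, including with nil.
func (n *sdfNode) storeAlways(pc *continuation) {
	n.continuation = pc
}

func main() {
	guarded, always := &sdfNode{}, &sdfNode{}
	// The first element asks to resume; the second returns no continuation.
	for _, pc := range []*continuation{{resume: true}, nil} {
		guarded.storeGuarded(pc)
		always.storeAlways(pc)
	}
	fmt.Println("guarded keeps stale value:", guarded.continuation != nil) // true
	fmt.Println("always is cleared:", always.continuation == nil)         // true
}
```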
   
   
   



##########
sdks/go/pkg/beam/core/runtime/exec/pardo.go:
##########
@@ -144,12 +145,13 @@ func (n *ParDo) processMainInput(mainIn *MainInput) error {
        // is that either there is a single window or the function doesn't observe windows, so we can
        // optimize it by treating all windows as a single one.
        if !mustExplodeWindows(n.inv.fn, elm, len(n.Side) > 0) {
-               return n.processSingleWindow(mainIn)
+               _, processResult := n.processSingleWindow(mainIn)

Review Comment:
   Please add comments to both these calls to make it clear that the returned continuations are ignored because only SDFs should have process continuations.
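Below is a minimal sketch of the kind of comment being requested on a call site whose continuation is intentionally discarded. The processSingleWindow and processMainInput functions and the continuation type are hypothetical stand-ins for the ParDo methods in the diff.

```go
package main

import "fmt"

// continuation is a hypothetical stand-in for sdf.ProcessContinuation.
type continuation struct{ resume bool }

// processSingleWindow is a hypothetical stand-in for ParDo.processSingleWindow.
func processSingleWindow(elm string) (*continuation, error) {
	if elm == "" {
		return nil, fmt.Errorf("empty element")
	}
	return nil, nil
}

func processMainInput(elm string) error {
	// The continuation is deliberately ignored: only SDFs produce
	// ProcessContinuations, and this path handles regular DoFns.
	_, err := processSingleWindow(elm)
	return err
}

func main() {
	fmt.Println(processMainInput("a"), processMainInput(""))
}
```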



##########
sdks/go/pkg/beam/core/runtime/exec/fn.go:
##########
@@ -224,14 +227,17 @@ func (n *invoker) Invoke(ctx context.Context, pn typex.PaneInfo, ws []typex.Wind
 }
 
 // ret1 handles processing of a single return value.
-// Errors or single values are the only options.
+// Errors, single values, or a ProcessContinuation are the only options.
 func (n *invoker) ret1(pn typex.PaneInfo, ws []typex.Window, ts typex.EventTime, r0 interface{}) (*FullValue, error) {
        switch {
        case n.outErrIdx >= 0:
                if r0 != nil {
                        return nil, r0.(error)
                }
                return nil, nil
+       case n.outPcIdx >= 0:
+               n.ret = FullValue{Windows: ws, Timestamp: ts, Pane: pn, Continuation: r0.(sdf.ProcessContinuation)}

Review Comment:
   Agreed with this whole discussion, since nil is always an error for a ProcessContinuation, and hopefully the resulting error is clear enough (identifying the malformed DoFn in question).
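As written, the unchecked assertion `r0.(sdf.ProcessContinuation)` panics on a nil value with a generic "interface conversion" message. One way to get an error that names the offending DoFn is a checked type assertion. In the sketch below, the ProcessContinuation interface, the stop type, and the asContinuation helper are all hypothetical. Whether Beam's real type has this shape is an assumption.

```go
package main

import "fmt"

// ProcessContinuation is a hypothetical stand-in for sdf.ProcessContinuation.
type ProcessContinuation interface {
	ShouldResume() bool
}

// stop is a hypothetical continuation that never resumes.
type stop struct{}

func (stop) ShouldResume() bool { return false }

// asContinuation converts a DoFn's raw return value, returning an error that
// names the DoFn instead of panicking when the value is nil or mistyped.
func asContinuation(fnName string, r0 interface{}) (ProcessContinuation, error) {
	pc, ok := r0.(ProcessContinuation) // ok is false for a nil interface value.
	if !ok {
		return nil, fmt.Errorf("DoFn %s returned a nil or invalid ProcessContinuation (%T)", fnName, r0)
	}
	return pc, nil
}

func main() {
	_, err := asContinuation("myDoFn", nil)
	fmt.Println(err)
	pc, err := asContinuation("myDoFn", stop{})
	fmt.Println(pc.ShouldResume(), err)
}
```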



##########
sdks/go/pkg/beam/core/runtime/exec/fullvalue.go:
##########
@@ -35,9 +36,10 @@ type FullValue struct {
        Elm  interface{} // Element or KV key.
        Elm2 interface{} // KV value, if not invalid
 
-       Timestamp typex.EventTime
-       Windows   []typex.Window
-       Pane      typex.PaneInfo
+       Timestamp    typex.EventTime
+       Windows      []typex.Window
+       Pane         typex.PaneInfo
+       Continuation sdf.ProcessContinuation

Review Comment:
   Where FullValue is concerned, nothing is "automatic", for safety and performance reasons: safety, to avoid the situation described; performance, because coders are explicitly expressed by the runner and not determined per element.
   
   E.g., [KVs](https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/core/runtime/exec/coder.go#L556) and the [WindowedValueHeaders](https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/core/runtime/exec/coder.go#L824).
   
   [PairWithRestrictions](https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/core/runtime/exec/sdf.go#L81) explicitly builds up a KV structure with the restriction.
   
   And [SplitAndSize](https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/core/runtime/exec/sdf.go#L170) does the same, but with the size.
   
   Continuations add a non-orthogonal handling wrinkle that your code will need to handle, but it's not clear that there's a better way to get the user value out of the user function and half computation, so FullValue it is.
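To illustrate the "nothing is automatic" point, the sketch below builds the nested KV shapes explicitly at each step, in the spirit of PairWithRestrictions and SplitAndSize. The fullValue struct, pairWithRestriction, and withSize are hypothetical simplifications, not the real exec code. The exact nesting the SDK uses may differ in detail (timestamps, windows, and panes are omitted here).

```go
package main

import "fmt"

// fullValue is a hypothetical, trimmed-down version of exec.FullValue.
type fullValue struct {
	Elm  interface{} // Element or KV key.
	Elm2 interface{} // KV value, if present.
}

// pairWithRestriction explicitly builds KV<element, restriction>;
// no structure is inferred per element.
func pairWithRestriction(elm *fullValue, rest interface{}) *fullValue {
	return &fullValue{Elm: elm, Elm2: rest}
}

// withSize explicitly wraps the pair again as KV<KV<element, restriction>, size>.
func withSize(pair *fullValue, size float64) *fullValue {
	return &fullValue{Elm: pair, Elm2: size}
}

func main() {
	v := withSize(pairWithRestriction(&fullValue{Elm: "a"}, [2]int64{0, 10}), 10)
	pair := v.Elm.(*fullValue)
	fmt.Println(pair.Elm.(*fullValue).Elm, pair.Elm2, v.Elm2) // a [0 10] 10
}
```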



##########
sdks/go/pkg/beam/core/runtime/exec/sdf.go:
##########
@@ -361,7 +371,7 @@ func (n *ProcessSizedElementsAndRestrictions) ProcessElement(_ context.Context,
                        n.rt = rt
                        n.elm = elm
                        n.SU <- n
-                       err := n.PDo.processSingleWindow(&MainInput{Key: wElm, Values: mainIn.Values, RTracker: rt})
+                       _, err := n.PDo.processSingleWindow(&MainInput{Key: wElm, Values: mainIn.Values, RTracker: rt})

Review Comment:
   I agree with Danny that until the placeholder is removed, there should be a comment (referencing the JIRA) that indicates that it's a placeholder. Adding the comment now (even if it's going away in the next PR) is a stopgap in case there are unexpected delays for the PR, and a breadcrumb for someone else to pick it up if necessary.
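One possible shape for such a placeholder comment is sketched below. It uses a TODO tagged with the JIRA key so it is easy to grep for later. The TODO wording, processWindows, and processSingleWindow are hypothetical stand-ins for the per-window loop in the diff.

```go
package main

import "fmt"

// processSingleWindow is a hypothetical stand-in for ParDo.processSingleWindow,
// returning a (possibly nil) continuation and an error.
func processSingleWindow(elm string) (interface{}, error) {
	return nil, nil
}

// processWindows sketches the per-window loop under review.
func processWindows(elms []string) error {
	for _, elm := range elms {
		// TODO(BEAM-11104): Placeholder. The ProcessContinuation returned
		// here is discarded until self-checkpointing is fully wired up.
		_, err := processSingleWindow(elm)
		if err != nil {
			return err
		}
	}
	return nil
}

func main() {
	fmt.Println(processWindows([]string{"w1", "w2"}))
}
```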





Issue Time Tracking
-------------------

    Worklog Id:     (was: 756519)
    Time Spent: 9h 40m  (was: 9.5h)

> [Go SDK] DoFn Self Checkpointing
> --------------------------------
>
>                 Key: BEAM-11104
>                 URL: https://issues.apache.org/jira/browse/BEAM-11104
>             Project: Beam
>          Issue Type: Sub-task
>          Components: sdk-go
>            Reporter: Robert Burke
>            Assignee: Jack McCluskey
>            Priority: P3
>          Time Spent: 9h 40m
>  Remaining Estimate: 0h
>
> Allow SplittableDoFns to self checkpoint.
> Design doc: 
> https://docs.google.com/document/d/1_JbzjY9JR07ZK5v7PcZevUfzHPsqwzfV7W6AouNpMPk/edit?usp=sharing



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
