[ 
https://issues.apache.org/jira/browse/BEAM-14347?focusedWorklogId=768016&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-768016
 ]

ASF GitHub Bot logged work on BEAM-14347:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 09/May/22 16:58
            Start Date: 09/May/22 16:58
    Worklog Time Spent: 10m 
      Work Description: damccorm commented on code in PR #17579:
URL: https://github.com/apache/beam/pull/17579#discussion_r868223691


##########
sdks/go/pkg/beam/registration/registration.tmpl:
##########
@@ -241,13 +242,320 @@ type teardown1x1 interface {
        Teardown(ctx context.Context) error
 }
 
+type createAccumulator0x1[T any] interface {
+    CreateAccumulator() T
+}
+
+type createAccumulator0x2[T any] interface {
+    CreateAccumulator() (T, error)
+}
+
+type addInput2x1[T1, T2 any] interface {
+    AddInput(a T1, i T2) T1
+}
+
+type addInput2x2[T1, T2 any] interface {
+    AddInput(a T1, i T2) (T1, error)
+}
+
+type mergeAccumulators2x1[T any] interface {
+    MergeAccumulators(a0 T, a1 T) T
+}
+
+type mergeAccumulators2x2[T any] interface {
+    MergeAccumulators(a0 T, a1 T) (T, error)
+}
+
+type extractOutput1x1[T1, T2 any] interface {
+    ExtractOutput(a T1) T2
+}
+
+type extractOutput1x2[T1, T2 any] interface {
+    ExtractOutput(a T1) (T2, error)
+}
+
+{{range $accum := upto 3}}{{$genericParams := (add $accum 1)}}
+// Accumulator{{$genericParams}} registers an accumulator (CombineFn) DoFn's structural functions
+// and types and optimizes their runtime execution. There are 3 different Accumulator
+// functions, each of which should be used for a different situation.
+{{if (eq $genericParams 1)}}// Accumulator1 should be used when your 
accumulator, input, and output are all of the same type.
+// It can be called with register.Accumulator1[T](&CustomAccumulator{})
+// where T is the type of the input/accumulator/output.
+{{else}}{{if (eq $genericParams 2)}}// Accumulator2 should be used when your 
accumulator, input, and output are 2 distinct types.
+// It can be called with register.Accumulator2[T1, T2](&CustomAccumulator{})
+// where T1 is the type of the accumulator and T2 is the other type.
+{{else}}// Accumulator3 should be used when your accumulator, input, and 
output are 3 distinct types.
+// It can be called with register.Accumulator3[T1, T2, 
T3](&CustomAccumulator{})
+// where T1 is the type of the accumulator, T2 is the type of the input, and 
T3 is the type of the output.
+{{end}}{{end}}func Accumulator{{$genericParams}}[{{range $paramNum := upto 
$genericParams}}{{if $paramNum}}, {{end}}T{{$paramNum}}{{end}} any](accum 
interface{}) {
+    registerAccumulatorTypes(accum)
+    accumVal := reflect.ValueOf(accum)
+    var mergeAccumulatorsWrapper func(fn interface{}) reflectx.Func
+    if _, ok := accum.(mergeAccumulators2x2[T0]); ok {
+        caller := func(fn interface{}) reflectx.Func {
+            f := fn.(func(T0, T0) (T0, error))
+            return &caller2x2[T0, T0, T0, error]{fn: f}
+        }
+               reflectx.RegisterFunc(reflect.TypeOf((*func(T0, T0) (T0, 
error))(nil)).Elem(), caller)
+
+        mergeAccumulatorsWrapper = func(fn interface{}) reflectx.Func {
+                       return reflectx.MakeFunc(func(a0 T0, a1 T0) (T0, error) 
{
+                               return 
fn.(mergeAccumulators2x2[T0]).MergeAccumulators(a0, a1)
+                       })
+               }
+    } else if _, ok := accum.(mergeAccumulators2x1[T0]); ok {
+        caller := func(fn interface{}) reflectx.Func {
+            f := fn.(func(T0, T0) T0)
+            return &caller2x1[T0, T0, T0]{fn: f}
+        }
+               reflectx.RegisterFunc(reflect.TypeOf((*func(T0, T0) 
T0)(nil)).Elem(), caller)
+
+        mergeAccumulatorsWrapper = func(fn interface{}) reflectx.Func {
+                       return reflectx.MakeFunc(func(a0 T0, a1 T0) T0 {
+                               return 
fn.(mergeAccumulators2x1[T0]).MergeAccumulators(a0, a1)
+                       })
+               }
+    }
+
+    if mergeAccumulatorsWrapper == nil{

Review Comment:
   Done!



##########
sdks/go/pkg/beam/registration/registration.tmpl:
##########
@@ -241,13 +242,320 @@ type teardown1x1 interface {
        Teardown(ctx context.Context) error
 }
 
+type createAccumulator0x1[T any] interface {
+    CreateAccumulator() T
+}
+
+type createAccumulator0x2[T any] interface {
+    CreateAccumulator() (T, error)
+}
+
+type addInput2x1[T1, T2 any] interface {
+    AddInput(a T1, i T2) T1
+}
+
+type addInput2x2[T1, T2 any] interface {
+    AddInput(a T1, i T2) (T1, error)
+}
+
+type mergeAccumulators2x1[T any] interface {
+    MergeAccumulators(a0 T, a1 T) T
+}
+
+type mergeAccumulators2x2[T any] interface {
+    MergeAccumulators(a0 T, a1 T) (T, error)
+}
+
+type extractOutput1x1[T1, T2 any] interface {
+    ExtractOutput(a T1) T2
+}
+
+type extractOutput1x2[T1, T2 any] interface {
+    ExtractOutput(a T1) (T2, error)
+}
+
+{{range $accum := upto 3}}{{$genericParams := (add $accum 1)}}
+// Accumulator{{$genericParams}} registers an accumulator (CombineFn) DoFn's structural functions

Review Comment:
   Yeah, that makes sense - updated!





Issue Time Tracking
-------------------

    Worklog Id:     (was: 768016)
    Time Spent: 10.5h  (was: 10h 20m)

> [Go SDK] Allow users to optimize DoFns with a single generic registration 
> function
> ----------------------------------------------------------------------------------
>
>                 Key: BEAM-14347
>                 URL: https://issues.apache.org/jira/browse/BEAM-14347
>             Project: Beam
>          Issue Type: New Feature
>          Components: sdk-go
>            Reporter: Danny McCormick
>            Assignee: Danny McCormick
>            Priority: P2
>          Time Spent: 10.5h
>  Remaining Estimate: 0h
>
> Right now, to optimize DoFn execution, users have to use the code generator. 
> This change allows them to use generics instead.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)

Reply via email to