[ https://issues.apache.org/jira/browse/BEAM-14347?focusedWorklogId=768010&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-768010 ]
ASF GitHub Bot logged work on BEAM-14347: ----------------------------------------- Author: ASF GitHub Bot Created on: 09/May/22 16:47 Start Date: 09/May/22 16:47 Worklog Time Spent: 10m Work Description: lostluck commented on code in PR #17579: URL: https://github.com/apache/beam/pull/17579#discussion_r868211497 ########## sdks/go/pkg/beam/registration/registration.tmpl: ########## @@ -241,13 +242,320 @@ type teardown1x1 interface { Teardown(ctx context.Context) error } +type createAccumulator0x1[T any] interface { + CreateAccumulator() T +} + +type createAccumulator0x2[T any] interface { + CreateAccumulator() (T, error) +} + +type addInput2x1[T1, T2 any] interface { + AddInput(a T1, i T2) T1 +} + +type addInput2x2[T1, T2 any] interface { + AddInput(a T1, i T2) (T1, error) +} + +type mergeAccumulators2x1[T any] interface { + MergeAccumulators(a0 T, a1 T) T +} + +type mergeAccumulators2x2[T any] interface { + MergeAccumulators(a0 T, a1 T) (T, error) +} + +type extractOutput1x1[T1, T2 any] interface { + ExtractOutput(a T1) T2 +} + +type extractOutput1x2[T1, T2 any] interface { + ExtractOutput(a T1) (T2, error) +} + +{{range $accum := upto 3}}{{$genericParams := (add $accum 1)}} +// Accumulator{{$genericParams}} registers an accumulator (CombineFn) DoFn's structural functions Review Comment: We probably want to call this Combiner1 etc. They are called CombineFns or Combiners, not accumulators. Accumulators are the intermediate type that CombineFns operate on. See https://beam.apache.org/documentation/programming-guide/#combine for the usual user nomenclature. E.g., for a mean CombineFn, one could have (some number, say `int`) as AddInput's input type. AddInput would add it to the accumulator type `struct{Count int, Sum float64}`, MergeAccumulators merges them together, and ExtractOutput does the division of the Sum by the Count, to get the `float64` output type for the Mean. 
########## sdks/go/pkg/beam/registration/registration.tmpl: ########## @@ -241,13 +242,320 @@ type teardown1x1 interface { Teardown(ctx context.Context) error } +type createAccumulator0x1[T any] interface { + CreateAccumulator() T +} + +type createAccumulator0x2[T any] interface { + CreateAccumulator() (T, error) +} + +type addInput2x1[T1, T2 any] interface { + AddInput(a T1, i T2) T1 +} + +type addInput2x2[T1, T2 any] interface { + AddInput(a T1, i T2) (T1, error) +} + +type mergeAccumulators2x1[T any] interface { + MergeAccumulators(a0 T, a1 T) T +} + +type mergeAccumulators2x2[T any] interface { + MergeAccumulators(a0 T, a1 T) (T, error) +} + +type extractOutput1x1[T1, T2 any] interface { + ExtractOutput(a T1) T2 +} + +type extractOutput1x2[T1, T2 any] interface { + ExtractOutput(a T1) (T2, error) +} + +{{range $accum := upto 3}}{{$genericParams := (add $accum 1)}} +// Accumulator{{$genericParams}} registers an accumulator (CombineFn) DoFn's structural functions +// and types and optimizes their runtime execution. There are 3 different Accumulator +// functions, each of which should be used for a different situation. +{{if (eq $genericParams 1)}}// Accumulator1 should be used when your accumulator, input, and output are all of the same type. +// It can be called with register.Accumulator1[T](&CustomAccumulator{}) +// where T is the type of the input/accumulator/output. +{{else}}{{if (eq $genericParams 2)}}// Accumulator2 should be used when your accumulator, input, and output are 2 distinct types. +// It can be called with register.Accumulator2[T1, T2](&CustomAccumulator{}) +// where T1 is the type of the accumulator and T2 is the other type. +{{else}}// Accumulator3 should be used when your accumulator, input, and output are 3 distinct types. +// It can be called with register.Accumulator3[T1, T2, T3](&CustomAccumulator{}) +// where T1 is the type of the accumulator, T2 is the type of the input, and T3 is the type of the output. 
+{{end}}{{end}}func Accumulator{{$genericParams}}[{{range $paramNum := upto $genericParams}}{{if $paramNum}}, {{end}}T{{$paramNum}}{{end}} any](accum interface{}) { + registerAccumulatorTypes(accum) + accumVal := reflect.ValueOf(accum) + var mergeAccumulatorsWrapper func(fn interface{}) reflectx.Func + if _, ok := accum.(mergeAccumulators2x2[T0]); ok { + caller := func(fn interface{}) reflectx.Func { + f := fn.(func(T0, T0) (T0, error)) + return &caller2x2[T0, T0, T0, error]{fn: f} + } + reflectx.RegisterFunc(reflect.TypeOf((*func(T0, T0) (T0, error))(nil)).Elem(), caller) + + mergeAccumulatorsWrapper = func(fn interface{}) reflectx.Func { + return reflectx.MakeFunc(func(a0 T0, a1 T0) (T0, error) { + return fn.(mergeAccumulators2x2[T0]).MergeAccumulators(a0, a1) + }) + } + } else if _, ok := accum.(mergeAccumulators2x1[T0]); ok { + caller := func(fn interface{}) reflectx.Func { + f := fn.(func(T0, T0) T0) + return &caller2x1[T0, T0, T0]{fn: f} + } + reflectx.RegisterFunc(reflect.TypeOf((*func(T0, T0) T0)(nil)).Elem(), caller) + + mergeAccumulatorsWrapper = func(fn interface{}) reflectx.Func { + return reflectx.MakeFunc(func(a0 T0, a1 T0) T0 { + return fn.(mergeAccumulators2x1[T0]).MergeAccumulators(a0, a1) + }) + } + } + + if mergeAccumulatorsWrapper == nil{ Review Comment: need space after nil Issue Time Tracking ------------------- Worklog Id: (was: 768010) Time Spent: 10h 10m (was: 10h) > [Go SDK] Allow users to optimize DoFns with a single generic registration > function > ---------------------------------------------------------------------------------- > > Key: BEAM-14347 > URL: https://issues.apache.org/jira/browse/BEAM-14347 > Project: Beam > Issue Type: New Feature > Components: sdk-go > Reporter: Danny McCormick > Assignee: Danny McCormick > Priority: P2 > Time Spent: 10h 10m > Remaining Estimate: 0h > > Right now, to optimize DoFn execution, users have to use the code generator. > This updates to allow them to use generics instead. 
-- This message was sent by Atlassian Jira (v8.20.7#820007)