[ https://issues.apache.org/jira/browse/BEAM-14347?focusedWorklogId=768015&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-768015 ]
ASF GitHub Bot logged work on BEAM-14347: ----------------------------------------- Author: ASF GitHub Bot Created on: 09/May/22 16:55 Start Date: 09/May/22 16:55 Worklog Time Spent: 10m Work Description: lostluck commented on code in PR #17574: URL: https://github.com/apache/beam/pull/17574#discussion_r868221388 ########## sdks/go/pkg/beam/registration/iter.go: ########## @@ -0,0 +1,129 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package registration + +import ( + "fmt" + "io" + "reflect" + + "github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec" +) + +type iter1[T any] struct { + s exec.ReStream + + // cur is the "current" stream, if any. 
+ cur exec.Stream +} + +func (v *iter1[T]) Init() error { + cur, err := v.s.Open() + if err != nil { + return err + } + v.cur = cur + return nil +} + +func (v *iter1[T]) Value() interface{} { + return v.invoke +} + +func (v *iter1[T]) Reset() error { + if err := v.cur.Close(); err != nil { + return err + } + v.cur = nil + return nil +} + +func (v *iter1[T]) invoke(value *T) bool { + elm, err := v.cur.Read() + if err != nil { + if err == io.EOF { + return false + } + panic(fmt.Sprintf("broken stream: %v", err)) + } + *value = elm.Elm.(T) + return true +} + +type iter2[T1, T2 any] struct { + s exec.ReStream + + // cur is the "current" stream, if any. + cur exec.Stream +} + +func (v *iter2[T1, T2]) Init() error { + cur, err := v.s.Open() + if err != nil { + return err + } + v.cur = cur + return nil +} + +func (v *iter2[T1, T2]) Value() interface{} { + return v.invoke +} + +func (v *iter2[T1, T2]) Reset() error { + if err := v.cur.Close(); err != nil { + return err + } + v.cur = nil + return nil +} + +func (v *iter2[T1, T2]) invoke(key *T1, value *T2) bool { + elm, err := v.cur.Read() + if err != nil { + if err == io.EOF { + return false + } + panic(fmt.Sprintf("broken stream: %v", err)) + } + *key = elm.Elm.(T1) + *value = elm.Elm2.(T2) + return true +} + +// Iter1 registers parameters from your DoFn with a +// signature func(*T) bool and optimizes their execution. 
+// This must be done by passing in type parameters of all inputs as constraints, +// aka: registration.Iter1[T]() Review Comment: The input parameter must be a pointer, so what you had as *T is correct. However, T itself could be a pointer type like `*Foo`, in which case the resolved iterator would be `func(**Foo) bool`. Issue Time Tracking ------------------- Worklog Id: (was: 768015) Time Spent: 10h 20m (was: 10h 10m) > [Go SDK] Allow users to optimize DoFns with a single generic registration > function > ---------------------------------------------------------------------------------- > > Key: BEAM-14347 > URL: https://issues.apache.org/jira/browse/BEAM-14347 > Project: Beam > Issue Type: New Feature > Components: sdk-go > Reporter: Danny McCormick > Assignee: Danny McCormick > Priority: P2 > Time Spent: 10h 20m > Remaining Estimate: 0h > > Right now, to optimize DoFn execution, users have to use the code generator. > This update allows them to use generics instead. -- This message was sent by Atlassian Jira (v8.20.7#820007)