[ 
https://issues.apache.org/jira/browse/BEAM-14347?focusedWorklogId=768015&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-768015
 ]

ASF GitHub Bot logged work on BEAM-14347:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 09/May/22 16:55
            Start Date: 09/May/22 16:55
    Worklog Time Spent: 10m 
      Work Description: lostluck commented on code in PR #17574:
URL: https://github.com/apache/beam/pull/17574#discussion_r868221388


##########
sdks/go/pkg/beam/registration/iter.go:
##########
@@ -0,0 +1,129 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package registration
+
+import (
+       "fmt"
+       "io"
+       "reflect"
+
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec"
+)
+
+type iter1[T any] struct {
+       s exec.ReStream
+
+       // cur is the "current" stream, if any.
+       cur exec.Stream
+}
+
+func (v *iter1[T]) Init() error {
+       cur, err := v.s.Open()
+       if err != nil {
+               return err
+       }
+       v.cur = cur
+       return nil
+}
+
+func (v *iter1[T]) Value() interface{} {
+       return v.invoke
+}
+
+func (v *iter1[T]) Reset() error {
+       if err := v.cur.Close(); err != nil {
+               return err
+       }
+       v.cur = nil
+       return nil
+}
+
+func (v *iter1[T]) invoke(value *T) bool {
+       elm, err := v.cur.Read()
+       if err != nil {
+               if err == io.EOF {
+                       return false
+               }
+               panic(fmt.Sprintf("broken stream: %v", err))
+       }
+       *value = elm.Elm.(T)
+       return true
+}
+
+type iter2[T1, T2 any] struct {
+       s exec.ReStream
+
+       // cur is the "current" stream, if any.
+       cur exec.Stream
+}
+
+func (v *iter2[T1, T2]) Init() error {
+       cur, err := v.s.Open()
+       if err != nil {
+               return err
+       }
+       v.cur = cur
+       return nil
+}
+
+func (v *iter2[T1, T2]) Value() interface{} {
+       return v.invoke
+}
+
+func (v *iter2[T1, T2]) Reset() error {
+       if err := v.cur.Close(); err != nil {
+               return err
+       }
+       v.cur = nil
+       return nil
+}
+
+func (v *iter2[T1, T2]) invoke(key *T1, value *T2) bool {
+       elm, err := v.cur.Read()
+       if err != nil {
+               if err == io.EOF {
+                       return false
+               }
+               panic(fmt.Sprintf("broken stream: %v", err))
+       }
+       *key = elm.Elm.(T1)
+       *value = elm.Elm2.(T2)
+       return true
+}
+
+// Iter1 registers parameters from your DoFn with a
+// signature func(*T) bool and optimizes their execution.
+// This must be done by passing in type parameters of all inputs as constraints,
+// aka: registration.Iter1[T]()

Review Comment:
   The input parameter must be a pointer, so what you had as `*T` is correct. 
However, `T` itself could be a pointer type like `*Foo`, in which case the 
resolved iterator would be `func(**Foo) bool`.





Issue Time Tracking
-------------------

    Worklog Id:     (was: 768015)
    Time Spent: 10h 20m  (was: 10h 10m)

> [Go SDK] Allow users to optimize DoFns with a single generic registration 
> function
> ----------------------------------------------------------------------------------
>
>                 Key: BEAM-14347
>                 URL: https://issues.apache.org/jira/browse/BEAM-14347
>             Project: Beam
>          Issue Type: New Feature
>          Components: sdk-go
>            Reporter: Danny McCormick
>            Assignee: Danny McCormick
>            Priority: P2
>          Time Spent: 10h 20m
>  Remaining Estimate: 0h
>
> Right now, to optimize DoFn execution, users have to use the code generator. 
> This change allows them to use generics instead.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)
