I agree with Aston's insight. Step could be just buffering. Re "the "standard" Go iteration interface." <- this is not.a great API to offer clients, for the reasons you observe. Hence I don't think it should be considered as an API standard, or much of a standard at all, as such. Its better as an implementation detail.
And your larger problem stems from the same source of pain: trying to integrate iter.Seq across API boundaries. Its not really good for that--it will always create an extremely tight coupling between producer and consumer. In essence they are sharing a thread and jumping between each other's stacks. In fact, if you leave the iter clutter aside, then your problem becomes trivial -- just reuse the guts of the sum/processor function but leave off the problematic seq iterface. That said, if you really have to use a sub-routine that insists on an iter.Seq interface, then I conclude that goroutines are unavoidable, because the iter protocol is a blocking protocol. Here's the simplest thing otherwise; I did it from spec -- it might be close to your solution-- but it omits the confusing wait channel. If it gives you ideas, great. Perhaps it contributes to your study in that I could not see any way to do it without a goroutine. That means channels too. package main import "iter" // the pre-existing "processor" func consumeAndSum(it iter.Seq[int]) (sum int) { for i := range it { println("consumer sees ", i) sum += i } println("consumer done.") return sum } // the shim between clients and processor, that implements Summer. type UseProcessor struct { initDone bool it iter.Seq[int] feedme chan int result chan int done chan struct{} } func (m *UseProcessor) AddObs(obs int) { if !m.initDone { m.initDone = true m.done = make(chan struct{}) m.result = make(chan int) // To avoid a shutdown race, // we keep feedme un-buffered; we // special case yeilding the firstObs // to acheive this. m.feedme = make(chan int) firstObs := obs // define an iterator to feed the processor. // capture the channels (feedme,done) in the closure. m.it = iter.Seq[int](func(yield func(int) bool) { if !yield(firstObs) { return } for { select { case newObs := <-m.feedme: if !yield(newObs) { // processor wants to halt early return } case <-m.done: // client wants to halt. // by exiting, we tell the // processor to compute its final // result sum. return } } }) // start the processor with the iterator // the iterator protocol forces this to be in a goroutine since cosumeAndSum will block. go func() { m.result <- consumeAndSum(m.it) }() return } m.feedme <- obs } func (m *UseProcessor) Sum() int { close(m.done) return <-m.result } type Summer interface { AddObs(obs int) Sum() int } func main() { var summer Summer = &UseProcessor{} for i := range 4 { summer.AddObs(i) } println("all done. sum = ", summer.Sum()) } On Tuesday, February 25, 2025 at 2:53:53 PM UTC Aston Motes wrote: > Nuno, can you say more about the constraints here? It seems you could you > accomplish your goal of creating an AggregateFunction out of an iter.Seq > processor > function by storing all of the results in a slice as Step is called, then > running your processor function on the slice of values ( > slices.Values(storedVals)) at the end when Value is called? > > If not, is the idea that you want to avoid intermediate storage and have > the processor function aggregating values as you go along? If so, perhaps > you could substitute a chan for the slice and send the processor off to do > its thing in a goroutine. In that case, you would only need a utility > function to adapt a channel to a iter.Seq. > > On Tue, Feb 25, 2025 at 9:17 AM Nuno Cruces <ncr...@gmail.com> wrote: > >> Hi Jason, >> >> First of all, thank you for your interest. >> >> On Tue, 25 Feb 2025 at 12:28, Jason E. Aten <j.e....@gmail.com> wrote: >> >>> >>> The problem specification you originally gave was this: >>> >>> > "to implement... a "processor" function that accepts an iter.Seq and >>> returns a result: >>> > func processor(seq iter.Seq[value]) result { ... }" >>> >> >> If you're going to omit important parts of the problem specification (the >> bit after implement), we'll get nowhere. >> >> I don't want "to implement... a processor", I want to implement this >> interface >> <https://pkg.go.dev/github.com/ncruces/go-sqlite3#AggregateFunction> >> *with* a processor. >> To make it easier, here's the interface (which was linked to in the >> original message): >> >> type AggregateFunction interface { >> // Step is invoked to add a row to the current window. >> // The function arguments, if any, corresponding to the row being >> added, are passed to Step. >> // Implementations must not retain arg. >> Step(ctx Context, arg ...Value) >> >> >> // Value is invoked to return the current (or final) value of the >> aggregate. >> Value(ctx Context) >> } >> >> >> The first thing you should note is that the consumer of this interface >> calls me, not the other way around. >> It calls me to give me the sequence of values (in Step), and to stop >> iteration and get a result (in Value). >> >> Now assume I have a preexisting function that calculates the sum of an >> iter.Seq[float64], like so: >> >> func sum(seq iter.Seq[float64]) float64 { >> count := 0 >> total := 0.0 >> for arg := range seq { >> total += arg >> count++ >> } >> return total / float64(count) >> } >> >> >> I want to use this function to "easily" implement an AggregateFunction. >> >> As for why the AggregateFunction is what it is, it's the interface to >> register an SQLite aggregate function. >> And, obviously, sum is not the function I really want to call. But maybe >> there's a library that provides a nice and complex function that I want to >> reuse, and which uses what, going forward, is supposed to be the "standard" >> Go iteration interface. >> >> Moreover, you originally said, >>> "I've been cracking my head, and I'm convinced I can't with just >>> iter.Seq/iter.Pull" >>> >> >> I wanted a solution that doesn't necessarily involve goroutines and >> channels, for the same reason that iter.Pull was created: because >> goroutines and channels add unnecessary parallelism that has the potential >> to introduce data races, when what is needed is concurrency *without* >> <https://research.swtch.com/coro#thread>parallelism >> <https://research.swtch.com/coro#thread>. >> >> >>> Which says, in contradiction to your later comment/claim, that you did >>> not have a solution >>> when you asked for one. >>> >> >> I have a solution, to which I linked in my original email. >> The solution I found *can* be expressed without goroutines and channels, >> but it can also be *better* expressed with newcoro / coroswitch. >> I *can't* express it without either. >> >> >>> The code I provided demonstrates that a solution to the stated problem >>> is indeed possible >>> with iter.Seq. >>> >> >> If you can show me how to use your code to implement AggregateFunction, >> you will have solved my problem as stated. >> Otherwise, I'm afraid it falls short. >> >> Kind regards, >> Nuno >> >> -- >> > You received this message because you are subscribed to the Google Groups >> "golang-nuts" group. >> To unsubscribe from this group and stop receiving emails from it, send an >> email to golang-nuts...@googlegroups.com. >> > To view this discussion visit >> https://groups.google.com/d/msgid/golang-nuts/CAM0U__8k6%2BEiKZO%3DUuxXkVjVh8w6y%2BG9Zc0j2eH3GU02XiJ_iQ%40mail.gmail.com >> >> <https://groups.google.com/d/msgid/golang-nuts/CAM0U__8k6%2BEiKZO%3DUuxXkVjVh8w6y%2BG9Zc0j2eH3GU02XiJ_iQ%40mail.gmail.com?utm_medium=email&utm_source=footer> >> . >> > -- You received this message because you are subscribed to the Google Groups "golang-nuts" group. To unsubscribe from this group and stop receiving emails from it, send an email to golang-nuts+unsubscr...@googlegroups.com. To view this discussion visit https://groups.google.com/d/msgid/golang-nuts/21b5764d-36de-49fd-ac01-91e53ffadd68n%40googlegroups.com.