Buffering trivially solves the issue, sure, but the point (of both the interface and Seq) is not to buffer. The database could spit out (many) millions of rows that we want to aggregate into a single result. That's why we have streaming algorithms like HyperLogLog <https://en.wikipedia.org/wiki/HyperLogLog>, and T-Digest or KLL <https://en.wikipedia.org/wiki/Quantile#Approximate_quantiles_from_a_stream>.
About the standard interface, which, yes, I can ignore, I'll just quote the proposal <https://github.com/golang/go/discussions/56413>:

*There is no standard way to iterate over a sequence of values in Go. For lack of any convention, we have ended up with a wide variety of approaches. Each implementation has done what made the most sense in that context, but decisions made in isolation have resulted in confusion for users.*

Or the former discussion <https://github.com/golang/go/discussions/54245> titled "standard iterator interface":

*Most languages provide a standardized way to iterate over values stored in containers using an iterator interface (see the appendix below for a discussion of other languages). Go provides for range for use with maps, slices, strings, arrays, and channels, but it does not provide any general mechanism for user-written containers, and it does not provide an iterator interface.*

Maybe it's too early to have this discussion on adapting/repurposing this *"newfangled iterator interface"* to other tasks, but OTOH, this is what iter.Pull is all about.

BTW, iter.Pull is successful at allowing fs.WalkDir to feed a pull-style iterator <https://github.com/ncruces/go-sqlite3/blob/dadf53e1750d3c06ce6ff33c1819450270109501/ext/fileio/fsdir.go#L105-L117>, which before required goroutines and channels to achieve <https://github.com/ncruces/go-sqlite3/blob/v0.13.0/ext/fileio/fsdir.go> (and was buggy and racy, though that's my own fault).

PS: the purpose of my "confusing wait channel" is to ensure (as best as I could) that there is *no* parallelism. Which, BTW, is one assurance iter.Pull offers (as well as: no races, panics are forwarded, Goexit handled, etc.).

I need to review your solution with more care, thanks!

Kind regards,
Nuno

On Tuesday 25 February 2025 at 16:26:43 UTC Jason E. Aten wrote:

> I agree with Aston's insight. Step could be just buffering.
>
> Re "the "standard" Go iteration interface." <- this is not a great API to
> offer clients, for the reasons you observe. Hence I don't think it should
> be considered as an API standard, or much of a standard at all, as such.
> It's better as an implementation detail.
>
> And your larger problem stems from the same source of pain: trying to
> integrate iter.Seq across API boundaries. It's not really good for that--it
> will always create an extremely tight coupling between producer and
> consumer. In essence they are sharing a thread and jumping between each
> other's stacks.
>
> In fact, if you leave the iter clutter aside, then your problem becomes
> trivial -- just reuse the guts of the sum/processor function but leave off
> the problematic seq interface.
>
> That said, if you really have to use a sub-routine that insists on an
> iter.Seq interface, then I conclude that goroutines are unavoidable,
> because the iter protocol is a blocking protocol.
>
> Here's the simplest thing otherwise; I did it from spec -- it might be
> close to your solution -- but it omits the confusing wait channel. If it
> gives you ideas, great. Perhaps it contributes to your study in that I
> could not see any way to do it without a goroutine. That means channels too.
>
> package main
>
> import "iter"
>
> // the pre-existing "processor"
> func consumeAndSum(it iter.Seq[int]) (sum int) {
>     for i := range it {
>         println("consumer sees ", i)
>         sum += i
>     }
>     println("consumer done.")
>     return sum
> }
>
> // the shim between clients and processor, that implements Summer.
> type UseProcessor struct {
>     initDone bool
>     it       iter.Seq[int]
>     feedme   chan int
>     result   chan int
>     done     chan struct{}
> }
>
> func (m *UseProcessor) AddObs(obs int) {
>     if !m.initDone {
>         m.initDone = true
>
>         m.done = make(chan struct{})
>         m.result = make(chan int)
>
>         // To avoid a shutdown race,
>         // we keep feedme un-buffered; we
>         // special case yielding the firstObs
>         // to achieve this.
>         m.feedme = make(chan int)
>         firstObs := obs
>
>         // define an iterator to feed the processor.
>         // capture the channels (feedme,done) in the closure.
>         m.it = iter.Seq[int](func(yield func(int) bool) {
>             if !yield(firstObs) {
>                 return
>             }
>             for {
>                 select {
>                 case newObs := <-m.feedme:
>                     if !yield(newObs) {
>                         // processor wants to halt early
>                         return
>                     }
>                 case <-m.done:
>                     // client wants to halt.
>                     // by exiting, we tell the
>                     // processor to compute its final
>                     // result sum.
>                     return
>                 }
>             }
>         })
>         // start the processor with the iterator
>         // the iterator protocol forces this to be in a goroutine
>         // since consumeAndSum will block.
>         go func() {
>             m.result <- consumeAndSum(m.it)
>         }()
>         return
>     }
>     m.feedme <- obs
> }
>
> func (m *UseProcessor) Sum() int {
>     close(m.done)
>     return <-m.result
> }
>
> type Summer interface {
>     AddObs(obs int)
>     Sum() int
> }
>
> func main() {
>     var summer Summer = &UseProcessor{}
>     for i := range 4 {
>         summer.AddObs(i)
>     }
>     println("all done. sum = ", summer.Sum())
> }
>
> On Tuesday, February 25, 2025 at 2:53:53 PM UTC Aston Motes wrote:
>
>> Nuno, can you say more about the constraints here? It seems you could
>> accomplish your goal of creating an AggregateFunction out of an iter.Seq
>> processor function by storing all of the results in a slice as Step is
>> called, then running your processor function on the slice of values
>> (slices.Values(storedVals)) at the end when Value is called?
>>
>> If not, is the idea that you want to avoid intermediate storage and have
>> the processor function aggregating values as you go along? If so, perhaps
>> you could substitute a chan for the slice and send the processor off to do
>> its thing in a goroutine. In that case, you would only need a utility
>> function to adapt a channel to an iter.Seq.
>>
>> On Tue, Feb 25, 2025 at 9:17 AM Nuno Cruces <ncr...@gmail.com> wrote:
>>
>>> Hi Jason,
>>>
>>> First of all, thank you for your interest.
>>>
>>> On Tue, 25 Feb 2025 at 12:28, Jason E. Aten <j.e....@gmail.com> wrote:
>>>
>>>> The problem specification you originally gave was this:
>>>>
>>>> > "to implement... a "processor" function that accepts an iter.Seq and returns a result:
>>>> > func processor(seq iter.Seq[value]) result { ... }"
>>>
>>> If you're going to omit important parts of the problem specification
>>> (the bit after implement), we'll get nowhere.
>>>
>>> I don't want "to implement... a processor", I want to implement this
>>> interface <https://pkg.go.dev/github.com/ncruces/go-sqlite3#AggregateFunction>
>>> *with* a processor.
>>> To make it easier, here's the interface (which was linked to in the
>>> original message):
>>>
>>> type AggregateFunction interface {
>>>     // Step is invoked to add a row to the current window.
>>>     // The function arguments, if any, corresponding to the row being added, are passed to Step.
>>>     // Implementations must not retain arg.
>>>     Step(ctx Context, arg ...Value)
>>>
>>>     // Value is invoked to return the current (or final) value of the aggregate.
>>>     Value(ctx Context)
>>> }
>>>
>>> The first thing you should note is that the consumer of this interface
>>> calls me, not the other way around.
>>> It calls me to give me the sequence of values (in Step), and to stop
>>> iteration and get a result (in Value).
>>>
>>> Now assume I have a preexisting function that calculates the sum of an
>>> iter.Seq[float64], like so:
>>>
>>> func sum(seq iter.Seq[float64]) float64 {
>>>     count := 0
>>>     total := 0.0
>>>     for arg := range seq {
>>>         total += arg
>>>         count++
>>>     }
>>>     return total / float64(count)
>>> }
>>>
>>> I want to use this function to "easily" implement an AggregateFunction.
>>>
>>> As for why the AggregateFunction is what it is, it's the interface to
>>> register an SQLite aggregate function.
>>> And, obviously, sum is not the function I really want to call. But
>>> maybe there's a library that provides a nice and complex function that I
>>> want to reuse, and which uses what, going forward, is supposed to be the
>>> "standard" Go iteration interface.
>>>
>>>> Moreover, you originally said,
>>>> "I've been cracking my head, and I'm convinced I can't with just
>>>> iter.Seq/iter.Pull"
>>>
>>> I wanted a solution that doesn't necessarily involve goroutines and
>>> channels, for the same reason that iter.Pull was created: because
>>> goroutines and channels add unnecessary parallelism that has the potential
>>> to introduce data races, when what is needed is concurrency *without*
>>> parallelism <https://research.swtch.com/coro#thread>.
>>>
>>>> Which says, in contradiction to your later comment/claim, that you did
>>>> not have a solution when you asked for one.
>>>
>>> I have a solution, to which I linked in my original email.
>>> The solution I found *can* be expressed without goroutines and
>>> channels, but it can also be *better* expressed with newcoro /
>>> coroswitch.
>>> I *can't* express it without either.
>>>
>>>> The code I provided demonstrates that a solution to the stated problem
>>>> is indeed possible with iter.Seq.
>>>
>>> If you can show me how to use your code to implement AggregateFunction,
>>> you will have solved my problem as stated.
>>> Otherwise, I'm afraid it falls short.
>>>
>>> Kind regards,
>>> Nuno
>>>
>>> --
>>> You received this message because you are subscribed to the Google Groups "golang-nuts" group.
>>> To unsubscribe from this group and stop receiving emails from it, send an email to golang-nuts...@googlegroups.com.
>>> To view this discussion visit https://groups.google.com/d/msgid/golang-nuts/CAM0U__8k6%2BEiKZO%3DUuxXkVjVh8w6y%2BG9Zc0j2eH3GU02XiJ_iQ%40mail.gmail.com.

--
You received this message because you are subscribed to the Google Groups "golang-nuts" group.
To unsubscribe from this group and stop receiving emails from it, send an email to golang-nuts+unsubscr...@googlegroups.com.
To view this discussion visit https://groups.google.com/d/msgid/golang-nuts/b048ced1-f337-4f19-b734-4f5ef433cdd2n%40googlegroups.com.