I agree with Aston's insight.  Step could be just buffering.

Re: "the "standard" Go iteration interface" <- this is not a great API to 
offer clients, for the reasons you observe. Hence I don't think it should 
be considered an API standard, or much of a standard at all. It's better 
as an implementation detail.

And your larger problem stems from the same source of pain: trying to 
integrate iter.Seq across API boundaries. It's not really good for that; it 
will always create an extremely tight coupling between producer and 
consumer. In essence they are sharing a thread and jumping between each 
other's stacks.

In fact, if you leave the iter clutter aside, then your problem becomes 
trivial: just reuse the guts of the sum/processor function but leave off 
the problematic seq interface.

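That said, if you really have to use a subroutine that insists on an 
iter.Seq interface, then I conclude that goroutines are unavoidable, 
because the iter protocol is a blocking protocol: the processor sits inside 
its range loop waiting for the next value, while AddObs has to return to 
its caller between values.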

Here's the simplest thing I could come up with for that case; I did it from 
the spec. It might be close to your solution, but it omits the confusing 
wait channel. If it gives you ideas, great. Perhaps it contributes to your 
study in that I could not see any way to do it without a goroutine. That 
means channels too.

package main

import "iter"

// the pre-existing "processor"
func consumeAndSum(it iter.Seq[int]) (sum int) {
	for i := range it {
		println("consumer sees ", i)
		sum += i
	}
	println("consumer done.")
	return sum
}

// the shim between clients and processor, that implements Summer.
type UseProcessor struct {
	initDone bool
	it       iter.Seq[int]
	feedme   chan int
	result   chan int
	done     chan struct{}
}

func (m *UseProcessor) AddObs(obs int) {
	if !m.initDone {
		m.initDone = true

		m.done = make(chan struct{})
		m.result = make(chan int)

		// To avoid a shutdown race,
		// we keep feedme un-buffered; we
		// special case yielding the firstObs
		// to achieve this.
		m.feedme = make(chan int)
		firstObs := obs

		// define an iterator to feed the processor.
		// capture the channels (feedme,done) in the closure.
		m.it = iter.Seq[int](func(yield func(int) bool) {
			if !yield(firstObs) {
				return
			}
			for {
				select {
				case newObs := <-m.feedme:
					if !yield(newObs) {
						// processor wants to halt early
						return
					}
				case <-m.done:
					// client wants to halt.
					// by exiting, we tell the
					// processor to compute its final
					// result sum.
					return
				}
			}
		})
		// start the processor with the iterator.
		// the iterator protocol forces this to be in a goroutine
		// since consumeAndSum will block.
		go func() {
			m.result <- consumeAndSum(m.it)
		}()
		return
	}
	m.feedme <- obs
}

func (m *UseProcessor) Sum() int {
	if !m.initDone {
		// no observations were added, so the processor was never
		// started and the channels are nil; closing a nil channel
		// would panic.
		return 0
	}
	close(m.done)
	return <-m.result
}

type Summer interface {
	AddObs(obs int)
	Sum() int
}

func main() {
	var summer Summer = &UseProcessor{}
	for i := range 4 {
		summer.AddObs(i)
	}
	println("all done. sum = ", summer.Sum())
}

On Tuesday, February 25, 2025 at 2:53:53 PM UTC Aston Motes wrote:

> Nuno, can you say more about the constraints here? It seems you could 
> accomplish your goal of creating an AggregateFunction out of an iter.Seq 
> processor function by storing all of the results in a slice as Step is 
> called, then running your processor function on the slice of values 
> (slices.Values(storedVals)) at the end when Value is called?
>
> If not, is the idea that you want to avoid intermediate storage and have 
> the processor function aggregating values as you go along? If so, perhaps 
> you could substitute a chan for the slice and send the processor off to do 
> its thing in a goroutine. In that case, you would only need a utility 
> function to adapt a channel to an iter.Seq.
>
> On Tue, Feb 25, 2025 at 9:17 AM Nuno Cruces <ncr...@gmail.com> wrote:
>
>> Hi Jason,
>>
>> First of all, thank you for your interest.
>>
>> On Tue, 25 Feb 2025 at 12:28, Jason E. Aten <j.e....@gmail.com> wrote:
>>
>>>
>>> The problem specification you originally gave was this:
>>>
>>> > "to implement... a "processor" function that accepts an iter.Seq and 
>>> returns a result:
>>> >    func processor(seq iter.Seq[value]) result { ... }" 
>>>
>>  
>> If you're going to omit important parts of the problem specification (the 
>> bit after implement), we'll get nowhere.
>>
>> I don't want "to implement... a processor", I want to implement this 
>> interface 
>> <https://pkg.go.dev/github.com/ncruces/go-sqlite3#AggregateFunction> 
>> *with* a processor.
>> To make it easier, here's the interface (which was linked to in the 
>> original message):
>>
>> type AggregateFunction interface {
>>   // Step is invoked to add a row to the current window.
>>   // The function arguments, if any, corresponding to the row being 
>> added, are passed to Step.
>>   // Implementations must not retain arg.
>>   Step(ctx Context, arg ...Value)
>>
>>
>>   // Value is invoked to return the current (or final) value of the 
>> aggregate.
>>   Value(ctx Context)
>> }
>>
>>
>> The first thing you should note is that the consumer of this interface 
>> calls me, not the other way around.
>> It calls me to give me the sequence of values (in Step), and to stop 
>> iteration and get a result (in Value).
>>  
>> Now assume I have a preexisting function that calculates the sum of an 
>> iter.Seq[float64], like so:
>>
>> func sum(seq iter.Seq[float64]) float64 {
>>   count := 0
>>   total := 0.0
>>   for arg := range seq {
>>     total += arg
>>     count++
>>   }
>>   return total / float64(count)
>> }
>>
>>
>> I want to use this function to "easily" implement an AggregateFunction.
>>
>> As for why the AggregateFunction is what it is, it's the interface to 
>> register an SQLite aggregate function.
>> And, obviously, sum is not the function I really want to call. But maybe 
>> there's a library that provides a nice and complex function that I want to 
>> reuse, and which uses what, going forward, is supposed to be the "standard" 
>> Go iteration interface.
>>
>> Moreover, you originally said,
>>> "I've been cracking my head, and I'm convinced I can't with just 
>>> iter.Seq/iter.Pull"
>>>
>>
>> I wanted a solution that doesn't necessarily involve goroutines and 
>> channels, for the same reason that iter.Pull was created: because 
>> goroutines and channels add unnecessary parallelism that has the potential 
>> to introduce data races, when what is needed is concurrency *without* 
>> parallelism <https://research.swtch.com/coro#thread>.
>>  
>>
>>> Which says, in contradiction to your later comment/claim, that you did 
>>> not have a solution
>>> when you asked for one.
>>>
>>
>> I have a solution, to which I linked in my original email.
>> The solution I found *can* be expressed without goroutines and channels, 
>> but it can also be *better* expressed with newcoro / coroswitch.
>> I *can't* express it without either.
>>  
>>
>>> The code I provided demonstrates that a solution to the stated problem 
>>> is indeed possible
>>> with iter.Seq.
>>>
>>
>> If you can show me how to use your code to implement AggregateFunction, 
>> you will have solved my problem as stated.
>> Otherwise, I'm afraid it falls short.
>>
>> Kind regards,
>> Nuno
>>
>> -- 
>>
> You received this message because you are subscribed to the Google Groups 
>> "golang-nuts" group.
>> To unsubscribe from this group and stop receiving emails from it, send an 
>> email to golang-nuts...@googlegroups.com.
>>
> To view this discussion visit 
>> https://groups.google.com/d/msgid/golang-nuts/CAM0U__8k6%2BEiKZO%3DUuxXkVjVh8w6y%2BG9Zc0j2eH3GU02XiJ_iQ%40mail.gmail.com
>>  
>> <https://groups.google.com/d/msgid/golang-nuts/CAM0U__8k6%2BEiKZO%3DUuxXkVjVh8w6y%2BG9Zc0j2eH3GU02XiJ_iQ%40mail.gmail.com?utm_medium=email&utm_source=footer>
>> .
>>
>

