For aggregating results, an alternative to blocking on outstanding payloads 
is blocking on outstanding workers. You might find this talk interesting; it 
has a pretty clever solution for the "outstanding workers" side of things: 
https://about.sourcegraph.com/blog/go/gophercon-2018-rethinking-classical-concurrency-patterns
 
Also, I think the part about eliminating idle workers is a motif that shows 
up in nice solutions to other problems, and to elaborations on this one.
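
The core of that idea can be sketched with nothing but the standard library: 
a sync.WaitGroup counts workers, and a separate goroutine closes the results 
channel once they have all returned, so the collector simply ranges over 
results instead of counting payloads. This is just a sketch; the names 
(process, result, numWorkers) and the fake work inside are made up for 
illustration, not taken from your code.

```go
package main

import (
	"fmt"
	"sync"
)

// result is a stand-in for something like your imageInfo struct.
type result struct {
	name string
}

func process(filenames []string) []result {
	wc := make(chan string)
	rc := make(chan result)

	// Start a fixed number of workers, counted by the WaitGroup.
	var wg sync.WaitGroup
	const numWorkers = 4
	for i := 0; i < numWorkers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for fn := range wc {
				rc <- result{name: fn} // real work would go here
			}
		}()
	}

	// Feed the work channel, then close it so workers exit.
	go func() {
		for _, fn := range filenames {
			wc <- fn
		}
		close(wc)
	}()

	// Close the results channel once every worker has returned:
	// the collector now blocks on outstanding workers, not on a
	// count of outstanding payloads.
	go func() {
		wg.Wait()
		close(rc)
	}()

	var results []result
	for r := range rc {
		results = append(results, r)
	}
	return results
}

func main() {
	rs := process([]string{"a.png", "b.png", "c.png"})
	fmt.Println(len(rs))
}
```

Because the collector ranges until rc is closed, it no longer matters whether 
each input produces exactly one result.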

There's an x/ package along these lines: 
https://pkg.go.dev/golang.org/x/sync/semaphore (see its WorkerPool example: 
https://pkg.go.dev/golang.org/x/sync/semaphore#example-package-WorkerPool)
(also worth mentioning: https://pkg.go.dev/golang.org/x/sync/errgroup, in 
particular https://pkg.go.dev/golang.org/x/sync/errgroup#Group.Wait)
On Saturday, May 20, 2023 at 1:23:24 AM UTC-7 Tobias Klausmann wrote:

> Hi! 
>
> I find myself often writing tools that follow the pattern in the
> subject, that is:
>
> - Gather some input to process, e.g. a list of filenames from the
> command line. If this can be streamed (processing starting before
> knowing all inputs) even better, but not strictly necessary.
> - Start N workers that process several of the inputs. They receive them
> through an appropriate channel, e.g. `chan string` for filenames. The
> results of the processing are put into another channel, typically some
> custom struct. When the input channel is closed the worker exits.
> - After starting all the workers, the main goroutine collects the
> results from the results channel (and accumulates them in a slice if
> they need to be sorted, or just prints them as needed etc).
>
> As code (I've omitted a few things for brevity):
>
> func process(filenames []string) {
>     results := make([]imageInfo, 0, len(filenames))
>     wc := make(chan string)    // work
>     rc := make(chan imageInfo) // results
>     numprocs := maxParallelism()
>     for i := 0; i < numprocs; i++ {
>         go worker(wc, rc)
>     }
>     go func(fns []string) {
>         for _, fn := range fns {
>             wc <- fn
>         }
>         close(wc)
>     }(filenames)
>     for i := 0; i < len(filenames); i++ {
>         results = append(results, <-rc)
>     }
>     sort.Slice(results, func(x, y int) bool { return results[x].name < results[y].name })
>     for _, r := range results {
>         fmt.Printf("%s %d %d\n", r.name, r.width, r.height)
>     }
> }
>
> func worker(wc chan string, rc chan imageInfo) {
>     for fn := range wc {
>         // mustLoadImage just wraps functions from image/png etc.
>         rc <- mustLoadImage(fn)
>     }
> }
>
> What I wonder about is the collection stage: the assumption that there
> will be exactly as many results as inputs seems brittle, and the only
> real way to handle errors is to put them into the result struct and
> examine them later.
>
> I also usually prefer handling workers with a `sync.WaitGroup`, but that
> only really works if there is no results channel to handle, either because
> the workers do the output/final processing, or because results are
> written to a global. The former is not always possible, and the latter
> is ugly.
>
> There is also the option of just making the result channel big enough
> to hold all results, but that seems a bit ugly as well.
>
> So what's the most idiomatic way of doing this?
>
> Best,
> Tobias
>
