For aggregating results, an alternative to blocking on a count of outstanding payloads is blocking on outstanding workers. You might find this talk interesting; it has a pretty clever solution for "outstanding workers": https://about.sourcegraph.com/blog/go/gophercon-2018-rethinking-classical-concurrency-patterns. Also, I think the part about eliminating idle workers is a motif that shows up in nice solutions to other problems, and in elaborations on this one.
There's an x/ package along these lines: https://pkg.go.dev/golang.org/x/sync/semaphore (see the worker-pool example at https://pkg.go.dev/golang.org/x/sync/semaphore#example-package-WorkerPool). Also worth mentioning: https://pkg.go.dev/golang.org/x/sync/errgroup (in particular https://pkg.go.dev/golang.org/x/sync/errgroup#Group.Wait).

On Saturday, May 20, 2023 at 1:23:24 AM UTC-7 Tobias Klausmann wrote:
> Hi!
>
> I find myself often writing tools that follow the pattern in the
> subject, that is:
>
> - Gather some input to process, e.g. a list of filenames from the
>   command line. If this can be streamed (processing starting before
>   knowing all inputs), even better, but not strictly necessary.
> - Start N workers that process several of the inputs. They receive them
>   through an appropriate channel, e.g. `chan string` for filenames. The
>   results of the processing are put into another channel, typically some
>   custom struct. When the input channel is closed, the worker exits.
> - After starting all the workers, the main goroutine collects the
>   results from the results channel (and accumulates them in a slice if
>   they need to be sorted, or just prints them as needed, etc.).
> As code (I've omitted a few things for brevity):
>
> func process(filenames []string) {
>     results := make([]imageInfo, 0, len(filenames))
>     wc := make(chan string)    // work
>     rc := make(chan imageInfo) // results
>     numprocs := maxParallelism()
>     for i := 0; i < numprocs; i++ {
>         go worker(wc, rc)
>     }
>     go func(fns []string) {
>         for _, fn := range fns {
>             wc <- fn
>         }
>         close(wc)
>     }(filenames)
>     for i := 0; i < len(filenames); i++ {
>         results = append(results, <-rc)
>     }
>     sort.Slice(results, func(x, y int) bool { return results[x].name < results[y].name })
>     for _, r := range results {
>         fmt.Printf("%s %d %d\n", r.name, r.width, r.height)
>     }
> }
>
> func worker(wc chan string, rc chan imageInfo) {
>     for fn := range wc {
>         // mustLoadImage just wraps functions from image/png etc.
>         rc <- mustLoadImage(fn)
>     }
> }
>
> What I wonder about is the collection stage: the assumption that there
> will be exactly as many results as inputs seems brittle, and the only
> real way to handle errors is to put them into the result struct and
> examine them later.
>
> I also usually prefer handling workers with a `waitgroup`, but that only
> really works if there is no return channel to handle, either because
> the workers do the output/final processing, or because results are
> written to a global. The former is not always possible, and the latter
> is ugly.
>
> There is also the option of just making the result channel big enough
> to hold all results, but that seems a bit ugly as well.
>
> So what's the most idiomatic way of doing this?
>
> Best,
> Tobias