As I mentioned earlier, I wanted to see if I could implement a waitgroup with channels instead of the stdlib's sync.Atomic counters, and using a special type of concurrent datatype called a PN Converged Replicated Datatype. Well, I'm not sure if this implementation precisely implements this type of CRDT, but it does work, and I wanted to share it. Note that play doesn't like these long running (?) examples, so here it is verbatim as I just finished writing it:
package chanwg import "fmt" type WaitGroup struct { workers uint ops chan func() ready chan struct{} done chan struct{} } func New() *WaitGroup { wg := &WaitGroup{ ops: make(chan func()), done: make(chan struct{}), ready: make(chan struct{}), } go func() { // wait loop doesn't start until something is put into thte done := false for !done { select { case fn := <-wg.ops: println("received op") fn() fmt.Println("num workers:", wg.WorkerCount()) // if !(wg.workers < 1) { // println("wait counter at zero") // done = true // close(wg.done) // } default: } } }() return wg } // Add adds a non-negative number func (wg *WaitGroup) Add(delta int) { if delta < 0 { return } fmt.Println("adding", delta, "workers") wg.ops <- func() { wg.workers += uint(delta) } } // Done subtracts a non-negative value from the workers count func (wg *WaitGroup) Done(delta int) { println("worker finished") if delta < 0 { return } println("pushing op to channel") wg.ops <- func() { println("finishing") wg.workers -= uint(delta) } // println("op should have cleared by now") } // Wait blocks until the waitgroup decrements to zero func (wg *WaitGroup) Wait() { println("a worker is waiting") <-wg.done println("job done") } func (wg *WaitGroup) WorkerCount() int { return int(wg.workers) } There could be some bug lurking in there, I'm not sure, but it runs exactly as I want it to, and all the debug prints show you how it works. Possibly one does not need to use channels containing functions that mutate the counter, but rather maybe they can be just directly increment/decremented within a select statement. I've gotten really used to using generator functions and they seem to be extremely easy to use and so greatly simplify and modularise my code that I am now tackling far more complex (if cyclomatic complexity is a measure - over 130 paths in a menu system I wrote that uses generators to parse a declaration of data types that also uses generators). I suppose the thing is it wouldn't be hard to extend the types of operations that you push to the ops channel, I can't think off the top of my head exactly any reasonable use case for some other operation though. One thing that does come to mind, however, is that a more complex, conditional increment operation could be written and execute based on other channel signals or the state of some other data, but I can't see any real use for that. I should create a benchmark that tests the relative performance of this versus sync.Atomic add/subtract operations. I think also that as I mentioned, changing the ops channel to just contain deltas on the group size might be a little bit faster than the conditional jumps a closure requires to enter and exit. So the jury is out still if this is in any way superior to sync.WaitGroup, but because I know that this library does not use channels that it almost certainly has a little higher overhead due to the function call context switches hidden inside the Atomic increment/decrement operations. Because all of those ops occur within the one supervisor waitgroup goroutine only, they are serialised automatically by the channel buffer (or the wait sync as sender and receiver both become ready), and no atomic/locking operations are required to prevent a race. I enabled race detector on a test of this code just now. The WorkerCount() function is racy. I think I need to change it so there is a channel underlying the retrieval implementation, it then would send the (empty) query to the query channel, and listen on an answer channel (maybe make them one-direction) to get the value without an explicit race. Yes, and this is probably why sync.WaitGroup has no way to inspect the current wait count also. I will see if I can make that function not racy. On Thursday, 2 May 2019 23:29:35 UTC+2, Øyvind Teig wrote: > > Thanks for the reference to Dave Cheney's blog note! And for this thread, > quite interesting to read. I am not used to explicitly closing channels at > all (occam (in the ninetees) and XC (now)), but I have sat through several > presentations on conferences seen the theme being discussed, like with the > JCSP library. I am impressed by the depth of the reasoning done by the Go > designers! -- You received this message because you are subscribed to the Google Groups "golang-nuts" group. To unsubscribe from this group and stop receiving emails from it, send an email to golang-nuts+unsubscr...@googlegroups.com. For more options, visit https://groups.google.com/d/optout.