As I mentioned earlier, I wanted to see if I could implement a waitgroup 
with channels instead of the stdlib's sync/atomic counters, modelled on a 
concurrent datatype called a PN-Counter, one of the Convergent Replicated 
Data Types (CRDTs). I'm not sure this implementation is precisely that 
kind of CRDT, but it does work, and I wanted to share it. Note that the 
playground doesn't like these long-running (?) examples, so here it is 
verbatim as I just finished writing it:

package chanwg

import "fmt"

type WaitGroup struct {
    workers uint
    ops chan func()
    ready chan struct{}
    done chan struct{}
}

func New() *WaitGroup {
    wg := &WaitGroup{
        ops: make(chan func()),
        done: make(chan struct{}),
        ready: make(chan struct{}),
    }
    go func() {
        // The supervisor blocks until an op arrives on the ops channel, so
        // it does not spin while idle.
        for {
            fn := <-wg.ops
            println("received op")
            fn()
            fmt.Println("num workers:", wg.WorkerCount())
            // once the count drops back to zero, release all waiters
            if wg.workers == 0 {
                println("wait counter at zero")
                close(wg.done)
                return
            }
        }
    }()
    return wg
}

// Add adds a non-negative number
func (wg *WaitGroup) Add(delta int) {
    if delta < 0 {
        return
    }
    fmt.Println("adding", delta, "workers")
    wg.ops <- func() {
        wg.workers += uint(delta)
    }
}

// Done subtracts a non-negative value from the workers count
func (wg *WaitGroup) Done(delta int) {
    println("worker finished")
    if delta < 0 {
        return
    }
    println("pushing op to channel")
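    wg.ops <- func() {
        println("finishing")
        if uint(delta) > wg.workers {
            // clamp at zero rather than wrapping the unsigned counter
            wg.workers = 0
            return
        }
        wg.workers -= uint(delta)
    }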
    // println("op should have cleared by now")
}

// Wait blocks until the waitgroup decrements to zero
func (wg *WaitGroup) Wait() {
    println("a worker is waiting")
    <-wg.done
    println("job done")
}

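// WorkerCount returns the current worker count. It reads wg.workers without
// synchronisation, so it is only safe to call from the supervisor goroutine.
func (wg *WaitGroup) WorkerCount() int {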
    return int(wg.workers)
}


There could be some bug lurking in there, I'm not sure, but it runs exactly 
as I want it to, and all the debug prints show you how it works.

It may not be necessary to send functions that mutate the counter over the 
channel; the counter could instead be incremented or decremented directly 
within a select statement. I've gotten really used to generator functions: 
they are extremely easy to use, and they simplify and modularise my code 
so much that I am now tackling far more complex problems (if cyclomatic 
complexity is a measure: over 130 paths in a menu system I wrote that uses 
generators to parse a declaration of data types that also uses 
generators).
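
The direct increment/decrement idea could be sketched roughly as follows. 
This is only an illustration, not the code from above: deltaGroup and 
newDeltaGroup are hypothetical names, the counter lives in a local variable 
of the supervisor goroutine, and the group is assumed to be one-shot (once 
the count reaches zero the supervisor exits, so later Add/Done calls would 
block).

```go
package main

import "fmt"

// deltaGroup is a hypothetical variant whose channel carries plain signed
// deltas instead of closures.
type deltaGroup struct {
	deltas chan int
	done   chan struct{}
}

func newDeltaGroup() *deltaGroup {
	g := &deltaGroup{
		deltas: make(chan int),
		done:   make(chan struct{}),
	}
	go func() {
		workers := 0 // touched only by this goroutine, so no locking
		for {
			workers += <-g.deltas
			if workers <= 0 {
				close(g.done) // release every waiter
				return
			}
		}
	}()
	return g
}

// Add and Done ignore non-positive values, as in the original.
func (g *deltaGroup) Add(n int) {
	if n > 0 {
		g.deltas <- n
	}
}

func (g *deltaGroup) Done(n int) {
	if n > 0 {
		g.deltas <- -n
	}
}

func (g *deltaGroup) Wait() { <-g.done }

func main() {
	g := newDeltaGroup()
	g.Add(3)
	for i := 0; i < 3; i++ {
		go func(id int) {
			fmt.Println("worker", id, "finished")
			g.Done(1)
		}(i)
	}
	g.Wait()
	fmt.Println("all workers done")
}
```

Because the only thing crossing the channel is an int, the supervisor needs 
no closure call per operation; whether that is measurably faster is a 
separate question.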

I suppose it wouldn't be hard to extend the kinds of operations that you 
push to the ops channel, though off the top of my head I can't think of a 
reasonable use case for any other operation. One thing that does come to 
mind is a more complex, conditional increment that executes based on 
other channel signals or on the state of some other data, but I can't see 
any real use for that either.

I should create a benchmark that tests the relative performance of this 
versus sync/atomic add/subtract operations. I also think that, as I 
mentioned, changing the ops channel to carry just the deltas on the group 
size might be a little bit faster than the indirect call a closure 
requires.
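
A benchmark along those lines might start out like the sketch below. It 
makes several assumptions: chanGroup is a hypothetical, one-shot, 
delta-based channel group (not the closure-based code above), the 
comparison is against sync.WaitGroup rather than raw sync/atomic calls, 
and testing.Benchmark is called from main so the whole thing runs as an 
ordinary program. No results are shown because they will vary by machine 
and Go version.

```go
package main

import (
	"fmt"
	"sync"
	"testing"
)

// chanGroup is a hypothetical one-shot wait group that sends plain deltas
// to a supervisor goroutine.
type chanGroup struct {
	deltas chan int
	done   chan struct{}
}

func newChanGroup() *chanGroup {
	g := &chanGroup{deltas: make(chan int), done: make(chan struct{})}
	go func() {
		workers := 0
		for {
			workers += <-g.deltas
			if workers <= 0 {
				close(g.done)
				return
			}
		}
	}()
	return g
}

// runChan starts n workers and waits for them through a chanGroup.
func runChan(n int) {
	g := newChanGroup()
	g.deltas <- n
	for i := 0; i < n; i++ {
		go func() { g.deltas <- -1 }()
	}
	<-g.done
}

// runSync does the same work with the standard sync.WaitGroup.
func runSync(n int) {
	var wg sync.WaitGroup
	wg.Add(n)
	for i := 0; i < n; i++ {
		go wg.Done()
	}
	wg.Wait()
}

func main() {
	testing.Init() // registers the testing flags; has no effect if already called
	const workers = 100
	chanRes := testing.Benchmark(func(b *testing.B) {
		for i := 0; i < b.N; i++ {
			runChan(workers)
		}
	})
	syncRes := testing.Benchmark(func(b *testing.B) {
		for i := 0; i < b.N; i++ {
			runSync(workers)
		}
	})
	fmt.Println("channel group: ", chanRes)
	fmt.Println("sync.WaitGroup:", syncRes)
}
```

Each iteration includes goroutine start-up cost for both variants, so the 
numbers compare whole add/done/wait cycles rather than the counter 
operations in isolation.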

So the jury is still out on whether this is in any way superior to 
sync.WaitGroup. I know that sync.WaitGroup does not use channels, and I 
suspect it carries a little extra overhead from the function calls hidden 
inside the atomic increment/decrement operations, though that remains to 
be measured.

Because all of those ops run only within the one supervisor goroutine of 
the waitgroup, they are serialised automatically by the channel (by its 
buffer or, for an unbuffered channel, by the rendezvous when sender and 
receiver are both ready), and no atomic/locking operations are required to 
prevent a race.

I enabled the race detector on a test of this code just now. The 
WorkerCount() function is racy. I think I need to put a channel underneath 
the retrieval: it would send an (empty) query on a query channel and 
listen on an answer channel (maybe making them one-directional) to get the 
value without an explicit race.

Yes, and this is probably also why sync.WaitGroup has no way to inspect 
the current wait count. I will see if I can make that function not racy.
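
One way the query/answer idea could look is sketched below. It is an 
assumption-laden illustration, not a tested fix for the code above: 
countedGroup, newCountedGroup and send are hypothetical names, the counter 
is kept as a local variable of the supervisor, and the group is one-shot. 
After the count reaches zero, WorkerCount simply reports 0.

```go
package main

import "fmt"

// countedGroup is a hypothetical wait group whose count can be read
// without a data race, because only the supervisor ever touches it.
type countedGroup struct {
	deltas  chan int
	queries chan struct{} // empty query: "what is the count?"
	answers chan int      // supervisor replies here
	done    chan struct{}
}

func newCountedGroup() *countedGroup {
	g := &countedGroup{
		deltas:  make(chan int),
		queries: make(chan struct{}),
		answers: make(chan int),
		done:    make(chan struct{}),
	}
	go func() {
		workers := 0
		for {
			select {
			case d := <-g.deltas:
				workers += d
				if workers <= 0 {
					close(g.done)
					return
				}
			case <-g.queries:
				g.answers <- workers
			}
		}
	}()
	return g
}

// send delivers a delta unless the supervisor has already exited.
func (g *countedGroup) send(d int) {
	select {
	case g.deltas <- d:
	case <-g.done:
	}
}

func (g *countedGroup) Add(n int)  { g.send(n) }
func (g *countedGroup) Done(n int) { g.send(-n) }
func (g *countedGroup) Wait()      { <-g.done }

// WorkerCount asks the supervisor for the count; after done it returns 0.
func (g *countedGroup) WorkerCount() int {
	select {
	case g.queries <- struct{}{}:
		return <-g.answers
	case <-g.done:
		return 0
	}
}

func main() {
	g := newCountedGroup()
	g.Add(2)
	fmt.Println("workers:", g.WorkerCount()) // workers: 2
	g.Done(1)
	fmt.Println("workers:", g.WorkerCount()) // workers: 1
	g.Done(1)
	g.Wait()
	fmt.Println("workers:", g.WorkerCount()) // workers: 0
}
```

The value returned can of course be stale by the time the caller uses it; 
the channels only remove the data race on the counter itself.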

On Thursday, 2 May 2019 23:29:35 UTC+2, Øyvind Teig wrote:
>
> Thanks for the reference to Dave Cheney's blog note! And for this thread, 
> quite interesting to read. I am not used to explicitly closing channels at 
> all (occam (in the nineties) and XC (now)), but I have sat through several 
> presentations at conferences where I have seen the theme discussed, as 
> with the JCSP library. I am impressed by the depth of the reasoning done 
> by the Go designers!
