Hi all,

I just posted this to SO and would appreciate some advice!!

http://stackoverflow.com/questions/41765198/deadlock-with-buffered-channel

My first attempt (coming from other languages) was that all the messages 
from addMessages went into some huge global map with locks.  This ended in 
tears, CPU and excessive locking.  Another SO post pointed me towards the 
approach I have currently taken with channels.  It works!  It works really 
well and CPU is down a lot and thinking about it the structure makes sense 
for the data being ingested.

Trouble is that it hangs from time to time, I suspect (although would love 
to get proof easily) that it is the "channel <- msg" that it is hanging on. 
 My thoughts are that the closure comes in but as the select read can be 
out of order an new packet comes in and we try to write to a nil 
channel, which obviously then blocks!  

Bearing in mind this sort of parallelism seems to work for the amount of 
data coming through is there a way to make this work?

Thanks in advance!

Lee

var MessageQueue = make(chan *trackingPacket_v1, 5000)

func init() {
    go jobDispatcher(MessageQueue)}

func addMessage(trackingPacket *trackingPacket_v1) {
    // Send the packet to the buffered queue!
    log.Println("Channel length:", len(MessageQueue))
    MessageQueue <- trackingPacket}

func jobDispatcher(inboundFromTCP chan *trackingPacket_v1) {
    var channelMap = make(map[string]chan *trackingPacket_v1)

    // Channel that listens for the strings that want to exit
    shutdownChan := make(chan string)

    for {
        select {
        case msg := <-inboundFromTCP:
            log.Println("Got packet", msg.Avr)
            channel, ok := channelMap[msg.Avr]
            if !ok {
                packetChan := make(chan *trackingPacket_v1)

                channelMap[msg.Avr] = packetChan
                go processPackets(packetChan, shutdownChan, msg.Avr)
                packetChan <- msg
                continue
            }
            channel <- msg
        case shutdownString := <-shutdownChan:
            log.Println("Shutting down:", shutdownString)
            channel, ok := channelMap[shutdownString]
            if ok {
                delete(channelMap, shutdownString)
                close(channel)
            }
        }
    }}

func processPackets(ch chan *trackingPacket_v1, shutdown chan string, id 
string) {
    var messages = []*trackingPacket_v1{}

    tickChan := time.NewTicker(time.Second * 1)
    defer tickChan.Stop()

    hasCheckedData := false

    for {
        select {
        case msg := <-ch:
            log.Println("Got a messages for", id)
            messages = append(messages, msg)
            hasCheckedData = false
        case <-tickChan.C:

            messages = cullChanMessages(messages)
            if len(messages) == 0 {
                messages = nil
                shutdown <- id
                return
            }

            // No point running checking when packets have not changed!!
            if hasCheckedData == false {
                processMLATCandidatesFromChan(messages)
                hasCheckedData = true
            }
        case <-time.After(time.Duration(time.Second * 60)):
            log.Println("This channel has been around for 60 seconds which is 
too much, kill it")
            messages = nil
            shutdown <- id
            return
        }
    }}



-- 
You received this message because you are subscribed to the Google Groups 
"golang-nuts" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to golang-nuts+unsubscr...@googlegroups.com.
For more options, visit https://groups.google.com/d/optout.

Reply via email to