Hi all, I just posted this to SO and would appreciate some advice!!
http://stackoverflow.com/questions/41765198/deadlock-with-buffered-channel

My first attempt (coming from other languages) was to put all the messages from addMessage into one huge global map guarded by locks. This ended in tears, high CPU usage and excessive locking. Another SO post pointed me towards the approach I have currently taken with channels.

It works! It works really well: CPU is down a lot and, thinking about it, the structure makes sense for the data being ingested.

The trouble is that it hangs from time to time. I suspect (although I would love an easy way to prove it) that it is hanging on the "channel <- msg" send. My thinking is that the close request comes in but, because the select can read its cases out of order, a new packet arrives and we try to write to a nil channel, which obviously then blocks!

Bearing in mind that this sort of parallelism seems to work for the amount of data coming through, is there a way to make this work?

Thanks in advance!
Lee

    var MessageQueue = make(chan *trackingPacket_v1, 5000)

    func init() {
        go jobDispatcher(MessageQueue)
    }

    func addMessage(trackingPacket *trackingPacket_v1) {
        // Send the packet to the buffered queue!
        log.Println("Channel length:", len(MessageQueue))
        MessageQueue <- trackingPacket
    }

    func jobDispatcher(inboundFromTCP chan *trackingPacket_v1) {
        // One unbuffered channel per Avr id, each served by its own goroutine
        var channelMap = make(map[string]chan *trackingPacket_v1)

        // Channel that listens for the ids of workers that want to exit
        shutdownChan := make(chan string)

        for {
            select {
            case msg := <-inboundFromTCP:
                log.Println("Got packet", msg.Avr)
                channel, ok := channelMap[msg.Avr]
                if !ok {
                    packetChan := make(chan *trackingPacket_v1)
                    channelMap[msg.Avr] = packetChan
                    go processPackets(packetChan, shutdownChan, msg.Avr)
                    packetChan <- msg
                    continue
                }
                channel <- msg
            case shutdownString := <-shutdownChan:
                log.Println("Shutting down:", shutdownString)
                channel, ok := channelMap[shutdownString]
                if ok {
                    delete(channelMap, shutdownString)
                    close(channel)
                }
            }
        }
    }

    func processPackets(ch chan *trackingPacket_v1, shutdown chan string, id string) {
        var messages = []*trackingPacket_v1{}

        tickChan := time.NewTicker(time.Second * 1)
        defer tickChan.Stop()

        hasCheckedData := false

        for {
            select {
            case msg := <-ch:
                log.Println("Got a message for", id)
                messages = append(messages, msg)
                hasCheckedData = false
            case <-tickChan.C:
                messages = cullChanMessages(messages)
                if len(messages) == 0 {
                    messages = nil
                    shutdown <- id
                    return
                }

                // No point running the check when the packets have not changed!!
                if !hasCheckedData {
                    processMLATCandidatesFromChan(messages)
                    hasCheckedData = true
                }
            case <-time.After(time.Duration(time.Second * 60)):
                log.Println("This channel has been around for 60 seconds which is too much, kill it")
                messages = nil
                shutdown <- id
                return
            }
        }
    }