It's not precisely the general functionality that I will implement for my transport, but here is a simple example of a classifier-type processing queue:
https://play.golang.org/p/ytdrXgCdbQH

This processes a series of sequential integers and pops them into an array to find the highest factor of a given range of numbers. The code I will write soon is slightly different because, obviously, the code linked above is not technically a queue. It does show how to make a non-deadlocking processing queue, however.

Adding an actual queue, like the one for my intended purpose of bundling packets with a common uuid, is not much further. Instead of just dropping the integers into their position in the slice, it iterates over the queued items as each new item is received to find a match. If it doesn't find enough, then at the end of the search it puts the item back on the queue and waits for the next new item to arrive. I'll be writing that shortly.

For that, I think the simple example would use an RNG to generate numbers within the specified range, and it will continue to accumulate numbers in the buffer until a recurrence occurs. Then the numbers are appended to the array, and this index is ignored when another one comes in later. That most closely models what I am building.

On Thursday, 2 May 2019 13:26:47 UTC+2, Louki Sumirniy wrote:
>
> Yeah, I was able to think a bit more about it as I was falling asleep
> later and I realised how I meant it to run. I had to verify that indeed
> channels are FIFO queues, as that was the basis of this way of using them.
>
> The receiver channel is unbuffered, and lives in one goroutine. When it
> receives something it bounces it into the queue and for/range loops through
> the content of a fairly big-buffered working channel where items can
> collect while they are fresh, and upon arrival of a new item the new item
> is checked for a match against the contents of the queue, as well as
> kicking out stale data (and recording the uuid of the stale set so it can
> be immediately dumped if any further packets got hung up and come after way
> too long).
>
> This differs a lot from the loopy design I made in the OP. In this design
> there are only two threads instead of three. I think the geometry of a
> channel pattern is important - specifically, everything needs to be done in
> pairs with channels, although maybe sometimes you want it to receive but
> not need it to send it anywhere, just store/drop, as the algorithm requires.
>
> I still need to think through the design a bit more. Like, perhaps the
> queue channel *should* be a pair of one-direction channels so one is the
> main fifo and on the other side each item is taken off the queue, processed,
> and then put back into the stream. Ordering is not important, except that
> it is very handy that it is a FIFO, because this means that if I have a
> buffer holding some number of items and put a new item into the buffer
> queue, the queue unpacks the newest item last. I think I could make it like
> this, actually:
>
> one channel inbound receiver, it passes into a buffered queue channel, and
> triggers the passing out of buffered items from the head of the queue to
> watcher 1, 2, 3, each watcher process being a separate process that may
> swallow or redirect the contents. For each new UUID item that comes in, a
> single thread could be started that keeps reading, checking and
> (re)directing the input as it passes out of the buffer and through the
> watchers.
> Something like this:
>
> input -> buffer[64] -> (watcher 1) -> (watcher 2) -> buffer[64]
>
> With this pattern I could have a new goroutine spawn for each new UUID
> that marks out a batch, that springs a deadline tick and when the deadline
> hits the watcher's buffer is cleared and the goroutine ends, implementing
> expiry, and the UUID is attached to a simple buffered channel that keeps
> the last 100 or so UUIDs and uses it to immediately identify stale junk
> (presumably the main data type in the other channels is significantly
> bigger data than the UUID integer - my intent is that the data type should
> be a UDP packet so that means it is size restricted and contains something
> arbitrary that watchers detect, decode and respond to).
>
> It's a work in progress, but I know from previous times writing code
> dealing with simple batch/queue problems like this, that the Reader/Writer
> pattern is most often used and requires a lot of slice fiddling implemented
> using arrays/slices, but a buffered channel, being a FIFO, is a queue
> buffer, so it can be used to store (relatively) ordered items that age as
> they get to the head of the queue, and allow a check-pass on each item.
>
> These checkers can 'return' to the next in line so the checker-queue, so
> to speak, also has to be stored in some form of state. This could be done
> by having that first receiver channel first check the list of fresh UUIDs,
> which would be a map linking to the bundles that are made out of
> them, and matches are pulled out of the buffer queue and attached to the
> bundles, which are processed when they have the required minimum pieces or
> the thread times out, adds the UUID to the stale list/queue, and so on.
>
> Attaching new watchers to the chain simply means changing the destination
> of the last watcher in the queue from the return to the buffer, to the
> input of the new watcher.
> When a watcher times out it signals its
> predecessor with its successor's location and the stale item is deleted
> from the watchers list.
>
> It's going to be done... I probably need to look at some transport
> implementations in Go for some other clues, this was just my idea of how to
> build a minimum latency batching system for receiving error-redundancy UDP
> packets encoded with Reed-Solomon encoding, with the main end-goal being to
> have delivery of the data with a minimum of delay. The design I just
> sketched out allows for a lot of parallelisation with the 'watcher'
> processes, but managing that chain of receivers is obviously going to be
> some kind of overhead.
>
> After a little thought I think I know how to implement the multi-watcher
> filter queue - when a watcher expires, and its specific UUID is stale, it
> sends that in a message to its upline, containing the expiring thread's
> subsequent queue member (next in queue in direction of flow), which then
> redirects its output to bypass the stale thread, which can then terminate
> once it is no longer in the filter queue.
>
> On Thursday, 2 May 2019 10:30:28 UTC+2, Øyvind Teig wrote:
>>
>> Hi, Louki Sumirniy
>>
>> This is not really a response to your problem in particular, so it may
>> totally miss your target. It's been a while since I did anything in this
>> group. However, it's a response to the use of buffered channels. It's a
>> coincidence that I react to your posting (and not the probably hundreds of
>> others over the years where this comment may have been relevant). But I
>> decided this morning to actually look into one of the group update mails,
>> and there you were!
>>
>> In a transcript from [1] Rob Pike says that
>>
>> “Now for those experts in the room who know about buffered channels in Go
>> – which exist – you can create a channel with a buffer.
>> And buffered
>> channels have the property that they don’t synchronise when you send,
>> because you can just drop a value in the buffer and keep going. So they
>> have different properties. And they’re kind of subtle. They’re very useful
>> for certain problems, but you don’t need them. And we’re not going to use
>> them at all in our examples today, because I don’t want to complicate life
>> by explaining them.”
>>
>> I don't know if that statement is still valid, and I would not know
>> whether your example is indeed one of the "certain problems" where you have
>> got the correct usage. In that case, my comments below would be of less
>> value in this concrete situation. I also don't know whether there is now a
>> more generic library in Go that may help get rid of buffered channels. Maybe
>> even an output into a zero-buffered channel in a select with a timeout will
>> do.
>>
>> If you fill up a channel with data you would often need some state to
>> know about what is in the channel. If it's a safety critical implementation
>> you may not want to just drop the data into the channel and forget. If you
>> need to restart the comms in some way you would need to flush the channel,
>> without easily knowing what you are flushing. The message "fire in 1 second
>> if not cancelled" comes through but you would not know that the "cancel!"
>> message was what you had to flush in the channel. In any case, a full
>> channel would be blocking anyhow - so you would have to take care of that.
>> Or alternatively _know_ that the consumer always stays ahead of the
>> buffered channel, which may be hard to know.
>>
>> I guess there are several (more complex?) concurrency patterns available
>> that may be used instead of the (simple?) buffered channel:
>>
>> All of the patterns below would use synchronised rendezvous with
>> zero-buffered channels that would let a server goroutine (task, process)
>> never have to block to get rid of its data.
>> After all, that's why one would
>> use a buffered channel; so that one would not need to block. All of the
>> below patterns move data, but I guess there may be patterns for moving
>> access as well (like mobile channels). All would also be deadlock free.
>>
>> The Overflow Buffer pattern uses a composite goroutine consisting of two
>> inner goroutines. One Input that always accepts data and one Output that
>> blocks to output, and in between there is a channel with Data one direction
>> that never blocks and a channel with Data-sent back. If the Input has not
>> got the Data-sent back then there is an overflow that may be handled by
>> user code. See [2], figure 3.
>>
>> Then there are the Knock-Come pattern [3] and a pattern like the XCHAN
>> [4]. In the latter's appendix a Go solution is discussed.
>>
>> - - -
>>
>> [1] Rob Pike: "Go concurrency patterns":
>> https://www.youtube.com/watch?v=f6kdp27TYZs&sns=em at Google I/O 2012.
>> Discussed in [5]
>>
>> Disclaimer: there are no ads, no gifts, no incoming anything with my blog
>> notes, just fun and expenses:
>>
>> [2] http://www.teigfam.net/oyvind/pub/pub_details.html#NoBlocking - See
>> Figure 3
>>
>> [3]
>> https://oyvteig.blogspot.com/2009/03/009-knock-come-deadlock-free-pattern.html
>> - Knock-come
>>
>> [4] http://www.teigfam.net/oyvind/pub/pub_details.html#XCHAN - XCHANs:
>> Notes on a New Channel Type
>>
>> [5]
>> http://www.teigfam.net/oyvind/home/technology/072-pike-sutter-concurrency-vs-concurrency/
>>
>

--
You received this message because you are subscribed to the Google Groups "golang-nuts" group.
To unsubscribe from this group and stop receiving emails from it, send an email to golang-nuts+unsubscr...@googlegroups.com.
For more options, visit https://groups.google.com/d/optout.
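
For anyone following the thread, here is a minimal sketch of the UUID bundling idea discussed above. It is not the code behind the playground link and not a finished transport. Everything named here is an assumption made for illustration: the `packet` type, the `bundle` and `collect` functions, and the fixed `need` threshold are hypothetical. Deadline-based expiry and the bounded list of stale UUIDs are left out to keep it short, and it uses only unbuffered channels plus a map held by a single goroutine.

```go
package main

import "fmt"

// packet is a hypothetical stand-in for one received UDP shard.
type packet struct {
	uuid  int    // identifies the batch this shard belongs to
	shard string // placeholder for the payload
}

// bundle reads packets from in until it is closed, groups them by uuid, and
// sends each group on the returned channel as soon as it holds need shards.
// Shards that arrive for an already delivered uuid are dropped.
// Assumption: no deadline handling; incomplete batches are simply discarded
// when in is closed.
func bundle(in <-chan packet, need int) <-chan []packet {
	out := make(chan []packet)
	go func() {
		defer close(out)
		pending := make(map[int][]packet) // incomplete batches, keyed by uuid
		done := make(map[int]bool)        // uuids that were already delivered
		for p := range in {
			if done[p.uuid] {
				continue // late shard for a finished batch
			}
			pending[p.uuid] = append(pending[p.uuid], p)
			if len(pending[p.uuid]) == need {
				out <- pending[p.uuid]
				delete(pending, p.uuid)
				done[p.uuid] = true
			}
		}
	}()
	return out
}

// collect feeds ps through bundle and returns the completed batches in the
// order they were completed.
func collect(ps []packet, need int) [][]packet {
	in := make(chan packet)
	go func() {
		defer close(in)
		for _, p := range ps {
			in <- p
		}
	}()
	var got [][]packet
	for b := range bundle(in, need) {
		got = append(got, b)
	}
	return got
}

func main() {
	ps := []packet{{1, "a"}, {2, "x"}, {1, "b"}, {2, "y"}, {1, "c"}, {3, "q"}}
	for _, b := range collect(ps, 2) {
		fmt.Println(b[0].uuid, len(b))
	}
}
```

With `need` set to 2, the batches for uuid 1 and uuid 2 are delivered, the third shard for uuid 1 is dropped as late, and the single shard for uuid 3 is never delivered. In this sketch the `done` map grows without bound; a fuller version would cap it (the thread suggests keeping roughly the last 100 UUIDs) and expire incomplete batches on a deadline.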