Aside from style issues of mixing channel input/output with program logic, and hiding the useful return value of go-loop, the real problem here is doing your work inside a go block. Go blocks are not meant for blocking tasks, whether CPU or IO bound; doing real work inside go blocks risks starving all of core.async's implementation threads which prevents all your go blocks from doing work. Use async/thread to do work in a real thread and park on the channel it returns.
On Tue, Jun 20, 2017 at 10:15 AM Tom Connors <t.v.conn...@gmail.com> wrote: > Thanks for the suggestion, Didier, but I was unable to find a way to make > pmap work for my use case. For those interested, here's what I came up > with, then some questions: > > (defn parallel-per > "Handle records from input-chan in parallel, but records with matching > `splitter` return values serially." > [splitter handler input-chan] > (let [blockers (atom {}) ;; map of group-key to [chan num-remaining] > status-chan (async/chan)] > (async/go-loop [] > (let [[val port] (async/alts! [input-chan status-chan])] > (if (= port input-chan) > (if (some? val) > (let [group-key (splitter val)] > (if-let [blocker (get @blockers group-key)] > (let [[blocker-chan ^long num-remaining] blocker > next-blocker-chan (async/chan)] > (swap! blockers assoc group-key [next-blocker-chan (inc > num-remaining)]) > (async/go > (async/<! blocker-chan) > (handler val) > (async/put! status-chan group-key) > (async/close! next-blocker-chan)) > (recur)) > (let [blocker-chan (async/chan)] > (swap! blockers assoc group-key [blocker-chan 1]) > (async/go > (handler val) > (async/put! status-chan group-key) > (async/close! blocker-chan)) > (recur)))) > (async/close! status-chan)) > (let [group-key val > [_ ^long num-remaining] (get @blockers group-key)] > (if (> num-remaining 1) > (do > (swap! blockers update-in [group-key 1] dec) > (recur)) > (do > (swap! blockers dissoc group-key) > (recur))))))) > nil)) > > Does anything in here look bad? Is there a way to gracefully handle > input-chan closing without using loop/recur? I'm using a new channel for > every new record to block the next record with the same `splitter` return > value - my first approach used one channel per distinct `splitter` value, > but I saw some results printed out of order. Does this mean that the order > of takes from <! is not deterministic, or that I have/had a bug? > > -- > You received this message because you are subscribed to the Google > Groups "Clojure" group. > To post to this group, send email to clojure@googlegroups.com > Note that posts from new members are moderated - please be patient with > your first post. > To unsubscribe from this group, send email to > clojure+unsubscr...@googlegroups.com > For more options, visit this group at > http://groups.google.com/group/clojure?hl=en > --- > You received this message because you are subscribed to the Google Groups > "Clojure" group. > To unsubscribe from this group and stop receiving emails from it, send an > email to clojure+unsubscr...@googlegroups.com. > For more options, visit https://groups.google.com/d/optout. > -- You received this message because you are subscribed to the Google Groups "Clojure" group. To post to this group, send email to clojure@googlegroups.com Note that posts from new members are moderated - please be patient with your first post. To unsubscribe from this group, send email to clojure+unsubscr...@googlegroups.com For more options, visit this group at http://groups.google.com/group/clojure?hl=en --- You received this message because you are subscribed to the Google Groups "Clojure" group. To unsubscribe from this group and stop receiving emails from it, send an email to clojure+unsubscr...@googlegroups.com. For more options, visit https://groups.google.com/d/optout.