Aside from the style issues (mixing channel input/output with program
logic, and hiding the useful return value of go-loop behind a trailing nil),
the real problem here is that you do your work inside a go block. Go blocks
are not meant for long-running or blocking tasks, whether CPU-bound or
IO-bound. They all share core.async's small, fixed-size thread pool, so real
work inside go blocks risks starving that pool, which stops every go block
in the process from making progress. Use async/thread to run the work on a
real thread, and park on the channel it returns.
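
Something along these lines is what I mean. This is only a minimal sketch,
not a rewrite of your function: handle-off-pool and slow-handler are
hypothetical names I made up for illustration, and slow-handler just sleeps
to stand in for real blocking work.

```clojure
(require '[clojure.core.async :as async])

(defn handle-off-pool
  "Runs (handler record) on its own thread via async/thread and returns
  the channel that will receive the handler's result."
  [handler record]
  (async/thread (handler record)))

;; Hypothetical stand-in for blocking work (IO, heavy computation, ...).
(defn slow-handler [record]
  (Thread/sleep 100)
  (* 2 record))

;; The go block only coordinates: <! parks it while the work runs on the
;; thread started by async/thread, so no go-pool thread is tied up.
(def result-chan
  (async/go
    (async/<! (handle-off-pool slow-handler 21))))

;; Blocking take, for use at the REPL or outside any go block.
(async/<!! result-chan)
```

In your code that would mean replacing the bare (handler val) calls inside
the inner go blocks with (async/<! (async/thread (handler val))). Note that
if the handler returns nil, the channel from async/thread closes without
delivering a value, and <! then returns nil, which is fine when you only
care about completion.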

On Tue, Jun 20, 2017 at 10:15 AM Tom Connors <t.v.conn...@gmail.com> wrote:

> Thanks for the suggestion, Didier, but I was unable to find a way to make
> pmap work for my use case. For those interested, here's what I came up
> with, then some questions:
>
> (defn parallel-per
>   "Handle records from input-chan in parallel, but records with matching
> `splitter` return values serially."
>   [splitter handler input-chan]
>   (let [blockers (atom {}) ;; map of group-key to [chan num-remaining]
>         status-chan (async/chan)]
>     (async/go-loop []
>       (let [[val port] (async/alts! [input-chan status-chan])]
>         (if (= port input-chan)
>           (if (some? val)
>             (let [group-key (splitter val)]
>               (if-let [blocker (get @blockers group-key)]
>                 (let [[blocker-chan ^long num-remaining] blocker
>                       next-blocker-chan (async/chan)]
>                   (swap! blockers assoc group-key
>                          [next-blocker-chan (inc num-remaining)])
>                   (async/go
>                     (async/<! blocker-chan)
>                     (handler val)
>                     (async/put! status-chan group-key)
>                     (async/close! next-blocker-chan))
>                   (recur))
>                 (let [blocker-chan (async/chan)]
>                   (swap! blockers assoc group-key [blocker-chan 1])
>                   (async/go
>                     (handler val)
>                     (async/put! status-chan group-key)
>                     (async/close! blocker-chan))
>                   (recur))))
>             (async/close! status-chan))
>           (let [group-key val
>                 [_ ^long num-remaining] (get @blockers group-key)]
>             (if (> num-remaining 1)
>               (do
>                 (swap! blockers update-in [group-key 1] dec)
>                 (recur))
>               (do
>                 (swap! blockers dissoc group-key)
>                 (recur)))))))
>     nil))
>
> Does anything in here look bad? Is there a way to gracefully handle
> input-chan closing without using loop/recur? I'm using a new channel for
> every new record to block the next record with the same `splitter` return
> value - my first approach used one channel per distinct `splitter` value,
> but I saw some results printed out of order. Does this mean that the order
> of takes from <! is not deterministic, or that I have/had a bug?
>
> --
> You received this message because you are subscribed to the Google
> Groups "Clojure" group.
> To post to this group, send email to clojure@googlegroups.com
> Note that posts from new members are moderated - please be patient with
> your first post.
> To unsubscribe from this group, send email to
> clojure+unsubscr...@googlegroups.com
> For more options, visit this group at
> http://groups.google.com/group/clojure?hl=en
> ---
> You received this message because you are subscribed to the Google Groups
> "Clojure" group.
> To unsubscribe from this group and stop receiving emails from it, send an
> email to clojure+unsubscr...@googlegroups.com.
> For more options, visit https://groups.google.com/d/optout.
>
