Howdy Charles,

For a static mix of sources there's async/merge [1], and it behaves almost exactly the way you describe; the implementation is here [2].
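As a quick illustration of that closing behaviour, here is a minimal sketch. The channels a and b and the values put on them are made up for the example; the only thing it relies on is that merge's output channel closes once every source channel has closed.

```clojure
(require '[clojure.core.async :as async])

;; Two hypothetical sources, buffered so the puts below don't block.
(def a (async/chan 2))
(def b (async/chan 2))

(async/>!! a 1)
(async/>!! a 2)
(async/>!! b 3)
(async/close! a)
(async/close! b)

;; async/into reads until its source closes, so this take only
;; completes because the merged channel closes after a and b do.
(def merged-values
  (async/<!! (async/into [] (async/merge [a b]))))

;; Ordering across sources is not guaranteed, so look at the sorted values.
(sort merged-values)
```

The limitation for your case is that the collection of sources has to be known up front, which is why a chan-of-chans needs something more.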
Stepping back: the source chan-of-chans being closed means that value production is fully in flight, while the merged channel being closed means that consumption is finished. These are distinctly different events w.r.t. this particular process. Can you have whoever is producing the chans signal explicitly, and have the consumer simply consume the merge? If you want to terminate early when the producer chan closes, listen for that explicit signal, produced by some process that has more knowledge:

    (<! done)

I'm not sure that async/mix is the right abstraction for your stated needs, because a mix doesn't terminate except when something closes the merged channel. A mix is 'alive' even when there is nothing to read or nothing left to read (which is, btw, the initial state of all mixes), and that contradicts the stated goal.

For your goal, just alts! on the input channel plus the channels already read from it:

    (defn merge-from-chan [input]
      (let [out (chan)]
        (go-loop [reads [input]]
          (if (seq reads)
            (let [[v sc] (alts! reads)]
              (if (= input sc)
                (if (some? v)
                  (recur (conj reads v))                    ;; new source
                  (recur (filterv #(not= input %) reads)))  ;; input done, take it out of action
                (if (some? v)                               ;; value from elsewhere
                  (do (>! out v) (recur reads))
                  (recur (filterv #(not= sc %) reads)))))   ;; that source is done
            (close! out)))                                  ;; nothing left to read, so close the output
        out))

Hope this helps,

[1] https://clojure.github.io/core.async/#clojure.core.async/merge
[2] https://github.com/clojure/core.async/blob/master/src/main/clojure/clojure/core/async.clj#L931-L947

On Tuesday, April 22, 2014 5:35:24 PM UTC-4, Charles Duffy wrote:
>
> Howdy --
>
> Trying to use core.async's mix facility, one difficulty I've run into is
> the lack of a means to close the output channel only after all inputs have
> been exhausted.
> In the interim, I've ended up using the below instead --
> which lacks some of the facilities provided by the mix interface (muting,
> pausing, soloing, etc), but has the advantage of cleanly closing its output
> channel if and only if all inputs (including the channel from which other
> channels are read) have been flushed.
>
> That said, looking at core.async's code, it certainly seems possible to
> extend the mix abstraction to handle this use case -- for instance, adding
> a (close-on-empty! [mix]) call which would set a flag causing the
> output channel to close whenever all inputs are unmixed, whether by user
> interaction or end-of-input, would be another solution.
>
> Thoughts? Should I publish the below (and related tooling -- such as an
> N-way split helper used to create such channels of channels) as a separate
> library? Submit patches to core.async adding appropriate functionality to
> the mix implementation? Or is there already a better way of achieving the
> desired effect?
>
> Thanks!
>
> (defn merge-channels
>   "Given a channel which yields other channels, combine output from all of
>   these into a single output channel.
>
>   Close output channel and exit when both the input channel and all
>   channels read from that channel have closed."
>   [in-chan]
>   (let [out-chan (chan)]
>     (go
>       (loop [open-channels #{}, in-chan-open? true]
>         (let [all-chans (if in-chan-open?
>                           (conj open-channels in-chan)
>                           open-channels)
>               [v c] (when-not (empty? all-chans) (alts! (vec all-chans)))]
>           (cond
>
>             (empty? all-chans)
>             nil
>
>             (= c in-chan)
>             (if v
>               (recur (conj open-channels v) true)
>               (recur open-channels false))
>
>             (nil? v)
>             (do
>               (recur (disj open-channels c) in-chan-open?))
>
>             :else
>             (do
>               (>! out-chan v)
>               (recur open-channels in-chan-open?))))))
>     out-chan))
>

--
You received this message because you are subscribed to the Google Groups "Clojure" group.
To post to this group, send email to clojure@googlegroups.com
Note that posts from new members are moderated - please be patient with your first post.
To unsubscribe from this group, send email to clojure+unsubscr...@googlegroups.com
For more options, visit this group at http://groups.google.com/group/clojure?hl=en