Howdy Charles,
For a static mix of sources there's async/merge [1], and it behaves almost 
exactly the way you describe, impl here [2].
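
As a rough illustration of that closing behaviour, here is a minimal sketch. 
The function name merged-values and the buffer sizes are made up for the 
example; async/merge, async/into, >!!, <!! and close! are the regular 
core.async calls. The merged channel closes once every source channel has 
closed, which is what lets async/into deliver its result.

```clojure
(require '[clojure.core.async :as async :refer [chan close! >!! <!!]])

(defn merged-values
  "Hypothetical demo: puts one value on each of two buffered channels,
  closes both, and collects everything that comes out of the merge."
  []
  (let [a      (chan 2)
        b      (chan 2)
        merged (async/merge [a b])]
    (>!! a 1)
    (>!! b 2)
    ;; Closing every source is what allows the merged channel to close.
    (close! a)
    (close! b)
    ;; async/into only yields its collection after `merged` has closed.
    (<!! (async/into #{} merged))))
```

The result is collected into a set because merge makes no ordering promise 
across sources.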

Stepping back, the source chan-of-chans being closed means that all value 
production is in flight, while the merged channel being closed means that 
consumption is finished; these are distinctly different events w.r.t. this 
particular process.  Can you have whoever is producing the chans signal 
explicitly, and have the consumer consume the merge?  If you want to 
terminate early based on the producer chan, listen for that explicit signal, 
produced by some proc that has more knowledge: (<! done)
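
One way the explicit-signal idea could look, purely as a sketch: the helper 
name consume-until-done and the done channel are assumptions for 
illustration, not something core.async provides. The consumer alts! over the 
done signal and the merged channel, and stops on whichever event comes first.

```clojure
(require '[clojure.core.async :as async
           :refer [chan close! go-loop alts! >!! <!!]])

(defn consume-until-done
  "Hypothetical helper: collects values from `merged` until either `merged`
  closes or `done` delivers a value (or is closed). Returns a channel that
  yields the vector of values collected so far."
  [merged done]
  (go-loop [acc []]
    ;; :priority true makes alts! check `done` before `merged`.
    (let [[v ch] (alts! [done merged] :priority true)]
      (cond
        (= ch done) acc                    ;; explicit signal: stop early
        (nil? v)    acc                    ;; merged closed: consumption finished
        :else       (recur (conj acc v))))))
```

Closing done (or putting anything on it) from the proc that knows more ends 
consumption even if the merged channel still has values pending.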

I'm not sure that async/mix is the right abstraction for your stated needs, 
because a mix doesn't terminate except when something closes the merged 
channel.  A mix is 'alive' even when there is nothing to read or nothing 
left to read (which is btw the initial state of all mixes), which contradicts 
the stated goal.  For your goal, just alts! on the input channel + the 
channels already read from it.

(defn merge-from-chan [input]
  (let [out (chan)]
    (go-loop [reads [input]]
      (if (seq reads)
        (let [[v sc] (alts! reads)]
          (if (= input sc)
            (if (some? v)
              (recur (conj reads v))  ;; new source
              ;; input done, take it out of action
              (recur (filterv #(not= input %) reads)))
            (if (some? v)  ;; value from elsewhere
              (do (>! out v) (recur reads))
              ;; that source is done, stop reading from it
              (recur (filterv #(not= sc %) reads)))))
        ;; nothing left to read: close the output
        (close! out)))
    out))

Hope this helps,

[1] https://clojure.github.io/core.async/#clojure.core.async/merge
[2] https://github.com/clojure/core.async/blob/master/src/main/clojure/clojure/core/async.clj#L931-L947

On Tuesday, April 22, 2014 5:35:24 PM UTC-4, Charles Duffy wrote:
>
> Howdy --
>
> Trying to use core.async's mix facility, one difficulty I've run into is 
> the lack of a means to close the output channel only after all inputs have 
> been exhausted. In the interim, I've ended up using the below instead -- 
> which lacks some of the facilities provided by the mix interface (muting, 
> pausing, soloing, etc), but has the advantage of cleanly closing its output 
> channel if and only if all inputs (including the channel from which other 
> channels are read) have been flushed.
>
> That said, looking at core.async's code, it certainly seems possible to 
> extend the mix abstraction to handle this use case -- for instance, adding 
> a (close-on-empty! [mix]) call which would set a flag causing the 
> output channel to close whenever all inputs are unmixed, whether by user 
> interaction or end-of-input, would be another solution.
>
> Thoughts? Should I publish the below (and related tooling -- such as an 
> N-way split helper used to create such channels of channels) as a separate 
> library? Submit patches to core.async adding appropriate functionality to 
> the mix implementation? Or is there already a better way of achieving the 
> desired effect?
>
> Thanks!
>
> (defn merge-channels
>   "Given a channel which yields other channels, combine output from all of 
> these into a single output channel.
>
>   Close output channel and exit when both the input channel and all 
> channels read from that channel have closed."
>   [in-chan]
>   (let [out-chan (chan)]
>     (go
>       (loop [open-channels #{}, in-chan-open? true]
>         (let [all-chans (if in-chan-open?
>                           (conj open-channels in-chan)
>                           open-channels)
>               [v c] (when-not (empty? all-chans) (alts! (vec all-chans)))]
>           (cond
>
>            (empty? all-chans)
>            nil
>
>            (= c in-chan)
>            (if v
>              (recur (conj open-channels v) true)
>              (recur open-channels false))
>
>            (nil? v)
>            (do
>              (recur (disj open-channels c) in-chan-open?))
>
>            :else
>            (do
>              (>! out-chan v)
>              (recur open-channels in-chan-open?))))))
>     out-chan))
>

-- 
You received this message because you are subscribed to the Google
Groups "Clojure" group.
To post to this group, send email to clojure@googlegroups.com
Note that posts from new members are moderated - please be patient with your 
first post.
To unsubscribe from this group, send email to
clojure+unsubscr...@googlegroups.com
For more options, visit this group at
http://groups.google.com/group/clojure?hl=en