If anyone is interested, this is roughly what I'm currently thinking about:

(ns async-util.input-pipe
  (:require [clojure.core.async :refer [<! alts! chan close! go-loop]]
            [clojure.core.async.impl.protocols :as impl]))
 
(defprotocol InputPipe
  "Operations for connecting an input channel to a pipe `p` and
   disconnecting it again."
  (attach* [p ch]
    "Makes `ch` the input of `p`. The implementation in this file returns
     false when the pipe's underlying channel is closed, true otherwise.")
  (detach* [p]
    "Removes the current input of `p`. The implementation in this file
     returns false when the pipe's underlying channel is closed, true
     otherwise."))
 
(defn input-pipe
  "Creates and returns an input pipe wrapping the supplied channel `ch`.

   The returned object delegates close!/closed?, take! and put! to `ch`,
   so it can be used wherever `ch` could. In addition it implements
   InputPipe:

   - attach* starts watching an input channel. Values taken from the input
     are discarded; when the input closes, `ch` is closed too. Attaching a
     new input replaces (and stops watching) the previous one.
   - detach* stops watching the current input, so a later close of that
     input no longer closes `ch`.

   Both return false when `ch` is already closed, true otherwise."
  [ch]
  (let [;; Holds the stop channel of the currently running watcher, or nil
        ;; when no input is attached.
        stop     (atom nil)
        ;; Cancels the running watcher (if any) by closing its stop channel.
        unwatch! (fn []
                   (when-let [s @stop]
                     (close! s)
                     (reset! stop nil)))
        ;; Starts a watcher for `src`. The watcher is started here, at attach
        ;; time, rather than once at construction: a loop started before any
        ;; input exists would find nothing to read and terminate for good.
        watch!   (fn [src]
                   (unwatch!)
                   (let [s (chan)]
                     (reset! stop s)
                     (go-loop []
                       ;; :priority makes a pending detach win over a value
                       ;; that is simultaneously available on `src`.
                       (let [[v port] (alts! [s src] :priority true)]
                         (when (identical? port src)
                           (if (nil? v)
                             ;; Input closed while still attached.
                             (close! ch)
                             (recur)))))))]
    (reify
      impl/Channel
      (close! [_] (impl/close! ch))
      (closed? [_] (impl/closed? ch))

      impl/ReadPort
      (take! [_ fn1] (impl/take! ch fn1))

      impl/WritePort
      (put! [_ val fn1] (impl/put! ch val fn1))

      InputPipe
      (attach* [_ ch-i]
        (if (impl/closed? ch)
          false
          (do (watch! ch-i) true)))
      (detach* [_]
        (if (impl/closed? ch)
          false
          (do (unwatch!) true))))))
 
(defn attach-input
  "Makes `ch` the input of the input-pipe `p` by delegating to `attach*`.
   Yields false when the input-pipe channel is already closed and true
   in every other case."
  [p ch]
  (attach* p ch))
 
(defn detach-input
  "Detaches the current input from the input-pipe `p` by delegating to
   `detach*`. Returns false if the input-pipe channel is closed, true
   otherwise."
  ;; The parameter vector was missing, which made this defn fail to compile.
  [p]
  (detach* p))


Dňa piatok, 31. januára 2014 19:18:37 UTC+1 Jan Herich napísal(-a):
>
> Hello Folks,
>
> I need to construct core.async channel with following semantics:
>
> 1. When the channel is created, timeout is attached to channel
> 2. After timeout passes out, channel is closed
> 3. It's possible to detach timeout from the channel before it passes out
>
> Now i implemented it with something like this:
>
> (defn create-cancel-channel
>   [channel-to-notify timeout-ms]
>     (let [cancel-channel (async/chan)
>           timeout-channel (async/timeout timeout-ms)]
>       (async/go
>         (let [[_ c] (async/alts! [cancel-channel timeout-channel])]
>           (when (= c timeout-channel)
>             (async/close! cancel-channel)
>             (async/close! channel-to-notify))))
>       cancel-channel))
>  
> ;; transport channel
> (def transport-channel (async/chan))
>  
> ;; cancel-channel
> (def cancel-channel (create-cancel-channel transport-channel 2000))
>  
> ;; to cancel the transport channel timeout, just close the cancel-channel
> (async/close! cancel-channel)
>
>
> But I'm rather unsatisfied with this solution because it seems a little bit 
> clunky and, what's worse, 
> I need to explicitly keep track of those cancel channels. 
> Obviously a core.async pipe would greatly simplify this, but it seems that 
> there's no way to disconnect
> piped channels. Maybe mult/tap/untap would work, even if I won't use the 
> broadcasting semantics of those constructs.
> Any other ideas?
>
>

-- 
You received this message because you are subscribed to the Google
Groups "Clojure" group.
To post to this group, send email to [email protected]
Note that posts from new members are moderated - please be patient with your 
first post.
To unsubscribe from this group, send email to
[email protected]
For more options, visit this group at
http://groups.google.com/group/clojure?hl=en
--- 
You received this message because you are subscribed to the Google Groups 
"Clojure" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to [email protected].
For more options, visit https://groups.google.com/groups/opt_out.

Reply via email to