Hey,
the lab stuff looks very interesting, I however couldn't quite figure out
how to "unsubscribe" one channel from the broadcast since I cannot exchange
the "topic" for every subscriber when one subscriber decides to leave. Its
also a lot lower level than I'm currently comfortable with since I haven't
checked out any of the core.async internals yet.
However I wrote my own little pubsub utilities which (almost) only use the
public API.
(def my-topic (pubsub/topic 100))
(pubsub/subscribe-go
[subscription my-topic (sliding-buffer 100)]
(loop []
(when-let [ev (<! subscription)]
(prn [:sub-got ev])
(recur))))
;; without go
(let [sub (pubsub/subscribe my-topic (sliding-buffer 100))]
(prn [:msg (<!! sub)])
(close! sub))
Complete code at: https://gist.github.com/thheller/5973825
subscribe-go is a convenience macro which allows to subscribe to multiple
topics and is a normal go block, so same rules apply and you may take! from
any other channel as well. When the block ends the subscription is
automatically removed, otherwise a subscription is removed by closing it.
Could be optimized but it seems to work fine for me.
Will follow the lab.clj when I'm ready to get dirty with the internals. ;)
Cheers,
/thomas
On Wednesday, July 10, 2013 12:27:59 PM UTC+2, Alex Miller wrote:
>
> There is a broadcast fn in the lab namespace (
> https://github.com/clojure/core.async/blob/master/src/main/clojure/clojure/core/async/lab.clj)
>
> that does this and I believe David Nolen has created a different variant in
> some of his stuff. The lab one is experimental and would welcome feedback
> on it.
>
> Alex
>
> On Tuesday, July 9, 2013 6:46:21 PM UTC-5, Thomas Heller wrote:
>>
>> Hey,
>>
>> I'm doing some core.async tests and want to create a basic pub/sub model.
>> Messages are >! on one channel and >! to many others.
>>
>> (deftest ^:wip async-test2
>>
>> (let [subscribers (atom [])
>> events (chan 100)]
>>
>> (go (loop []
>> (when-let [ev (<! events)]
>> (doseq [c @subscribers]
>> (alt!
>> [[c ev]] :sent
>> :default nil ;; could "force" unsubscribe c?
>> ))
>> (recur))))
>>
>> (let [s1 (chan 1)
>> s2 (chan 100)
>> r1 (atom [])
>> r2 (atom [])]
>> (swap! subscribers conj s1 s2)
>> ;; simulated slow reader
>> (go (loop []
>> (when-let [ev (<! s1)]
>> (swap! r1 conj ev)
>> (<! (timeout 10))
>> (recur))))
>> ;; good reader
>> (go (loop []
>> (when-let [ev (<! s2)]
>> (swap! r2 conj ev)
>> (recur))))
>> (<!! (go (loop [i 0]
>> (when (< i 100)
>> (>! events i)
>> (recur (inc i))))))
>>
>> (close! events)
>> (Thread/sleep 25)
>> (pprint @r1)
>> (pprint @r2))
>> ))
>>
>>
>> In this example the s1 subscriber will loose almost all messages since he
>> cannot keep up (and buffer is too small). I choose to alt!/:default to drop
>> messages, since I don't want any subscriber to block others. How do you
>> guys deal with slow-readers?
>>
>> I don't really have a specific problem but I wonder if there are any
>> plans for some built-in pub/sub mechanisms for core.async. Seems like a
>> very common pattern.
>>
>> Anyways, core.async is nice!
>>
>> Cheers,
>> /thomas
>>
>>
>>
--
--
You received this message because you are subscribed to the Google
Groups "Clojure" group.
To post to this group, send email to [email protected]
Note that posts from new members are moderated - please be patient with your
first post.
To unsubscribe from this group, send email to
[email protected]
For more options, visit this group at
http://groups.google.com/group/clojure?hl=en
---
You received this message because you are subscribed to the Google Groups
"Clojure" group.
To unsubscribe from this group and stop receiving emails from it, send an email
to [email protected].
For more options, visit https://groups.google.com/groups/opt_out.