Rob - I’d go with Gary's approach, which essentially moves the splitting up of 
the chunk of results from the core.async channel’s transducer to the producing 
function. You can do that using a channel with a fixed buffer of 50 and >!!. As 
long as the next db query is blocked until each of the results from the 
previous query are put onto the channel it’ll work as you want.

I think the behavior in our examples differ because the blocking puts will 
complete whenever there is a take and the buffer is not full, ignoring whether 
the transducer is still outputting values. This bug may be relevant, though 
there it arises in a less common scenario (fixed buffer of size 0, which is now 
disallowed) https://dev.clojure.org/jira/browse/ASYNC-140 
<https://dev.clojure.org/jira/browse/ASYNC-140>

> On Jan 5, 2018, at 8:01 PM, Gary Verhaegen <gary.verhae...@gmail.com> wrote:
> 
> On 5 January 2018 at 19:44, Rob Nikander <rob.nikan...@gmail.com 
> <mailto:rob.nikan...@gmail.com>> wrote:
> Hi,
> 
> I’m wondering if there is a core.async design idiom for this situation...
> 
> - A buffered channel 
> - One producer feeding it 
> - A bunch of consumers pulling from it.
> - Producer should wake up and fill the channel only when it’s empty. In other 
> words, the producer should work in chunks.
> 
> My first idea is to have two channels. The second will be used by consumers 
> to signal the producer that the primary channel is empty. But I'm wondering 
> if there is a better way.
> 
> The motive for this is that the producer is doing a DB query that is more 
> efficient in bulk. `select ... limit 50` rather than `select ... limit 1` 50 
> times.
> 
> Rob
> 
> What about simply having the producer put items one by one on the channel?
> 
> (ns t.core
>   (:require [clojure.core.async :as async]))
> 
> (defn ap
>   "atomic print"
>   [m]
>   (print (str (pr-str m) "\n")))
> 
> (defn produce-next-batch
>   [s]
>   (let [m (+ s 10)]
>     [(range s m) m]))
> 
> (defn chunked-producer
>   [init-state]
>   (let [result-chan (async/chan)]
>     (async/go
>       (loop [[batch cursor] (produce-next-batch init-state)]
>         (ap {:produced batch})
>         (doseq [elem batch]
>           (async/>! result-chan elem))
>         (recur (produce-next-batch cursor))))
>     result-chan))
> 
> (defn run-consumer
>   [ch id n]
>   (async/go
>     (dotimes [_ n]
>       (ap {:id id :received (async/<! ch)})
>       (async/<! (async/timeout 10)))))
> 
> (defn run
>   []
>   (let [c (chunked-producer 0)
>         c1 (run-consumer c 1 10)
>         c2 (run-consumer c 2 10)
>         c3 (run-consumer c 3 10)]
>     (->> [c1 c2 c3]
>          (mapv async/<!!))
>     (flush)))
> Here produce-next-batch has been deliberately written to evoke the idea that 
> you have some sort of state or cursor that lets you produce the next batch. 
> Real code would obviously need to account for exceptions, handle channel 
> closing, etc., but hopefully this illustrate the idea.
> 
> 
> 
> -- 
> You received this message because you are subscribed to the Google
> Groups "Clojure" group.
> To post to this group, send email to clojure@googlegroups.com
> Note that posts from new members are moderated - please be patient with your 
> first post.
> To unsubscribe from this group, send email to
> clojure+unsubscr...@googlegroups.com
> For more options, visit this group at
> http://groups.google.com/group/clojure?hl=en 
> <http://groups.google.com/group/clojure?hl=en>
> --- 
> You received this message because you are subscribed to the Google Groups 
> "Clojure" group.
> To unsubscribe from this group and stop receiving emails from it, send an 
> email to clojure+unsubscr...@googlegroups.com 
> <mailto:clojure+unsubscr...@googlegroups.com>.
> For more options, visit https://groups.google.com/d/optout 
> <https://groups.google.com/d/optout>.

-- 
You received this message because you are subscribed to the Google
Groups "Clojure" group.
To post to this group, send email to clojure@googlegroups.com
Note that posts from new members are moderated - please be patient with your 
first post.
To unsubscribe from this group, send email to
clojure+unsubscr...@googlegroups.com
For more options, visit this group at
http://groups.google.com/group/clojure?hl=en
--- 
You received this message because you are subscribed to the Google Groups 
"Clojure" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to clojure+unsubscr...@googlegroups.com.
For more options, visit https://groups.google.com/d/optout.

Reply via email to