Couple of points:

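- If this piece of code is a performance bottleneck, benchmark whether go 
'threads' (go blocks) or real threads better suit your needs. Note that go 
blocks share a small fixed-size thread pool, so blocking I/O such as JDBC 
calls should not run inside them.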
- Handle edge cases, e.g. how your code behaves when the channels close, 
especially if you have a (while true ...) loop.
- async/thread uses a growing thread pool, so having 5 long-running threads 
won't hurt it.
- async/map> and async/pipe may help you write cleaner code.
- A function is easier to test and reason about if it does not use global 
vars. E.g. make chan-in and chan-out arguments of the aggregator fn rather 
than using the global vars directly.
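A minimal sketch of the edge-case and global-vars points, assuming each stage 
receives its channels and its work function as arguments. The names 
aggregator, writer, aggregate-fn, sink-fn and run-demo are illustrative and 
not from the original code, and #(reduce + %) merely stands in for the real 
aggregate. It needs core.async on the classpath. Each loop ends when its 
input channel is closed and drained, instead of spinning in (while true ...) 
on nil values:

```clojure
(ns example.pipeline
  (:require [clojure.core.async :as async]))

(defn aggregator
  "Takes batches from `in`, applies `aggregate-fn` to each one and puts the
  result on `out`. When `in` is closed and drained (the take returns nil) it
  closes `out`, so the shutdown propagates downstream. It also stops if `out`
  has been closed (the put returns false).
  `aggregate-fn` must not return nil: nil cannot be put on a channel."
  [aggregate-fn in out]
  (loop []
    (if-some [rows (async/<!! in)]
      (when (async/>!! out (aggregate-fn rows))
        (recur))
      (async/close! out))))

(defn writer
  "Calls `sink-fn` on every value taken from `in`; returns once `in` is
  closed and drained."
  [sink-fn in]
  (loop []
    (when-some [v (async/<!! in)]
      (sink-fn v)
      (recur))))

(defn run-demo
  "Hypothetical wiring of the two stages on async/thread; returns the values
  the writer received."
  []
  (let [chan-in  (async/chan 100)
        chan-out (async/chan 100)
        results  (atom [])
        ;; async/thread returns a channel; taking from it waits for the body
        done     (async/thread (writer #(swap! results conj %) chan-out))]
    (async/thread (aggregator #(reduce + %) chan-in chan-out))
    (async/>!! chan-in [1 2 3])
    (async/>!! chan-in [4 5])
    (async/close! chan-in)   ; no more input: both stages shut down in turn
    (async/<!! done)         ; wait for the writer to finish
    @results))
```

In the real program, -main could start (aggregator aggregate chan-in chan-out) 
and (writer upsert chan-out) on async/thread, and close chan-in once the table 
readers are finished. Because the functions no longer touch global vars, they 
can be tested with throwaway channels as above.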

JW

On Thursday, January 30, 2014 3:08:58 AM UTC+1, Sean Pietz wrote:
>
> I'm writing an ETL process to read event-level data from a product 
> database, transform / aggregate it and write it to an analytics data 
> warehouse. I'm using Clojure's core.async library to separate these 
> processes into concurrently executing components. Here's what the main part 
> of my code looks like right now:
>
>     (ns data-staging.main
>       (:require [clojure.core.async :as async]
>                 [clojure.core.match :refer [match]]
>                 [data-staging.map-vecs :refer :all]
>                 [data-staging.tables :refer :all])
>       (:gen-class))
>
>     (def submissions (make-table "Submission" "Valid"))
>     (def photos (make-table "Photo"))
>     (def videos (make-table "Video"))
>     (def votes (make-table "Votes"))
>
>     ;; define channels used for sequential data processing
>     (def chan-in (async/chan 100))
>     (def chan-out (async/chan 100))
>
>     (defn write-thread
>       "Infinitely loops, reading the next 10000 rows from table and
>       putting a vector of the rows (maps) onto chan-in."
>       [table]
>       (while true
>         (let [next-rows (get-rows table)]
>           (async/>!! chan-in next-rows)
>           (set-max table (:max-id (last next-rows))))))
>
>     (defn aggregator
>       "Takes output from chan-in and aggregates it by coupon_id, date,
>       then adds / drops any fields that are needed / not needed and puts
>       the result onto chan-out."
>       []
>       (while true
>         (->> (async/<!! chan-in)
>              aggregate
>              (async/>!! chan-out))))
>
>     (defn read-thread
>       "Reads data from chan-out and inserts it into the Analytics DB."
>       []
>       (while true
>         (upsert (async/<!! chan-out))))
>
>     (defn -main []
>       (async/thread (write-thread submissions))
>       (async/thread (write-thread photos))
>       (async/thread (write-thread videos))
>       (async/thread-call aggregator)
>       ;; block on the reader so -main does not return while the pipeline runs
>       (async/<!! (async/thread-call read-thread)))
>
> As you can see, I'm putting each of the components on its own thread and 
> using the blocking >!! call on the channels. It feels like using the 
> non-blocking >! calls along with go routines might be better for this use 
> case, especially for the database reads, which spend most of their time 
> performing I/O and waiting for new rows in the product DB. Is this the 
> case, and if so, what would be the best way to implement it? I'm a little 
> unclear on all the tradeoffs between the two methods and on exactly how to 
> use go routines effectively. Also, any other suggestions on how to improve 
> the overall architecture would be much appreciated!
>

-- 
You received this message because you are subscribed to the Google
Groups "Clojure" group.
To post to this group, send email to clojure@googlegroups.com
Note that posts from new members are moderated - please be patient with your 
first post.
To unsubscribe from this group, send email to
clojure+unsubscr...@googlegroups.com
For more options, visit this group at
http://groups.google.com/group/clojure?hl=en