Couple of points:
- If this piece of code is a performance bottleneck, benchmark whether go 'threads' (go blocks) or real threads better suit your needs.
- Handle edge cases, e.g. how your code behaves when the channels close, especially if you have a (while true ...) loop.
- async/thread uses a growing thread pool, so having 5 long-running threads won't hurt it.
- async/map> and async/pipe may help you write cleaner code.
- A function is easier to test and reason about if it does not use global vars. E.g. make chan-in and chan-out arguments of the aggregator fn rather than using the global vars directly.
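A minimal sketch of the closed-channel and global-vars points, assuming a current core.async release: the aggregator takes its input channel, output channel and aggregation fn as arguments, and (while true ...) is replaced by loop/if-some. <!! returns nil once a channel is closed and drained, and >!! returns false if the output channel has been closed, so the loop can stop cleanly in both cases. The aggregate fn here is a hypothetical stand-in that just counts rows.

```clojure
(ns example.aggregator
  (:require [clojure.core.async :as async]))

;; Hypothetical stand-in for the poster's aggregate fn: it just counts
;; the rows (maps) in each batch.
(defn aggregate [rows]
  {:row-count (count rows)})

(defn aggregator
  "Takes batches from in, applies aggregate-fn and puts the result on out.
  Stops when in is closed and drained (<!! returns nil), closing out so the
  next stage can stop too, or when out has been closed (>!! returns false).
  Returns the channel from async/thread, which receives :done or
  :out-closed when the loop exits."
  [aggregate-fn in out]
  (async/thread
    (loop []
      (if-some [batch (async/<!! in)]
        (if (async/>!! out (aggregate-fn batch))
          (recur)
          :out-closed)
        (do (async/close! out)
            :done)))))
```

Since aggregator starts its own thread, -main would call (aggregator aggregate chan-in chan-out) instead of (async/thread-call aggregator), and a test can pass in fresh local channels instead of touching the globals.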
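On the async/map> and async/pipe point: later core.async releases deprecate map> and map< in favor of channels with transducers, so this sketch assumes such a version. The aggregator thread is replaced by a (map aggregate-fn) transducer on an intermediate channel, connected with async/pipe, which by default closes its destination when its source closes. aggregate is again a hypothetical stand-in.

```clojure
(ns example.pipe
  (:require [clojure.core.async :as async]))

;; Hypothetical stand-in for the poster's aggregate fn.
(defn aggregate [rows]
  {:row-count (count rows)})

(defn aggregating-pipe
  "Pipes batches from in through a channel with the transducer
  (map aggregate-fn) and on into out. Each async/pipe closes its
  destination when its source closes, so closing in eventually closes out.
  Returns out."
  [aggregate-fn in out]
  (let [xf-chan (async/chan 100 (map aggregate-fn))]
    (async/pipe in xf-chan)
    (async/pipe xf-chan out)))
```

The transducer runs on whichever thread puts into the channel (here the go block inside pipe), so this suits cheap, non-blocking transforms; for heavy or blocking work, async/pipeline-blocking runs the transform on dedicated threads.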
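On go blocks vs. real threads: go blocks are multiplexed over a small, fixed-size thread pool, so a blocking database read inside one holds a pool thread that other go blocks need, while the async/thread pool simply grows. A minimal sketch of one possible split, assuming the database reads block: the producer stays on async/thread with >!!, and the CPU-only aggregation runs in a go-loop with the parking <! and >!. make-fetch-fn is a hypothetical stand-in for get-rows.

```clojure
(ns example.go-vs-thread
  (:require [clojure.core.async :as async]))

;; Hypothetical stand-in for get-rows: returns the next batch from a
;; fixed seq of batches, or nil when none are left. Not thread-safe;
;; it is meant to be called from a single producer thread.
(defn make-fetch-fn [batches]
  (let [remaining (atom (seq batches))]
    (fn []
      (let [batch (first @remaining)]
        (swap! remaining next)
        batch))))

(defn producer
  "Calls fetch-fn in a loop on a real thread (async/thread), because
  fetch-fn may block on I/O. Puts each batch on out with the blocking >!!,
  stops if out has been closed, and closes out when fetch-fn returns nil."
  [fetch-fn out]
  (async/thread
    (loop []
      (if-some [batch (fetch-fn)]
        (when (async/>!! out batch)
          (recur))
        (async/close! out)))))

(defn go-aggregator
  "Aggregation is CPU work with no blocking I/O, so it can run in a go
  block using the parking ops <! and >!. Closes out once in is closed
  and drained."
  [aggregate-fn in out]
  (async/go-loop []
    (if-some [batch (async/<! in)]
      (when (async/>! out (aggregate-fn batch))
        (recur))
      (async/close! out))))
```

Whether moving the aggregation into a go block actually helps for this workload is still something to benchmark.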
JW

On Thursday, January 30, 2014 3:08:58 AM UTC+1, Sean Pietz wrote:
>
> I'm writing an ETL process to read event-level data from a product
> database, transform/aggregate it and write it to an analytics data
> warehouse. I'm using Clojure's core.async library to separate these processes
> into concurrently executing components. Here's what the main part of my
> code looks like right now:
>
> (ns data-staging.main
>   (:require [clojure.core.async :as async])
>   (:use [clojure.core.match :only (match)]
>         [data-staging.map-vecs]
>         [data-staging.tables])
>   (:gen-class))
>
> (def submissions (make-table "Submission" "Valid"))
> (def photos (make-table "Photo"))
> (def videos (make-table "Video"))
> (def votes (make-table "Votes"))
>
> ;; define channels used for sequential data processing
> (def chan-in (async/chan 100))
> (def chan-out (async/chan 100))
>
> (defn write-thread
>   "Infinitely loops between reading the next 10000 rows from
>   table and outputting a vector of the rows (maps)
>   into 'chan-in'"
>   [table]
>   (while true
>     (let [next-rows (get-rows table)]
>       (async/>!! chan-in next-rows)
>       (set-max table (:max-id (last next-rows))))))
>
> (defn aggregator
>   "Takes output from 'chan-in' and aggregates it by coupon_id, date,
>   then adds/drops any fields that are needed/not needed and puts the
>   result into 'chan-out'"
>   []
>   (while true
>     (->> (async/<!! chan-in)
>          aggregate
>          (async/>!! chan-out))))
>
> (defn read-thread
>   "Reads data from 'chan-out' and inserts it into the Analytics DB"
>   []
>   (while true
>     (upsert (async/<!! chan-out))))
>
> (defn -main []
>   (async/thread (write-thread submissions))
>   (async/thread (write-thread photos))
>   (async/thread (write-thread videos))
>   (async/thread-call aggregator)
>   (async/thread-call read-thread))
>
> As you can see, I'm putting each of the components onto its own thread and
> using the blocking >!! call on the channels. It feels like using the
> non-blocking >! calls along with go routines might be better for this use
> case, especially for the database reads, which spend most of their time
> performing I/O and waiting for new rows in the product DB. Is this the
> case, and if so, what would be the best way to implement it? I'm a little
> unclear on all the tradeoffs between the two methods and on exactly how to
> use go routines effectively. Also, any other suggestions on how to improve
> the overall architecture would be much appreciated!
>

-- 
You received this message because you are subscribed to the Google Groups "Clojure" group.
To post to this group, send email to clojure@googlegroups.com
Note that posts from new members are moderated - please be patient with your first post.
To unsubscribe from this group, send email to clojure+unsubscr...@googlegroups.com
For more options, visit this group at
http://groups.google.com/group/clojure?hl=en