Hi Adrian,

What exactly is the issue you're facing? I wrote my own version and it seems to be working fine.
Please take a look; I hope it helps.

(require '[clojure.core.async :as async]
         '[clojure.java.io :as io])

;; Reads file on its own thread (blocking IO) and puts each line on ch.
(defn process-file [ch file]
  (async/thread
    (with-open [input (io/reader file)]
      (doseq [line (line-seq input)]
        (async/>!! ch line)))
    (async/close! ch))) ; signal end of input

(defn parse [line]
  (str "Parsed: " line)) ; change it to do whatever you want

;; Returns a new channel carrying (parse v) for every v taken from ch.
(defn mapping [ch]
  (async/map parse [ch]))

(defn start []
  (let [lines  (async/chan)     ; raw lines are written here
        events (mapping lines)] ; parsed values are read from here
    (process-file lines "10_events.json")
    (async/go-loop []
      (when-some [v (async/<! events)] ; nil means the channel is closed
        (println v)
        (recur)))))

As for your approach: to me it seems a legitimate use of core.async.
Please send us your impressions once you finish.

Cheers,

On Tuesday, March 17, 2015 09:52:17 UTC-3, Adrian Mowat wrote:
>
> Hi,
>
> I've played around with core.async a bit but now I'm trying to use it
> for a real project and I'm running into a problem getting data off a
> file and into a channel on the JVM (i.e. as opposed to ClojureScript).
>
> I have around 1GB of data sitting in a file. Each line of the file
> contains a separate JSON document. There are different types of
> document in the file and I would like to use core.async to set up a
> pipeline of concurrent operations as follows, so I can start processing
> the data before I've finished reading the file.
>
> 1. Stream the raw data out of the file one line at a time, parse it as
>    JSON and write each line to channel (1)
> 2. Read channel (1) and divide the messages up by type and write them
>    to new channels (2..n)
> 3. Read channels (2..n) and apply business logic as appropriate
>
> I'd like the initial read to run in its own thread because it will be
> IO blocking. The others can run in core.async's thread pool.
>
> I'm running into problems getting channels (1) and (2) to talk to one
> another. Here's my initial spike; I would expect it to write the 10
> lines of JSON from the example file to stdout.
>
> (defn file-to-chan [ch file]
>   (do
>     (async/thread
>       (with-open [rdr (io/reader file)]
>         (doseq [line (line-seq rdr)]
>           (>!! ch line))))
>     ch))
>
> (defn parse-line [s]
>   (json/parse-string s (comp keyword str/lower-case)))
>
> (def events (chan 1 (map parse-line)))
>
> (go
>   (while true
>     (println (<! events))))
>
> (file-to-chan events "10_events.json")
>
> I have a few questions...
>
> * Can anyone help me understand what's going wrong? (I'm sure it's
>   something silly, but I'm going cross-eyed looking at it)
> * It's effectively a batch process. Is this an appropriate use case
>   for core.async?
> * If so, am I on the right track or is there a better way to approach
>   this?
>
> Many Thanks
>
> Adrian

--
You received this message because you are subscribed to the Google Groups "Clojure" group.
To post to this group, send email to clojure@googlegroups.com
Note that posts from new members are moderated - please be patient with your first post.
To unsubscribe from this group, send email to clojure+unsubscr...@googlegroups.com
For more options, visit this group at http://groups.google.com/group/clojure?hl=en
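
Regarding steps 2 and 3 of the pipeline quoted above (dividing messages by type and handling each type separately), one possible way to do the routing is core.async's pub/sub. The following is only a rough sketch under stated assumptions: the :type key, the :order and :refund values, the channel names, the buffer size of 16 and the consume helper are all hypothetical, since the thread does not say how document types are encoded.

```clojure
(require '[clojure.core.async :as async])

;; Channel (1): parsed events. Assumes each event is a map with a :type
;; key (hypothetical; the thread does not show the JSON layout).
(def events (async/chan 16))

;; A publication that routes every event by the value of its :type key.
(def by-type (async/pub events :type))

;; Channels (2..n): one subscriber channel per document type.
;; :order and :refund are made-up type names for illustration.
(def orders  (async/chan 16))
(def refunds (async/chan 16))
(async/sub by-type :order  orders)
(async/sub by-type :refund refunds)

;; Step 3: apply handler to every value taken from ch until ch closes.
(defn consume [ch handler]
  (async/go-loop []
    (when-some [event (async/<! ch)]
      (handler event)
      (recur))))
```

Something like (consume orders println) would then handle one type. Two things to keep in mind with this design: events whose type has no subscriber are silently dropped, and closing the source channel closes the subscriber channels as well, so each consumer loop ends on its own once the file has been fully read.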