Hi Erick,

Thanks for getting back to me.

On my system, I wasn't seeing the contents of my file being listed in the REPL. Your code is working fine though, and I can't see anything significantly different, so I wonder if I had managed to corrupt my session in some way.
Anyway, it's good to know I'm on the right path. I'll post my solutions as I get things up and running.

Cheers

Adrian

On Wednesday, 18 March 2015 13:45:33 UTC, Erick Pintor wrote:
>
> Hi Adrian,
>
> What exactly is the issue that you're facing?
> I did my own version and it seems to be working fine.
>
> Please take a look; I hope it helps.
>
> (defn process-file [ch file]
>   (async/thread
>     (with-open [input (io/reader file)]
>       (doseq [line (line-seq input)]
>         (async/>!! ch line)))))
>
> (defn parse [line]
>   (str "Parsed: " line)) ; change it to do whatever you want
>
> (defn mapping [ch]
>   (async/map parse [ch]))
>
> (defn start []
>   (let [lines  (async/chan)
>         events (mapping lines)]
>     ;; write to the input channel; writing to `events` would bypass parse
>     (process-file lines "10_events.json")
>     (async/go-loop []
>       (let [v (async/<! events)]
>         (println v)
>         (recur)))))
>
> About your approach: to me, it seems a legitimate use of core.async.
> Please send us your impressions once you finish.
>
> Cheers,
>
>
> On Tuesday, 17 March 2015 09:52:17 UTC-3, Adrian Mowat wrote:
>>
>> Hi,
>>
>> I've played around with core.async a bit but now I'm trying to use it for
>> a real project and I'm running into a problem getting data off a file and
>> into a channel on the JVM (i.e. as opposed to ClojureScript)
>>
>> I have around 1GB of data sitting in a file. Each line of the file
>> contains a separate JSON document. There are different types of document
>> in the file and I would like to use core.async to set up a pipeline of
>> concurrent operations as follows so I can start processing the data before
>> I've finished reading the file.
>>
>> 1. Stream the raw data out of the file one line at a time, parse it as
>>    JSON and write each line to channel (1)
>> 2. Read channel (1) and divide the messages up by type and write them to
>>    new channels (2..n)
>> 3. Read channels (2..n) and apply business logic as appropriate
>>
>> I'd like the initial read to run in its own thread because it will be IO
>> blocking.
>> The others can run in core.async's thread pool.
>>
>> I'm running into problems getting channels (1) and (2) to talk to one
>> another. Here's my initial spike and I would expect it to write the 10
>> lines of JSON from the example file to stdout.
>>
>> (defn file-to-chan [ch file]
>>   (do
>>     (async/thread
>>       (with-open [rdr (io/reader file)]
>>         (doseq [line (line-seq rdr)]
>>           (>!! ch line))))
>>     ch))
>>
>> (defn parse-line [s]
>>   (json/parse-string s (comp keyword str/lower-case)))
>>
>> (def events (chan 1 (map parse-line)))
>>
>> (go
>>   (while true
>>     (println (<! events))))
>>
>> (file-to-chan events "10_events.json")
>>
>> I have a few questions...
>>
>> * Can anyone help me understand what's going wrong? (I'm sure it's
>>   something silly, but I'm going cross-eyed looking at it)
>> * It's effectively a batch process. Is this an appropriate use case for
>>   core.async?
>> * If so, am I on the right track or is there a better way to approach
>>   this?
>>
>> Many Thanks
>>
>> Adrian
>>

--
You received this message because you are subscribed to the Google Groups "Clojure" group.
To post to this group, send email to clojure@googlegroups.com
Note that posts from new members are moderated - please be patient with your first post.
To unsubscribe from this group, send email to clojure+unsubscr...@googlegroups.com
For more options, visit this group at http://groups.google.com/group/clojure?hl=en
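
For step 2 of the pipeline described in the original question (dividing messages by type into separate channels), one possible approach is core.async's `pub`/`sub`. The following is only a rough sketch under stated assumptions, not the approach anyone in the thread settled on: the names `route-by-type` and `run-demo`, the `:type` key, and the `:order`/`:refund` document types are all hypothetical, and in-memory maps stand in for parsed JSON lines so the example does not need a file or a JSON library.

```clojure
(ns pipeline-sketch
  (:require [clojure.core.async :as async]))

(defn route-by-type
  "Publishes messages from `in` keyed by their :type (an assumed field)
  and returns a map of type -> subscribed channel."
  [in types]
  (let [publication (async/pub in :type)]
    (into {}
          (for [t types]
            (let [out (async/chan 16)]
              ;; By default the subscribed channel closes when `in` closes.
              (async/sub publication t out)
              [t out])))))

(defn run-demo []
  (let [in      (async/chan 16)
        outs    (route-by-type in [:order :refund])
        ;; async/into yields a channel that delivers one vector per topic
        ;; once that topic's channel has closed.
        results (into {} (for [[t ch] outs] [t (async/into [] ch)]))]
    ;; The producer runs on its own thread, as the blocking file read would.
    (async/thread
      (doseq [doc [{:type :order  :id 1}
                   {:type :refund :id 2}
                   {:type :order  :id 3}]]
        (async/>!! in doc))
      ;; Closing the source lets downstream consumers finish.
      (async/close! in))
    (into {} (for [[t ch] results] [t (async/<!! ch)]))))
```

Calling `(run-demo)` returns a map from each type to the documents routed to it. Two things to keep in mind with this design: messages whose type has no subscriber are dropped by the publication, and subscriptions should be set up before the producer starts so that nothing is missed.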