It seems like you're generally on the right track here (though Erick 
Pintor's code has some nice cleanup, like removal of necessary do, etc). 
The one thing I'd recommend is testing what happens with a larger channel 
buffer; if the file io isn't the bottleneck, but rather the processing, 
this could help the concurrency performance by making sure there's always 
something to be ready to be taken/worked on; It would be cool to see some 
metrics on that for your use case.

Best

Chris


On Tuesday, March 17, 2015 at 5:52:17 AM UTC-7, Adrian Mowat wrote:
>
> Hi,
>
> I've played around with core.async a bit but now I'm trying to use it for 
> a real project and I'm running into a problem getting data off a file and 
> into a channel on the JVM (i.e. as opposed to ClojureScript)
>
> I have around 1GB of data sitting in a file.  Each line of the file 
> contains a separate JSON document.  There are different types of document 
> in the file and I would like use core.async to setup a pipeline of 
> concurrent operations as follows so I can start processing the data before 
> I've finished reading the file.
>
> 1. Stream the raw data out of the file one line at a time, parse it as 
> JSON and write each line to channel (1)
> 2. Read channel (1) and divide the messages up by type and write them to 
> new channels (2..n)
> 3. Read channels (2..n) and apply business logic as appropriate
>
> I'd like the initial read to run in it's own thread because it will be IO 
> blocking.  The others can run in core.async's thread pool 
>
> I'm running into problems getting channels (1) and (2) to talk to one 
> another.  Here's my initial spike and I would expect it to write the 10 
> lines of json from the example file to stdout. 
>
> (defn file-to-chan [ch file]
>   (do
>     (async/thread
>       (with-open [rdr (io/reader file)]
>         (doseq [line (line-seq rdr)]
>           (>!! ch line))))
>     ch))
>
> (defn parse-line [s]
>   (json/parse-string s (comp keyword str/lower-case)))
>
> (def events (chan 1 (map parse-line)))
>
> (go
>   (while true
>     (println (<! events))))
>
> (file-to-chan events "10_events.json")
>
> I have a few questions...
>
> * Can anyone help me understand what's going wrong? (I'm sure it's 
> something silly, but I'm going cross eyed looking at it)
> * It's effectively a batch process.  Is this an appropriate use case for 
> core.async?
> * If so, am I on the right track or is there a better way to approach this?
>
> Many Thanks
>
> Adrian
>
>
>
>
>
>

-- 
You received this message because you are subscribed to the Google
Groups "Clojure" group.
To post to this group, send email to clojure@googlegroups.com
Note that posts from new members are moderated - please be patient with your 
first post.
To unsubscribe from this group, send email to
clojure+unsubscr...@googlegroups.com
For more options, visit this group at
http://groups.google.com/group/clojure?hl=en
--- 
You received this message because you are subscribed to the Google Groups 
"Clojure" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to clojure+unsubscr...@googlegroups.com.
For more options, visit https://groups.google.com/d/optout.

Reply via email to