Hi Adrian,

What exactly is the issue you're facing?
I wrote my own version and it seems to work fine.

Please take a look; I hope it helps.

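(require '[clojure.core.async :as async]
         '[clojure.java.io :as io])

(defn process-file
  "Reads file line by line on its own thread, puts each line on ch,
  and closes ch at end of file."
  [ch file]
  (async/thread
    (with-open [input (io/reader file)]
      (doseq [line (line-seq input)]
        (async/>!! ch line)))
    (async/close! ch)))

(defn parse [line]
  (str "Parsed: " line)) ; change it to do whatever you want

(defn mapping
  "Returns a new channel carrying (parse line) for each line taken from ch."
  [ch]
  (async/map parse [ch]))

(defn start []
  (let [lines  (async/chan)      ; raw lines, written by process-file
        events (mapping lines)]  ; parsed lines, read by the go-loop
    (process-file lines "10_events.json")
    (async/go-loop []
      ;; <! returns nil once events is closed, which ends the loop
      (when-some [v (async/<! events)]
        (println v)
        (recur)))))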
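
One thing that may be worth checking in the transducer version (chan 1 (map parse-line)): if the transducer throws, the exception does not surface at the put or the take, and the value is silently missing unless the channel was given an exception handler. I can't tell from the snippet whether that is what you are hitting, so treat the following as a sketch of the technique only. parse-or-throw and parse-all are hypothetical names, the blank-line check stands in for a real JSON parser, and channel transducers need Clojure 1.7 or later.

```clojure
(require '[clojure.core.async :as async])

(defn parse-or-throw
  "Hypothetical stand-in for a real JSON parser: rejects blank lines."
  [line]
  (if (empty? line)
    (throw (ex-info "cannot parse blank line" {:line line}))
    (str "Parsed: " line)))

(defn parse-all
  "Pushes lines through a channel whose transducer is (map parse-or-throw).
  The third argument to chan is the exception handler: it receives the
  Throwable, and because it returns nil nothing is put on the channel
  for the failing line."
  [lines]
  (let [errors (atom [])
        ch     (async/chan 1
                           (map parse-or-throw)
                           (fn [^Throwable ex]
                             (swap! errors conj (.getMessage ex))
                             nil))]
    ;; onto-chan puts every line on ch and then closes it
    ;; (newer core.async releases prefer onto-chan!).
    (async/onto-chan ch lines)
    ;; async/into yields a single vector once ch is closed.
    (let [parsed (async/<!! (async/into [] ch))]
      {:parsed parsed :errors @errors})))
```

Calling (parse-all ["a" "" "b"]) keeps the two good lines and records one error message instead of losing the failure.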

As for your approach, it seems to me a legitimate use of core.async.
Please share your impressions once you finish.
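
For the second step of the pipeline (splitting messages by type into separate channels), one option is async/pub with async/sub. The sketch below is only an illustration under assumptions: the :type key, the sample events, and the names route-by-type and collect-routed are all made up for the example, not taken from your data.

```clojure
(require '[clojure.core.async :as async])

;; Hypothetical parsed events; the :type key is an assumption.
(def sample-events
  [{:type :order   :id 1}
   {:type :payment :id 2}
   {:type :order   :id 3}])

(defn route-by-type
  "Creates a publication of in keyed on :type and subscribes one new
  buffered channel per entry in types. Returns a map of type -> channel.
  Events whose type has no subscriber are dropped by the publication."
  [in types]
  (let [publication (async/pub in :type)]
    (into {}
          (for [t types]
            [t (let [out (async/chan 16)]
                 (async/sub publication t out)
                 out)]))))

(defn collect-routed
  "Routes events by :type and returns a map of type -> vector of events."
  [events types]
  (let [in      (async/chan)
        routed  (route-by-type in types)
        ;; Start one consumer per type before feeding the input.
        results (into {}
                      (for [[t ch] routed]
                        [t (async/into [] ch)]))]
    ;; Puts every event on in, then closes it; closing the source
    ;; also closes the subscribed channels.
    (async/onto-chan in events)
    (into {}
          (for [[t result-ch] results]
            [t (async/<!! result-ch)]))))
```

In a real pipeline each subscribed channel would be read by its own go block applying the business logic for that type, rather than collected into a vector as collect-routed does here.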

Cheers,


On Tuesday, March 17, 2015 at 09:52:17 UTC-3, Adrian Mowat wrote:
>
> Hi,
>
> I've played around with core.async a bit but now I'm trying to use it for 
> a real project and I'm running into a problem getting data off a file and 
> into a channel on the JVM (i.e. as opposed to ClojureScript)
>
> I have around 1GB of data sitting in a file.  Each line of the file 
> contains a separate JSON document.  There are different types of document 
> in the file and I would like to use core.async to set up a pipeline of 
> concurrent operations as follows so I can start processing the data before 
> I've finished reading the file.
>
> 1. Stream the raw data out of the file one line at a time, parse it as 
> JSON and write each line to channel (1)
> 2. Read channel (1) and divide the messages up by type and write them to 
> new channels (2..n)
> 3. Read channels (2..n) and apply business logic as appropriate
>
> I'd like the initial read to run in its own thread because it will be IO 
> blocking.  The others can run in core.async's thread pool.
>
> I'm running into problems getting channels (1) and (2) to talk to one 
> another.  Here's my initial spike and I would expect it to write the 10 
> lines of json from the example file to stdout. 
>
> (defn file-to-chan [ch file]
>   (do
>     (async/thread
>       (with-open [rdr (io/reader file)]
>         (doseq [line (line-seq rdr)]
>           (>!! ch line))))
>     ch))
>
> (defn parse-line [s]
>   (json/parse-string s (comp keyword str/lower-case)))
>
> (def events (chan 1 (map parse-line)))
>
> (go
>   (while true
>     (println (<! events))))
>
> (file-to-chan events "10_events.json")
>
> I have a few questions...
>
> * Can anyone help me understand what's going wrong? (I'm sure it's 
> something silly, but I'm going cross eyed looking at it)
> * It's effectively a batch process.  Is this an appropriate use case for 
> core.async?
> * If so, am I on the right track or is there a better way to approach this?
>
> Many Thanks
>
> Adrian

-- 
You received this message because you are subscribed to the Google
Groups "Clojure" group.
To post to this group, send email to clojure@googlegroups.com
Note that posts from new members are moderated - please be patient with your 
first post.
To unsubscribe from this group, send email to
clojure+unsubscr...@googlegroups.com
For more options, visit this group at
http://groups.google.com/group/clojure?hl=en