Hi folks,

I was trying to work through the streaming word count example at http://spark.incubator.apache.org/docs/latest/streaming-programming-guide.html and couldn't get the code as written to run. In fairness, I was trying to do this inside the REPL rather than compiling a separate project; would the types be different?
In any case, here's the code I ran:

  $ SPARK_JAVA_OPTS=-Dspark.cleaner.ttl=300000 bin/spark-shell

  scala> import org.apache.spark.streaming._
  scala> val ssc = new StreamingContext(sc, Seconds(2))
  scala> val lines = ssc.socketTextStream("127.0.0.1", 1234)
  scala> val words = lines.flatMap(_.split(" "))

  // *** The following code from the html page doesn't work, because
  // pairs has type DStream[(String, Int)] and there is no reduceByKey
  // method on this type.

  // Count each word in each batch
  scala> val pairs = words.map(word => (word, 1))
  scala> val wordCounts = pairs.reduceByKey(_ + _)   // <-- error here: no reduceByKey()

  // Print a few of the counts to the console
  scala> wordCount.print()
  // ... and even if the above did work, 'wordCount' and 'wordCounts'
  // are different symbols ;)

This couldn't compile as written. Instead, I got the following to run:

  scala> val wordCounts = words.countByValue()
  scala> wordCounts.print()
  scala> ssc.start()              // Start the computation
  scala> ssc.awaitTermination()

This worked if I ran 'nc -lk 1234' in another terminal and typed some words into it... but the wordCounts.print() statement would only emit things to stdout if I sent a ^D into the netcat stream. It seems to print the output for all of the 2-second windows all at once after the ^D in the network stream. Is this an expected effect? I don't understand the semantics of ssc.start() / ssc.awaitTermination() well enough to know how they interact with the print statement on wordCounts (which I think is a DStream of RDDs?).

I set spark.cleaner.ttl to a relatively high value (I'm not sure whether the units are seconds or millis) because a lower value caused stderr to spam everywhere and make my terminal unreadable. Is that part of my issue? The Spark REPL said I had to set it, so I just picked a number.

I kind of expected wordCounts.print() to constantly emit (word, N) pairs to my Spark terminal as I typed into the netcat side of things.

I'm using Spark built from GitHub source that I pulled earlier today. I am using the following as my 'origin':

  Fetch URL: git://github.com/apache/incubator-spark.git

... and the most recent commit (master a.k.a. HEAD) is:

  commit 4d880304867b55a4f2138617b30600b7fa013b14
  Author: Bryn Keller <bryn.kel...@intel.com>
  Date:   Mon Feb 24 17:35:22 2014 -0800

In any case, I'm happy to help update the docs (or the code) if this is a bug. I realize this is getting long-winded, but I think my questions really boil down to:

1) Should there be a reduceByKey() method on DStream? The documentation at http://spark.incubator.apache.org/docs/latest/streaming-programming-guide.html says so in the "Transformations" section, but the scaladoc at https://spark.incubator.apache.org/docs/latest/api/streaming/index.html#org.apache.spark.streaming.dstream.DStream doesn't list it, and DStream.scala doesn't have a definition for such a method. Based on reading the source of NetworkWordCount.scala, I also can't spot-identify why reduceByKey *does* compile there but doesn't in my terminal. (I've taken a guess at why in the P.S. below.)

2) Why do I have to wait for the stream to "terminate" with a ^D before seeing any stdout in the REPL from the wordCounts.print() statement? Doesn't this defeat the point of "streaming"?

2a) How does the print() statement interact with ssc.start() and ssc.awaitTermination()?

3) Is the cleaner TTL something that, as a user, I should be adjusting to change the behavior I observed? i.e., would adjusting it change the frequency of emissions of prior window data to stdout?
Or is this just a background property that happens to affect the spamminess of my stderr that is routed to the same console?

4) Should I update the documentation to match my example (i.e., drop reduceByKey and use words.countByValue() instead)?

5) Now that Spark is a TLP, are my references to incubator-spark.git and the http://spark.incubator.apache.org docs woefully out of date, making this entire exercise a goof? :)

Thanks for the help!

Cheers,
- Aaron
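
P.S. One thing I noticed after writing all of this: NetworkWordCount.scala has an extra import near the top, org.apache.spark.streaming.StreamingContext._, which (if I'm reading it right) pulls in implicit conversions that add the pair operations like reduceByKey to a DStream[(String, Int)]. That might be why the example compiles there but not in my bare REPL session. I haven't gone back and re-run my session with that import, so treat this as a guess rather than a confirmed fix, but if it is the missing piece, I'd expect something like the following to work in the shell:

  import org.apache.spark.streaming._
  // Guess: this import brings the implicit conversions that add the
  // pair-DStream operations (reduceByKey et al.) into scope.
  import org.apache.spark.streaming.StreamingContext._

  val ssc = new StreamingContext(sc, Seconds(2))
  val lines = ssc.socketTextStream("127.0.0.1", 1234)
  val words = lines.flatMap(_.split(" "))

  // Count each word in each 2-second batch, per the programming guide
  val pairs = words.map(word => (word, 1))
  val wordCounts = pairs.reduceByKey(_ + _)

  wordCounts.print()
  ssc.start()
  ssc.awaitTermination()

If someone can confirm that, the doc fix might just be to call out the extra import (and the wordCount/wordCounts typo) rather than switching the example to countByValue().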