Hi Aaron,

On Feb 28, 2014, at 8:46 PM, Aaron Kimball <akimbal...@gmail.com> wrote:
> Hi folks,
>
> I was trying to work through the streaming word count example at
> http://spark.incubator.apache.org/docs/latest/streaming-programming-guide.html
> and couldn't get the code as written to run. In fairness, I was trying to do
> this inside the REPL rather than compiling a separate project; would the
> types be different?
>
> In any case, here's the code I ran:
>
> $ SPARK_JAVA_OPTS=-Dspark.cleaner.ttl=300000 bin/spark-shell
>
> scala> import org.apache.spark.streaming._
> scala> val ssc = new StreamingContext(sc, Seconds(2))
> scala> val lines = ssc.socketTextStream("127.0.0.1", 1234)
> scala> val words = lines.flatMap(_.split(" "))
>
> // *** The following code from the html page doesn't work
> // because pairs has type DStream[(String, Int)] and
> // there is no reduceByKey method on this type.

This seems to be an oversight in the docs. You need to import
org.apache.spark.streaming.StreamingContext._ in order to get the pair
functions on DStreams of pairs (through a Scala implicit conversion).
reduceByKey is actually a method on a class called PairDStreamFunctions, and
the implicit conversion above provides it for you only if your DStream
contains key-value pairs. See
http://tomjefferys.blogspot.com/2011/11/implicit-conversions-in-scala.html
for how this works. I've put a full corrected REPL session in a P.S. below.

> // Count each word in each batch
> scala> val pairs = words.map(word => (word, 1))
> scala> val wordCounts = pairs.reduceByKey(_ + _) // <-- error here. no reduceByKey()
>
> // Print a few of the counts to the console
> scala> wordCount.print() // ... and even if the above did work, 'wordCount'
> and 'wordCounts' are different symbols ;) This couldn't compile as written.

Also looks like a bug in the docs.

> Instead, I got the following to run:
>
> scala> val wordCounts = words.countByValue()
> scala> wordCounts.print()
> scala> ssc.start() // Start the computation
> scala> ssc.awaitTermination()
>
> This worked if I ran 'nc -lk 1234' in another terminal and typed some words
> into it... but the 'wordCounts.print()' statement would only emit things to
> stdout if I sent a ^D into the netcat stream. It seems to print the output
> for all 2-second windows all at once after the ^D in the network stream. Is
> this an expected effect? I don't understand the semantics of ssc.start /
> awaitTermination well enough to know how it interacts with the print
> statement on wordCounts (which I think is a DStream of RDDs?)

It might also be that netcat didn't flush the stream right away when you
typed input. I'm not 100% sure about that, though. You could try listening to
it with netcat on a different port and see whether the data arrives as you
type (see the P.S. for one way to test this).

> I set spark.cleaner.ttl to a relatively high value (I'm not sure what units
> those are... seconds or millis) because a lower value caused stderr to spam
> everywhere and make my terminal unreadable. Is that part of my issue? The
> Spark REPL said I had to set it, so I just picked a number.

This shouldn't matter for this problem. (I believe the units there are
seconds, by the way.)

> 5) Now that Spark is a TLP, are my references to the incubator-spark.git and
> the http://spark.incubator.apache.org docs woefully out of date, making this
> entire exercise a goof? :)

If you find those, definitely feel free to fix them, though I believe some
recent pull requests already fixed a few of them.

Anyway, thanks for reporting this stuff!

Matei
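
P.S. For reference, here's roughly what the whole REPL session looks like once
you add the extra import. This is just a sketch (I haven't re-run it verbatim),
but it should be close:

$ SPARK_JAVA_OPTS=-Dspark.cleaner.ttl=300000 bin/spark-shell

scala> import org.apache.spark.streaming._
scala> import org.apache.spark.streaming.StreamingContext._  // enables the implicit conversion to PairDStreamFunctions

scala> val ssc = new StreamingContext(sc, Seconds(2))
scala> val lines = ssc.socketTextStream("127.0.0.1", 1234)
scala> val words = lines.flatMap(_.split(" "))

// Count each word in each batch; reduceByKey now resolves through the implicit conversion
scala> val pairs = words.map(word => (word, 1))
scala> val wordCounts = pairs.reduceByKey(_ + _)

scala> wordCounts.print()  // note: wordCounts, not wordCount
scala> ssc.start()
scala> ssc.awaitTermination()

And if you want to rule netcat in or out on the buffering question, you can
wire two netcats together on another port and watch when the bytes actually
arrive (port 1235 here is just an arbitrary example):

$ nc -lk 1235          # terminal 1: type some words here
$ nc 127.0.0.1 1235    # terminal 2: see whether lines show up as you type or only at ^D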