Hi folks,

I was trying to work through the streaming word count example at
http://spark.incubator.apache.org/docs/latest/streaming-programming-guide.html
and couldn't get the code as-written to run. In fairness, I was trying to
do this inside the REPL rather than compiling a separate project; would the
types be different?

In any case, here's the code I ran:

$ SPARK_JAVA_OPTS=-Dspark.cleaner.ttl=300000 bin/spark-shell

scala> import org.apache.spark.streaming._
scala> val ssc = new StreamingContext(sc, Seconds(2))
scala> val lines = ssc.socketTextStream("127.0.0.1", 1234)
scala> val words = lines.flatMap(_.split(" "))

// *** The following code from the html page doesn't work
// because pairs has type DStream[(String, Int)] and
// there is no reduceByKey method on this type.

// Count each word in each batch
scala> val pairs = words.map(word => (word, 1))
scala> val wordCounts = pairs.reduceByKey(_ + _)  // <-- error here: no reduceByKey()

// Print a few of the counts to the console
scala> wordCount.print()   // ... and even if the above did work,
// 'wordCount' and 'wordCounts' are different symbols ;) so this couldn't
// compile as written.


Instead, I got the following to run:
scala> val wordCounts = words.countByValue()
scala> wordCounts.print()
scala> ssc.start()             // Start the computation
scala> ssc.awaitTermination()

This worked if I ran 'nc -lk 1234' in another terminal and typed some words
into it, but the 'wordCounts.print()' statement would only emit things to
stdout once I sent a ^D into the netcat stream. It seems to print the output
for all of the 2-second windows at once, only after the ^D. Is this expected?
I don't understand the semantics of ssc.start / ssc.awaitTermination well
enough to know how they interact with the print statement on wordCounts
(which I think is a DStream of RDDs?).

I set spark.cleaner.ttl to a relatively high value (I'm not sure whether the
units are seconds or millis) because a lower value caused stderr to spam
everywhere and make my terminal unreadable. Is that part of my issue? The
Spark REPL said I had to set it, so I just picked a number.

I kind of expected wordCounts.print() to be constantly emitting (word, N)
pairs to my Spark terminal as I typed into the netcat side of things.
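That is, for each 2-second batch I expected a block roughly like the
following to appear as I typed (the timestamp here is made up), instead of
all of the blocks showing up together after the ^D:

-------------------------------------------
Time: 1393300000000 ms
-------------------------------------------
(hello,2)
(world,1)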

I'm using Spark built from the GitHub source that I pulled earlier today.

I am using the following as my 'origin':
  Fetch URL: git://github.com/apache/incubator-spark.git

... and the most recent commit (master a.k.a. HEAD) is:
commit 4d880304867b55a4f2138617b30600b7fa013b14
Author: Bryn Keller <bryn.kel...@intel.com>
Date:   Mon Feb 24 17:35:22 2014 -0800


In any case, I'm happy to help update the docs (or the code) if this is a
bug. I realize this is getting long-winded, but I think my questions really
boil down to:

1) Should there be a reduceByKey() method on DStream? The documentation at
http://spark.incubator.apache.org/docs/latest/streaming-programming-guide.html
says so in the "Transformations" section, but the scaladoc at
https://spark.incubator.apache.org/docs/latest/api/streaming/index.html#org.apache.spark.streaming.dstream.DStream
doesn't list it. DStream.scala also doesn't have a definition for such a
method...

(And based on reading the source of NetworkWordCount.scala, I can't
spot-identify why reduceByKey *does* compile there but doesn't in my REPL
session.)
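
If it helps to see what I mean spelled out, here's the variant I'd try next
in the REPL; the second import is copied from NetworkWordCount.scala, and it
is purely a guess on my part that it's what would bring reduceByKey into
scope (I haven't verified this):

scala> import org.apache.spark.streaming._
scala> import org.apache.spark.streaming.StreamingContext._  // guess: maybe this supplies the pair operations?
scala> val ssc = new StreamingContext(sc, Seconds(2))
scala> val lines = ssc.socketTextStream("127.0.0.1", 1234)
scala> val words = lines.flatMap(_.split(" "))
scala> val pairs = words.map(word => (word, 1))
scala> val wordCounts = pairs.reduceByKey(_ + _)  // hoping this now resolves via an implicit from that import
scala> wordCounts.print()
scala> ssc.start()
scala> ssc.awaitTermination()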

2) Why do I have to wait for the stream to "terminate" with a ^D before
seeing any stdout in the REPL from the wordCounts.print() statement?
Doesn't this defeat the point of "streaming"?
2a) How does the print() statement interact with ssc.start() and
ssc.awaitTermination()?
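
To make 2a concrete, here's my current mental model; each comment is just my
guess at the semantics, not something I've confirmed against the code:

scala> wordCounts.print()      // only registers an output operation on the DStream
scala> ssc.start()             // receiving and batch processing actually begin here
scala> ssc.awaitTermination()  // just blocks this thread; I assume it doesn't affect when print() emits

Under that model I'd expect one printed block per 2-second batch as soon as
ssc.start() runs, which is not what I saw.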

3) Is the cleaner TTL something that I, as a user, should be adjusting to
change the behavior I observed? I.e., would adjusting it change how often
prior window data gets emitted to stdout? Or is it just a background property
that happens to affect the spamminess of the stderr that is routed to the
same console?

4) Should I update the documentation to match my example (i.e., no
reduceByKey, but use words.countByValue() instead)?

5) Now that Spark is a TLP, are my references to the incubator-spark.git
and the http://spark.incubator.apache.org docs woefully out of date, making
this entire exercise a goof? :)

Thanks for the help!

Cheers,
- Aaron
