Running `nc -lk 1234` in one terminal and `nc localhost 1234` in another
demonstrates line-buffered behavior. It's a mystery!

Thanks for the link on implicit conversions. The example makes sense, and it
makes the code easier to trace too. I'll send a JIRA + pull request to touch
up the docs.
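
For the record, here's roughly how I expect the fixed example to read once the
import is added (the same session as below, just with the extra import; I
haven't re-run this exact transcript end to end):

scala> import org.apache.spark.streaming._
scala> import org.apache.spark.streaming.StreamingContext._  // brings the pair functions into scope
scala> val ssc = new StreamingContext(sc, Seconds(2))
scala> val lines = ssc.socketTextStream("127.0.0.1", 1234)
scala> val words = lines.flatMap(_.split(" "))
scala> val pairs = words.map(word => (word, 1))
scala> val wordCounts = pairs.reduceByKey(_ + _)  // now resolves via the implicit conversion
scala> wordCounts.print()
scala> ssc.start()
scala> ssc.awaitTermination()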

cheers,
- Aaron


On Sun, Mar 2, 2014 at 4:59 PM, Matei Zaharia <matei.zaha...@gmail.com> wrote:

> Hi Aaron,
>
> On Feb 28, 2014, at 8:46 PM, Aaron Kimball <akimbal...@gmail.com> wrote:
>
> > Hi folks,
> >
> > I was trying to work through the streaming word count example at
> > http://spark.incubator.apache.org/docs/latest/streaming-programming-guide.html
> > and couldn't get the code as-written to run. In fairness, I was trying to
> > do this inside the REPL rather than compiling a separate project; would the
> > types be different?
> >
> > In any case, here's the code I ran:
> >
> > $ SPARK_JAVA_OPTS=-Dspark.cleaner.ttl=300000 bin/spark-shell
> >
> > scala> import org.apache.spark.streaming._
> > scala> val ssc = new StreamingContext(sc, Seconds(2))
> > scala> val lines = ssc.socketTextStream("127.0.0.1", 1234)
> > scala> val words = lines.flatMap(_.split(" "))
> >
> > // *** The following code from the html page doesn't work
> > // because pairs has type DStream[(String, Int)] and
> > // there is no reduceByKey method on this type.
>
> This seems to be an oversight in the docs. You need to import
> org.apache.spark.streaming.StreamingContext._ in order to get the pair
> functions on DStreams of pairs (through a Scala implicit conversion).
> reduceByKey is actually a function on something called
> PairDStreamFunctions, and the implicit conversion above provides it for you
> only if your DStream has key-value pairs. See
> http://tomjefferys.blogspot.com/2011/11/implicit-conversions-in-scala.html
> for how this works.
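>
> To illustrate the general mechanism, here's a toy REPL example of the same
> pattern in plain Scala (not the actual Spark classes, just the idea):
>
> scala> class RichWords(xs: Seq[String]) { def countByWord = xs.groupBy(identity).mapValues(_.size) }
> scala> implicit def toRichWords(xs: Seq[String]) = new RichWords(xs)
> scala> Seq("a", "b", "a").countByWord  // compiles only while the implicit is in scope
>
> PairDStreamFunctions plays the role of the wrapper class here, and the
> StreamingContext._ import is what puts the conversion in scope.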
>
>
> > // Count each word in each batch
> > scala> val pairs = words.map(word => (word, 1))
> > scala> val wordCounts = pairs.reduceByKey(_ + _)  // <-- error here. no reduceByKey()
> >
> > // Print a few of the counts to the console
> > scala> wordCount.print()   // ... and even if the above did work,
> > 'wordCount' and 'wordCounts' are different symbols ;) This couldn't compile
> > as written.
>
> Also looks like a bug in the docs.
>
> >
> > Instead, I got the following to run:
> > scala> val wordCounts = words.countByValue()
> > scala> wordCounts.print()
> > scala> ssc.start()             // Start the computation
> > scala> ssc.awaitTermination()
> >
> > This worked if I ran 'nc -lk 1234' in another terminal and typed some
> > words into it, but the 'wordCounts.print()' statement would only emit
> > things to stdout if I sent a ^D into the netcat stream. It seems to print
> > the output for all 2-second windows all-at-once after the ^D in the network
> > stream. Is this an expected effect? I don't understand the semantics of
> > ssc.start / awaitTermination well enough to know how they interact with the
> > print statement on wordCounts (which I think is a DStream of RDDs?)
>
> It might also be that netcat didn't flush the stream right away when you
> type input. Not 100% sure about that though. You could try to listen to it
> using netcat on a different port and see if it does.
>
> >
> > I set spark.cleaner.ttl to a relatively high value (I'm not sure what
> > units those are, seconds or millis) because a lower value caused stderr to
> > spam everywhere and make my terminal unreadable. Is that part of my issue?
> > The Spark REPL said I had to set it, so I just picked a number.
>
> This shouldn't matter for this problem.
>
> > 5) Now that Spark is a TLP, are my references to the incubator-spark.git
> > and the http://spark.incubator.apache.org docs woefully out of date,
> > making this entire exercise a goof? :)
>
> If you find these, definitely feel free to fix them, though I believe some
> recent pull requests fixed a few of them.
>
> Anyway, thanks for reporting this stuff!
>
> Matei
>
>
