I am trying to run NetworkWordCount.java from the Spark Streaming examples. I was able to run it using run-example. Now I am trying to run the same code from an app I created. This is the code; it is pretty much identical to the existing example:
import scala.Tuple2;

import com.google.common.collect.Lists;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.StorageLevels;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

import java.util.regex.Pattern;

public final class SimpleApp {
  private static final Pattern SPACE = Pattern.compile(" ");

  public static void main(String[] args) {
    // Create the context with a 1 second batch size
    SparkConf sparkConf = new SparkConf().setAppName("Simple App").setMaster("local");
    JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.seconds(1));

    JavaReceiverInputDStream<String> lines = ssc.socketTextStream(
        "localhost", Integer.parseInt("9999"), StorageLevels.MEMORY_AND_DISK_SER);

    JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
      @Override
      public Iterable<String> call(String x) {
        return Lists.newArrayList(SPACE.split(x));
      }
    });

    JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
        new PairFunction<String, String, Integer>() {
          @Override
          public Tuple2<String, Integer> call(String s) {
            return new Tuple2<String, Integer>(s, 1);
          }
        }).reduceByKey(new Function2<Integer, Integer, Integer>() {
          @Override
          public Integer call(Integer i1, Integer i2) {
            return i1 + i2;
          }
        });

    wordCounts.print();
    ssc.start();
    ssc.awaitTermination();
  }
}

mvn package builds it successfully. But when I run

mvn exec:java -Dexec.mainClass="SimpleApp"

I get the following output:

[INFO] Scanning for projects...
[INFO]
[INFO] ------------------------------------------------------------------------
[INFO] Building Simple Project 1.0
[INFO] ------------------------------------------------------------------------
[INFO]
[INFO] --- exec-maven-plugin:1.4.0:java (default-cli) @ simple-project ---
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
15/03/29 00:08:27 INFO SecurityManager: Changing view acls to: mehaksoni
15/03/29 00:08:27 INFO SecurityManager: Changing modify acls to: mehaksoni
15/03/29 00:08:27 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(mehaksoni); users with modify permissions: Set(mehaksoni)
15/03/29 00:08:28 INFO Slf4jLogger: Slf4jLogger started
15/03/29 00:08:28 INFO Remoting: Starting remoting
15/03/29 00:08:28 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@10.123.96.231:61177]
15/03/29 00:08:28 INFO Utils: Successfully started service 'sparkDriver' on port 61177.
15/03/29 00:08:28 INFO SparkEnv: Registering MapOutputTracker
15/03/29 00:08:28 INFO SparkEnv: Registering BlockManagerMaster
15/03/29 00:08:28 INFO DiskBlockManager: Created local directory at /var/folders/18/_53nwls95_33qmkf5hbqllgr0000gn/T/spark-7e3bc424-9c2c-495f-af94-5b2890f6f107/spark-74020ff4-2e74-4914-bb2d-0979316790ac
15/03/29 00:08:28 INFO MemoryStore: MemoryStore started with capacity 66.9 MB
2015-03-29 00:08:28.888 java[24564:4520263] Unable to load realm info from SCDynamicStore
15/03/29 00:08:28 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
15/03/29 00:08:29 INFO HttpFileServer: HTTP File server directory is /var/folders/18/_53nwls95_33qmkf5hbqllgr0000gn/T/spark-54be2f35-7fa3-4770-89ac-b02921424065/spark-97dd7da4-6158-4085-a0d8-21e59d30b105
15/03/29 00:08:29 INFO HttpServer: Starting HTTP Server
15/03/29 00:08:29 INFO Utils: Successfully started service 'HTTP file server' on port 61178.
15/03/29 00:08:29 INFO Utils: Successfully started service 'SparkUI' on port 4040.
15/03/29 00:08:29 INFO SparkUI: Started SparkUI at http://10.123.96.231:4040
15/03/29 00:08:29 INFO Executor: Starting executor ID <driver> on host localhost
15/03/29 00:08:29 INFO AkkaUtils: Connecting to HeartbeatReceiver: akka.tcp://sparkDriver@10.123.96.231:61177/user/HeartbeatReceiver
15/03/29 00:08:29 INFO NettyBlockTransferService: Server created on 61179
15/03/29 00:08:29 INFO BlockManagerMaster: Trying to register BlockManager
15/03/29 00:08:29 INFO BlockManagerMasterActor: Registering block manager localhost:61179 with 66.9 MB RAM, BlockManagerId(<driver>, localhost, 61179)
15/03/29 00:08:29 INFO BlockManagerMaster: Registered BlockManager
15/03/29 00:08:30 WARN StreamingContext: spark.master should be set as local[n], n > 1 in local mode if you have receivers to get data, otherwise Spark jobs will not get resources to process the received data.
15/03/29 00:08:30 INFO ReceiverTracker: ReceiverTracker started
15/03/29 00:08:30 INFO ForEachDStream: metadataCleanupDelay = -1
15/03/29 00:08:30 INFO ShuffledDStream: metadataCleanupDelay = -1
15/03/29 00:08:30 INFO MappedDStream: metadataCleanupDelay = -1
15/03/29 00:08:30 INFO FlatMappedDStream: metadataCleanupDelay = -1
15/03/29 00:08:30 INFO SocketInputDStream: metadataCleanupDelay = -1
15/03/29 00:08:30 INFO SocketInputDStream: Slide time = 1000 ms
15/03/29 00:08:30 INFO SocketInputDStream: Storage level = StorageLevel(false, false, false, false, 1)
15/03/29 00:08:30 INFO SocketInputDStream: Checkpoint interval = null
15/03/29 00:08:30 INFO SocketInputDStream: Remember duration = 1000 ms
15/03/29 00:08:30 INFO SocketInputDStream: Initialized and validated org.apache.spark.streaming.dstream.SocketInputDStream@2b237512
15/03/29 00:08:30 INFO FlatMappedDStream: Slide time = 1000 ms
15/03/29 00:08:30 INFO FlatMappedDStream: Storage level = StorageLevel(false, false, false, false, 1)
15/03/29 00:08:30 INFO FlatMappedDStream: Checkpoint interval = null
15/03/29 00:08:30 INFO FlatMappedDStream: Remember duration = 1000 ms
15/03/29 00:08:30 INFO FlatMappedDStream: Initialized and validated org.apache.spark.streaming.dstream.FlatMappedDStream@279febb9
15/03/29 00:08:30 INFO MappedDStream: Slide time = 1000 ms
15/03/29 00:08:30 INFO MappedDStream: Storage level = StorageLevel(false, false, false, false, 1)
15/03/29 00:08:30 INFO MappedDStream: Checkpoint interval = null
15/03/29 00:08:30 INFO MappedDStream: Remember duration = 1000 ms
15/03/29 00:08:30 INFO MappedDStream: Initialized and validated org.apache.spark.streaming.dstream.MappedDStream@2ad1223d
15/03/29 00:08:30 INFO ShuffledDStream: Slide time = 1000 ms
15/03/29 00:08:30 INFO ShuffledDStream: Storage level = StorageLevel(false, false, false, false, 1)
15/03/29 00:08:30 INFO ShuffledDStream: Checkpoint interval = null
15/03/29 00:08:30 INFO ShuffledDStream: Remember duration = 1000 ms
15/03/29 00:08:30 INFO ShuffledDStream: Initialized and validated org.apache.spark.streaming.dstream.ShuffledDStream@81ee8c1
15/03/29 00:08:30 INFO ForEachDStream: Slide time = 1000 ms
15/03/29 00:08:30 INFO ForEachDStream: Storage level = StorageLevel(false, false, false, false, 1)
15/03/29 00:08:30 INFO ForEachDStream: Checkpoint interval = null
15/03/29 00:08:30 INFO ForEachDStream: Remember duration = 1000 ms
15/03/29 00:08:30 INFO ForEachDStream: Initialized and validated org.apache.spark.streaming.dstream.ForEachDStream@2542db11
15/03/29 00:08:30 INFO ReceiverTracker: Starting 1 receivers
15/03/29 00:08:30 INFO RecurringTimer: Started timer for JobGenerator at time 1427612911000
15/03/29 00:08:30 INFO JobGenerator: Started JobGenerator at 1427612911000 ms
15/03/29 00:08:30 INFO JobScheduler: Started JobScheduler
15/03/29 00:08:30 INFO SparkContext: Starting job: start at SimpleApp.java:94
15/03/29 00:08:30 INFO DAGScheduler: Got job 0 (start at SimpleApp.java:94) with 1 output partitions (allowLocal=false)
15/03/29 00:08:30 INFO DAGScheduler: Final stage: Stage 0(start at SimpleApp.java:94)
15/03/29 00:08:30 INFO DAGScheduler: Parents of final stage: List()
15/03/29 00:08:30 INFO DAGScheduler: Missing parents: List()
15/03/29 00:08:30 INFO DAGScheduler: Submitting Stage 0 (ParallelCollectionRDD[0] at start at SimpleApp.java:94), which has no missing parents
15/03/29 00:08:31 INFO JobScheduler: Added jobs for time 1427612911000 ms
15/03/29 00:08:31 INFO JobScheduler: Starting job streaming job 1427612911000 ms.0 from job set of time 1427612911000 ms
15/03/29 00:08:31 INFO SparkContext: Starting job: print at SimpleApp.java:93
15/03/29 00:08:31 INFO MemoryStore: ensureFreeSpace(34736) called with curMem=0, maxMem=70177259
15/03/29 00:08:31 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 33.9 KB, free 66.9 MB)
15/03/29 00:08:31 INFO MemoryStore: ensureFreeSpace(19382) called with curMem=34736, maxMem=70177259
15/03/29 00:08:31 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 18.9 KB, free 66.9 MB)
15/03/29 00:08:31 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on localhost:61179 (size: 18.9 KB, free: 66.9 MB)
15/03/29 00:08:31 INFO BlockManagerMaster: Updated info of block broadcast_0_piece0
15/03/29 00:08:31 INFO SparkContext: Created broadcast 0 from broadcast at DAGScheduler.scala:838
15/03/29 00:08:31 INFO DAGScheduler: Submitting 1 missing tasks from Stage 0 (ParallelCollectionRDD[0] at start at SimpleApp.java:94)
15/03/29 00:08:31 INFO TaskSchedulerImpl: Adding task set 0.0 with 1 tasks
15/03/29 00:08:31 INFO DAGScheduler: Registering RDD 3 (mapToPair at SimpleApp.java:80)
15/03/29 00:08:31 INFO DAGScheduler: Got job 1 (print at SimpleApp.java:93) with 1 output partitions (allowLocal=true)
15/03/29 00:08:31 INFO DAGScheduler: Final stage: Stage 2(print at SimpleApp.java:93)
15/03/29 00:08:31 INFO DAGScheduler: Parents of final stage: List(Stage 1)
15/03/29 00:08:31 INFO DAGScheduler: Missing parents: List()
15/03/29 00:08:31 INFO DAGScheduler: Submitting Stage 2 (ShuffledRDD[4] at reduceByKey at SimpleApp.java:80), which has no missing parents
15/03/29 00:08:31 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, localhost, PROCESS_LOCAL, 1874 bytes)
15/03/29 00:08:31 INFO MemoryStore: ensureFreeSpace(2248) called with curMem=54118, maxMem=70177259
15/03/29 00:08:31 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 2.2 KB, free 66.9 MB)
15/03/29 00:08:31 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
15/03/29 00:08:31 INFO MemoryStore: ensureFreeSpace(1664) called with curMem=56366, maxMem=70177259
15/03/29 00:08:31 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 1664.0 B, free 66.9 MB)
15/03/29 00:08:31 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on localhost:61179 (size: 1664.0 B, free: 66.9 MB)
15/03/29 00:08:31 INFO BlockManagerMaster: Updated info of block broadcast_1_piece0
15/03/29 00:08:31 INFO SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:838
15/03/29 00:08:31 INFO DAGScheduler: Submitting 1 missing tasks from Stage 2 (ShuffledRDD[4] at reduceByKey at SimpleApp.java:80)
15/03/29 00:08:31 INFO TaskSchedulerImpl: Adding task set 2.0 with 1 tasks
15/03/29 00:08:31 INFO RecurringTimer: Started timer for BlockGenerator at time 1427612911600
15/03/29 00:08:31 INFO BlockGenerator: Started BlockGenerator
15/03/29 00:08:31 INFO ReceiverSupervisorImpl: Starting receiver
15/03/29 00:08:31 INFO ReceiverSupervisorImpl: Called receiver onStart
15/03/29 00:08:31 INFO BlockGenerator: Started block pushing thread
15/03/29 00:08:31 INFO SocketReceiver: Connecting to localhost:61112
15/03/29 00:08:31 INFO ReceiverTracker: Registered receiver for stream 0 from akka://sparkDriver
15/03/29 00:08:31 INFO SocketReceiver: Connected to localhost:61112
15/03/29 00:08:32 INFO JobScheduler: Added jobs for time 1427612912000 ms
15/03/29 00:08:33 INFO JobScheduler: Added jobs for time 1427612913000 ms
15/03/29 00:08:34 INFO JobScheduler: Added jobs for time 1427612914000 ms
15/03/29 00:08:35 INFO JobScheduler: Added jobs for time 1427612915000 ms
15/03/29 00:08:36 INFO JobScheduler: Added jobs for time 1427612916000 ms
15/03/29 00:08:36 INFO MemoryStore: ensureFreeSpace(9) called with curMem=58030, maxMem=70177259
15/03/29 00:08:36 INFO MemoryStore: Block input-0-1427612916400 stored as bytes in memory (estimated size 9.0 B, free 66.9 MB)
15/03/29 00:08:36 INFO BlockManagerInfo: Added input-0-1427612916400 in memory on localhost:61179 (size: 9.0 B, free: 66.9 MB)
15/03/29 00:08:36 INFO BlockManagerMaster: Updated info of block input-0-1427612916400
15/03/29 00:08:36 INFO BlockGenerator: Pushed block input-0-1427612916400
15/03/29 00:08:37 INFO JobScheduler: Added jobs for time 1427612917000 ms
15/03/29 00:08:38 INFO JobScheduler: Added jobs for time 1427612918000 ms
15/03/29 00:08:39 INFO JobScheduler: Added jobs for time 1427612919000 ms
15/03/29 00:08:40 INFO JobScheduler: Added jobs for time 1427612920000 ms
15/03/29 00:08:41 INFO JobScheduler: Added jobs for time 1427612921000 ms
15/03/29 00:08:42 INFO JobScheduler: Added jobs for time 1427612922000 ms
15/03/29 00:08:43 INFO JobScheduler: Added jobs for time 1427612923000 ms
15/03/29 00:08:44 INFO JobScheduler: Added jobs for time 1427612924000 ms
15/03/29 00:08:45 INFO JobScheduler: Added jobs for time 1427612925000 ms
15/03/29 00:08:46 INFO JobScheduler: Added jobs for time 1427612926000 ms
15/03/29 00:08:47 INFO JobScheduler: Added jobs for time 1427612927000 ms

When I enter words on the local netcat server, the terminal running this app shows something like this:

15/03/29 00:08:36 INFO MemoryStore: ensureFreeSpace(9) called with curMem=58030, maxMem=70177259
15/03/29 00:08:36 INFO MemoryStore: Block input-0-1427612916400 stored as bytes in memory (estimated size 9.0 B, free 66.9 MB)
15/03/29 00:08:36 INFO BlockManagerInfo: Added input-0-1427612916400 in memory on localhost:61179 (size: 9.0 B, free: 66.9 MB)
15/03/29 00:08:36 INFO BlockManagerMaster: Updated info of block input-0-1427612916400
15/03/29 00:08:36 INFO BlockGenerator: Pushed block input-0-1427612916400

But I am not able to see the words that I had actually entered.
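One thing I did notice in the middle of the log is this warning:

15/03/29 00:08:30 WARN StreamingContext: spark.master should be set as local[n], n > 1 in local mode if you have receivers to get data, otherwise Spark jobs will not get resources to process the received data.

I set the master to plain "local", which as far as I understand gives the app only a single thread, and the socket receiver seems to occupy it, so maybe nothing is left to actually process and print the batches. Is that the problem here? If so, would changing the SparkConf line roughly as below be enough (this is only my guess from reading the warning, not something I have confirmed)?

    // guess: use at least two local threads so the receiver thread
    // does not starve the thread that processes and prints each batch
    SparkConf sparkConf = new SparkConf()
        .setAppName("Simple App")
        .setMaster("local[2]");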
Could anyone please offer any help or advice on what I could do about this? Any help would be appreciated!