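You need to set your master to local[2], e.g. new SparkConf().setAppName("Simple App").setMaster("local[2]"). In local mode, 2 is the minimum number of threads needed to both receive and process data in a receiver-based Spark Streaming application. The receiver created by socketTextStream occupies one thread for as long as the streaming context runs. With plain "local" (a single thread), no thread is left to process the received batches, so print() never shows your words. The WARN StreamingContext line in your log says the same thing. More generally, use local[n] with n greater than the number of receivers.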
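To illustrate why the second thread matters, here is a plain-Java analogy. It is not Spark code and does not reproduce Spark's scheduler. A fixed thread pool stands in for local[n]. A task that blocks until it is told to stop stands in for the socket receiver, and a short task stands in for a batch job. The class and method names (ReceiverStarvationDemo, batchRuns) are hypothetical and chosen only for this sketch.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical analogy, NOT Spark code: a pool of `threads` workers where one
// task behaves like a streaming receiver and never returns until stopped.
public final class ReceiverStarvationDemo {

    /** Returns true if a "batch" task managed to run while the "receiver" was active. */
    public static boolean batchRuns(int threads) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        CountDownLatch stopReceiver = new CountDownLatch(1);
        try {
            // Like a socket receiver: holds its worker thread until stopReceiver is released.
            pool.submit(() -> {
                stopReceiver.await();
                return null;
            });
            // Like a batch job: needs a free worker thread to execute.
            Future<String> batch = pool.submit(() -> "processed");
            try {
                return "processed".equals(batch.get(500, TimeUnit.MILLISECONDS));
            } catch (TimeoutException e) {
                return false; // no free thread: the batch is still waiting in the queue
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            } catch (ExecutionException e) {
                throw new IllegalStateException(e);
            }
        } finally {
            stopReceiver.countDown(); // let the "receiver" finish
            pool.shutdownNow();       // discard any still-queued tasks and stop the workers
        }
    }

    public static void main(String[] args) {
        System.out.println("1 thread  (like local):    batch ran = " + batchRuns(1));
        System.out.println("2 threads (like local[2]): batch ran = " + batchRuns(2));
    }
}
```

With one thread, the batch task only sits in the pool's queue behind the blocked receiver task. This loosely mirrors the log above, where "Added jobs for time ..." appears every second but no batch output follows.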
Thanks
Best Regards

On Sun, Mar 29, 2015 at 12:56 PM, mehak.soni <[email protected]> wrote:

> I am trying to run NetworkWordCount.java from the Spark Streaming examples. I
> was able to run it using run-example.
> Now I am trying to run the same code from an app I created. This is the
> code; it is pretty much the same as the existing example:
>
> import scala.Tuple2;
> import com.google.common.collect.Lists;
>
> import org.apache.spark.SparkConf;
> import org.apache.spark.api.java.function.FlatMapFunction;
> import org.apache.spark.api.java.function.Function2;
> import org.apache.spark.api.java.function.PairFunction;
> import org.apache.spark.api.java.StorageLevels;
> import org.apache.spark.streaming.Durations;
> import org.apache.spark.streaming.api.java.JavaDStream;
> import org.apache.spark.streaming.api.java.JavaPairDStream;
> import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
> import org.apache.spark.streaming.api.java.JavaStreamingContext;
>
> import java.util.regex.Pattern;
>
> public final class SimpleApp {
>   private static final Pattern SPACE = Pattern.compile(" ");
>
>   public static void main(String[] args) {
>     // Create the context with a 1 second batch size
>     SparkConf sparkConf = new SparkConf().setAppName("Simple App").setMaster("local");
>     JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.seconds(1));
>
>     JavaReceiverInputDStream<String> lines = ssc.socketTextStream(
>         "localhost", Integer.parseInt("9999"), StorageLevels.MEMORY_AND_DISK_SER);
>     JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
>       @Override
>       public Iterable<String> call(String x) {
>         return Lists.newArrayList(SPACE.split(x));
>       }
>     });
>     JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
>       new PairFunction<String, String, Integer>() {
>         @Override
>         public Tuple2<String, Integer> call(String s) {
>           return new Tuple2<String, Integer>(s, 1);
>         }
>       }).reduceByKey(new Function2<Integer, Integer, Integer>() {
>         @Override
>         public Integer call(Integer i1, Integer i2) {
>           return i1 + i2;
>         }
>       });
>
>     wordCounts.print();
>     ssc.start();
>     ssc.awaitTermination();
>   }
> }
>
> mvn package builds it successfully. But when I run mvn exec:java
> -Dexec.mainClass="SimpleApp", I get the following output:
>
> [INFO] Scanning for projects...
> [INFO]
> [INFO] ------------------------------------------------------------------------
> [INFO] Building Simple Project 1.0
> [INFO] ------------------------------------------------------------------------
> [INFO]
> [INFO] --- exec-maven-plugin:1.4.0:java (default-cli) @ simple-project ---
> Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
> 15/03/29 00:08:27 INFO SecurityManager: Changing view acls to: mehaksoni
> 15/03/29 00:08:27 INFO SecurityManager: Changing modify acls to: mehaksoni
> 15/03/29 00:08:27 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(mehaksoni); users with modify permissions: Set(mehaksoni)
> 15/03/29 00:08:28 INFO Slf4jLogger: Slf4jLogger started
> 15/03/29 00:08:28 INFO Remoting: Starting remoting
> 15/03/29 00:08:28 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://[email protected]:61177]
> 15/03/29 00:08:28 INFO Utils: Successfully started service 'sparkDriver' on port 61177.
> 15/03/29 00:08:28 INFO SparkEnv: Registering MapOutputTracker
> 15/03/29 00:08:28 INFO SparkEnv: Registering BlockManagerMaster
> 15/03/29 00:08:28 INFO DiskBlockManager: Created local directory at /var/folders/18/_53nwls95_33qmkf5hbqllgr0000gn/T/spark-7e3bc424-9c2c-495f-af94-5b2890f6f107/spark-74020ff4-2e74-4914-bb2d-0979316790ac
> 15/03/29 00:08:28 INFO MemoryStore: MemoryStore started with capacity 66.9 MB
> 2015-03-29 00:08:28.888 java[24564:4520263] Unable to load realm info from SCDynamicStore
> 15/03/29 00:08:28 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
> 15/03/29 00:08:29 INFO HttpFileServer: HTTP File server directory is /var/folders/18/_53nwls95_33qmkf5hbqllgr0000gn/T/spark-54be2f35-7fa3-4770-89ac-b02921424065/spark-97dd7da4-6158-4085-a0d8-21e59d30b105
> 15/03/29 00:08:29 INFO HttpServer: Starting HTTP Server
> 15/03/29 00:08:29 INFO Utils: Successfully started service 'HTTP file server' on port 61178.
> 15/03/29 00:08:29 INFO Utils: Successfully started service 'SparkUI' on port 4040.
> 15/03/29 00:08:29 INFO SparkUI: Started SparkUI at http://10.123.96.231:4040
> 15/03/29 00:08:29 INFO Executor: Starting executor ID <driver> on host localhost
> 15/03/29 00:08:29 INFO AkkaUtils: Connecting to HeartbeatReceiver: akka.tcp://[email protected]:61177/user/HeartbeatReceiver
> 15/03/29 00:08:29 INFO NettyBlockTransferService: Server created on 61179
> 15/03/29 00:08:29 INFO BlockManagerMaster: Trying to register BlockManager
> 15/03/29 00:08:29 INFO BlockManagerMasterActor: Registering block manager localhost:61179 with 66.9 MB RAM, BlockManagerId(<driver>, localhost, 61179)
> 15/03/29 00:08:29 INFO BlockManagerMaster: Registered BlockManager
> 15/03/29 00:08:30 WARN StreamingContext: spark.master should be set as local[n], n > 1 in local mode if you have receivers to get data, otherwise Spark jobs will not get resources to process the received data.
> 15/03/29 00:08:30 INFO ReceiverTracker: ReceiverTracker started
> 15/03/29 00:08:30 INFO ForEachDStream: metadataCleanupDelay = -1
> 15/03/29 00:08:30 INFO ShuffledDStream: metadataCleanupDelay = -1
> 15/03/29 00:08:30 INFO MappedDStream: metadataCleanupDelay = -1
> 15/03/29 00:08:30 INFO FlatMappedDStream: metadataCleanupDelay = -1
> 15/03/29 00:08:30 INFO SocketInputDStream: metadataCleanupDelay = -1
> 15/03/29 00:08:30 INFO SocketInputDStream: Slide time = 1000 ms
> 15/03/29 00:08:30 INFO SocketInputDStream: Storage level = StorageLevel(false, false, false, false, 1)
> 15/03/29 00:08:30 INFO SocketInputDStream: Checkpoint interval = null
> 15/03/29 00:08:30 INFO SocketInputDStream: Remember duration = 1000 ms
> 15/03/29 00:08:30 INFO SocketInputDStream: Initialized and validated org.apache.spark.streaming.dstream.SocketInputDStream@2b237512
> 15/03/29 00:08:30 INFO FlatMappedDStream: Slide time = 1000 ms
> 15/03/29 00:08:30 INFO FlatMappedDStream: Storage level = StorageLevel(false, false, false, false, 1)
> 15/03/29 00:08:30 INFO FlatMappedDStream: Checkpoint interval = null
> 15/03/29 00:08:30 INFO FlatMappedDStream: Remember duration = 1000 ms
> 15/03/29 00:08:30 INFO FlatMappedDStream: Initialized and validated org.apache.spark.streaming.dstream.FlatMappedDStream@279febb9
> 15/03/29 00:08:30 INFO MappedDStream: Slide time = 1000 ms
> 15/03/29 00:08:30 INFO MappedDStream: Storage level = StorageLevel(false, false, false, false, 1)
> 15/03/29 00:08:30 INFO MappedDStream: Checkpoint interval = null
> 15/03/29 00:08:30 INFO MappedDStream: Remember duration = 1000 ms
> 15/03/29 00:08:30 INFO MappedDStream: Initialized and validated org.apache.spark.streaming.dstream.MappedDStream@2ad1223d
> 15/03/29 00:08:30 INFO ShuffledDStream: Slide time = 1000 ms
> 15/03/29 00:08:30 INFO ShuffledDStream: Storage level = StorageLevel(false, false, false, false, 1)
> 15/03/29 00:08:30 INFO ShuffledDStream: Checkpoint interval = null
> 15/03/29 00:08:30 INFO ShuffledDStream: Remember duration = 1000 ms
> 15/03/29 00:08:30 INFO ShuffledDStream: Initialized and validated org.apache.spark.streaming.dstream.ShuffledDStream@81ee8c1
> 15/03/29 00:08:30 INFO ForEachDStream: Slide time = 1000 ms
> 15/03/29 00:08:30 INFO ForEachDStream: Storage level = StorageLevel(false, false, false, false, 1)
> 15/03/29 00:08:30 INFO ForEachDStream: Checkpoint interval = null
> 15/03/29 00:08:30 INFO ForEachDStream: Remember duration = 1000 ms
> 15/03/29 00:08:30 INFO ForEachDStream: Initialized and validated org.apache.spark.streaming.dstream.ForEachDStream@2542db11
> 15/03/29 00:08:30 INFO ReceiverTracker: Starting 1 receivers
> 15/03/29 00:08:30 INFO RecurringTimer: Started timer for JobGenerator at time 1427612911000
> 15/03/29 00:08:30 INFO JobGenerator: Started JobGenerator at 1427612911000 ms
> 15/03/29 00:08:30 INFO JobScheduler: Started JobScheduler
> 15/03/29 00:08:30 INFO SparkContext: Starting job: start at SimpleApp.java:94
> 15/03/29 00:08:30 INFO DAGScheduler: Got job 0 (start at SimpleApp.java:94) with 1 output partitions (allowLocal=false)
> 15/03/29 00:08:30 INFO DAGScheduler: Final stage: Stage 0(start at SimpleApp.java:94)
> 15/03/29 00:08:30 INFO DAGScheduler: Parents of final stage: List()
> 15/03/29 00:08:30 INFO DAGScheduler: Missing parents: List()
> 15/03/29 00:08:30 INFO DAGScheduler: Submitting Stage 0 (ParallelCollectionRDD[0] at start at SimpleApp.java:94), which has no missing parents
> 15/03/29 00:08:31 INFO JobScheduler: Added jobs for time 1427612911000 ms
> 15/03/29 00:08:31 INFO JobScheduler: Starting job streaming job 1427612911000 ms.0 from job set of time 1427612911000 ms
> 15/03/29 00:08:31 INFO SparkContext: Starting job: print at SimpleApp.java:93
> 15/03/29 00:08:31 INFO MemoryStore: ensureFreeSpace(34736) called with curMem=0, maxMem=70177259
> 15/03/29 00:08:31 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 33.9 KB, free 66.9 MB)
> 15/03/29 00:08:31 INFO MemoryStore: ensureFreeSpace(19382) called with curMem=34736, maxMem=70177259
> 15/03/29 00:08:31 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 18.9 KB, free 66.9 MB)
> 15/03/29 00:08:31 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on localhost:61179 (size: 18.9 KB, free: 66.9 MB)
> 15/03/29 00:08:31 INFO BlockManagerMaster: Updated info of block broadcast_0_piece0
> 15/03/29 00:08:31 INFO SparkContext: Created broadcast 0 from broadcast at DAGScheduler.scala:838
> 15/03/29 00:08:31 INFO DAGScheduler: Submitting 1 missing tasks from Stage 0 (ParallelCollectionRDD[0] at start at SimpleApp.java:94)
> 15/03/29 00:08:31 INFO TaskSchedulerImpl: Adding task set 0.0 with 1 tasks
> 15/03/29 00:08:31 INFO DAGScheduler: Registering RDD 3 (mapToPair at SimpleApp.java:80)
> 15/03/29 00:08:31 INFO DAGScheduler: Got job 1 (print at SimpleApp.java:93) with 1 output partitions (allowLocal=true)
> 15/03/29 00:08:31 INFO DAGScheduler: Final stage: Stage 2(print at SimpleApp.java:93)
> 15/03/29 00:08:31 INFO DAGScheduler: Parents of final stage: List(Stage 1)
> 15/03/29 00:08:31 INFO DAGScheduler: Missing parents: List()
> 15/03/29 00:08:31 INFO DAGScheduler: Submitting Stage 2 (ShuffledRDD[4] at reduceByKey at SimpleApp.java:80), which has no missing parents
> 15/03/29 00:08:31 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, localhost, PROCESS_LOCAL, 1874 bytes)
> 15/03/29 00:08:31 INFO MemoryStore: ensureFreeSpace(2248) called with curMem=54118, maxMem=70177259
> 15/03/29 00:08:31 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 2.2 KB, free 66.9 MB)
> 15/03/29 00:08:31 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
> 15/03/29 00:08:31 INFO MemoryStore: ensureFreeSpace(1664) called with curMem=56366, maxMem=70177259
> 15/03/29 00:08:31 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 1664.0 B, free 66.9 MB)
> 15/03/29 00:08:31 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on localhost:61179 (size: 1664.0 B, free: 66.9 MB)
> 15/03/29 00:08:31 INFO BlockManagerMaster: Updated info of block broadcast_1_piece0
> 15/03/29 00:08:31 INFO SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:838
> 15/03/29 00:08:31 INFO DAGScheduler: Submitting 1 missing tasks from Stage 2 (ShuffledRDD[4] at reduceByKey at SimpleApp.java:80)
> 15/03/29 00:08:31 INFO TaskSchedulerImpl: Adding task set 2.0 with 1 tasks
> 15/03/29 00:08:31 INFO RecurringTimer: Started timer for BlockGenerator at time 1427612911600
> 15/03/29 00:08:31 INFO BlockGenerator: Started BlockGenerator
> 15/03/29 00:08:31 INFO ReceiverSupervisorImpl: Starting receiver
> 15/03/29 00:08:31 INFO ReceiverSupervisorImpl: Called receiver onStart
> 15/03/29 00:08:31 INFO BlockGenerator: Started block pushing thread
> 15/03/29 00:08:31 INFO SocketReceiver: Connecting to localhost:61112
> 15/03/29 00:08:31 INFO ReceiverTracker: Registered receiver for stream 0 from akka://sparkDriver
> 15/03/29 00:08:31 INFO SocketReceiver: Connected to localhost:61112
> 15/03/29 00:08:32 INFO JobScheduler: Added jobs for time 1427612912000 ms
> 15/03/29 00:08:33 INFO JobScheduler: Added jobs for time 1427612913000 ms
> 15/03/29 00:08:34 INFO JobScheduler: Added jobs for time 1427612914000 ms
> 15/03/29 00:08:35 INFO JobScheduler: Added jobs for time 1427612915000 ms
> 15/03/29 00:08:36 INFO JobScheduler: Added jobs for time 1427612916000 ms
> 15/03/29 00:08:36 INFO MemoryStore: ensureFreeSpace(9) called with curMem=58030, maxMem=70177259
> 15/03/29 00:08:36 INFO MemoryStore: Block input-0-1427612916400 stored as bytes in memory (estimated size 9.0 B, free 66.9 MB)
> 15/03/29 00:08:36 INFO BlockManagerInfo: Added input-0-1427612916400 in memory on localhost:61179 (size: 9.0 B, free: 66.9 MB)
> 15/03/29 00:08:36 INFO BlockManagerMaster: Updated info of block input-0-1427612916400
> 15/03/29 00:08:36 INFO BlockGenerator: Pushed block input-0-1427612916400
> 15/03/29 00:08:37 INFO JobScheduler: Added jobs for time 1427612917000 ms
> 15/03/29 00:08:38 INFO JobScheduler: Added jobs for time 1427612918000 ms
> 15/03/29 00:08:39 INFO JobScheduler: Added jobs for time 1427612919000 ms
> 15/03/29 00:08:40 INFO JobScheduler: Added jobs for time 1427612920000 ms
> 15/03/29 00:08:41 INFO JobScheduler: Added jobs for time 1427612921000 ms
> 15/03/29 00:08:42 INFO JobScheduler: Added jobs for time 1427612922000 ms
> 15/03/29 00:08:43 INFO JobScheduler: Added jobs for time 1427612923000 ms
> 15/03/29 00:08:44 INFO JobScheduler: Added jobs for time 1427612924000 ms
> 15/03/29 00:08:45 INFO JobScheduler: Added jobs for time 1427612925000 ms
> 15/03/29 00:08:46 INFO JobScheduler: Added jobs for time 1427612926000 ms
> 15/03/29 00:08:47 INFO JobScheduler: Added jobs for time 1427612927000 ms
>
> When I enter words on the local Netcat server, the output looks something
> like this (in the terminal running this app):
>
> 15/03/29 00:08:36 INFO MemoryStore: ensureFreeSpace(9) called with curMem=58030, maxMem=70177259
> 15/03/29 00:08:36 INFO MemoryStore: Block input-0-1427612916400 stored as bytes in memory (estimated size 9.0 B, free 66.9 MB)
> 15/03/29 00:08:36 INFO BlockManagerInfo: Added input-0-1427612916400 in memory on localhost:61179 (size: 9.0 B, free: 66.9 MB)
> 15/03/29 00:08:36 INFO BlockManagerMaster: Updated info of block input-0-1427612916400
> 15/03/29 00:08:36 INFO BlockGenerator: Pushed block input-0-1427612916400
>
> But I cannot see the words that I actually entered. Could anyone
> please offer any help or advice on what I could do about this? Any help would
> be appreciated!
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Unable-to-run-NetworkWordCount-java-tp22274.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
