Slight update: The following code with the plain "spark context" works with wildcard file paths in hard-coded strings, but it won't work with a value parsed out of the program arguments as above:
val sc = new SparkContext(sparkConf)
val zipFileTextRDD = sc.textFile("/data/raw/logs/2015-09-01/home/logs/access_logs/*/2015-09-01/alog.2015-09-01-18-*.gz", 100)
zipFileTextRDD.map {
  case (text: String) => {
    text
  }
}.saveAsTextFile("TextFileForGZips")

The code with the spark "streaming context" won't work with this hard-coded path either.

val ssc = new StreamingContext(sparkConf, Seconds(2))
val zipFileDStreams = ssc.textFileStream("/data/raw/logs/2015-09-01/home/logs/access_logs/*/2015-09-01/alog.2015-09-01-18-*.gz")
zipFileDStreams.foreachRDD { rdd =>
  rdd.map { logLineText => {
      println(logLineText)
      logLineText
      //producerObj.value.send(topics, logLineText)
    }
  }.saveAsTextFile("TextFileForGZips")
}

I am not sure whether the DStream created by textFileStream is the problem or something else, but this is something I am not able to explain. Any clarity on what is really happening inside would help me understand the behaviour so that I don't make such a mistake again.

Regards,
Atul.

On Fri, Sep 11, 2015 at 11:32 AM, Atul Kulkarni <atulskulka...@gmail.com> wrote:

> Folks,
>
> Any help on this?
>
> Regards,
> Atul.
>
>
> On Fri, Sep 11, 2015 at 8:39 AM, Atul Kulkarni <atulskulka...@gmail.com> wrote:
>
>> Hi Raghavendra,
>>
>> Thanks for your answers. I am passing 10 executors and I am not sure if that is the problem. It is still hung.
>>
>> Regards,
>> Atul.
>>
>>
>> On Fri, Sep 11, 2015 at 12:40 AM, Raghavendra Pandey <raghavendra.pan...@gmail.com> wrote:
>>
>>> You can pass the number of executors via the command line option --num-executors. You need more than 2 executors to make spark-streaming work.
>>>
>>> For more details on the command line options, please go through http://spark.apache.org/docs/latest/running-on-yarn.html.
>>>
>>>
>>> On Fri, Sep 11, 2015 at 10:52 AM, Atul Kulkarni <atulskulka...@gmail.com> wrote:
>>>
>>>> I am submitting the job with yarn-cluster mode.
>>>>
>>>> spark-submit --master yarn-cluster ...
>>>>
>>>> On Thu, Sep 10, 2015 at 7:50 PM, Raghavendra Pandey <raghavendra.pan...@gmail.com> wrote:
>>>>
>>>>> What is the value of the spark master conf? By default it is local, which means only one thread can run, and that is why your job is stuck.
>>>>> Specify it as local[*] to make the thread pool equal to the number of cores.
>>>>>
>>>>> Raghav
>>>>>
>>>>> On Sep 11, 2015 6:06 AM, "Atul Kulkarni" <atulskulka...@gmail.com> wrote:
>>>>>
>>>>>> Hi Folks,
>>>>>>
>>>>>> Below is the code I have for a Spark-based Kafka producer, meant to take advantage of multiple executors reading files in parallel on my cluster, but I am stuck: the program is not making any progress.
>>>>>>
>>>>>> Below is my scrubbed code:
>>>>>>
>>>>>> val sparkConf = new SparkConf().setAppName(applicationName)
>>>>>> val ssc = new StreamingContext(sparkConf, Seconds(2))
>>>>>>
>>>>>> val producerObj = ssc.sparkContext.broadcast(KafkaSink(kafkaProperties))
>>>>>>
>>>>>> val zipFileDStreams = ssc.textFileStream(inputFiles)
>>>>>> zipFileDStreams.foreachRDD {
>>>>>>   rdd =>
>>>>>>     rdd.foreachPartition(
>>>>>>       partition => {
>>>>>>         partition.foreach {
>>>>>>           case (logLineText) =>
>>>>>>             println(logLineText)
>>>>>>             producerObj.value.send(topics, logLineText)
>>>>>>         }
>>>>>>       }
>>>>>>     )
>>>>>> }
>>>>>>
>>>>>> ssc.start()
>>>>>> ssc.awaitTermination()
>>>>>>
>>>>>> ssc.stop()
>>>>>>
>>>>>> The code for KafkaSink is as follows.
>>>>>>
>>>>>> class KafkaSink(createProducer: () => KafkaProducer[Array[Byte], Array[Byte]]) extends Serializable {
>>>>>>
>>>>>>   lazy val producer = createProducer()
>>>>>>   val logParser = new LogParser()
>>>>>>
>>>>>>   def send(topic: String, value: String): Unit = {
>>>>>>     val logLineBytes = Bytes.toBytes(logParser.avroEvent(value.split("\t")).toString)
>>>>>>     producer.send(new ProducerRecord[Array[Byte], Array[Byte]](topic, logLineBytes))
>>>>>>   }
>>>>>> }
>>>>>>
>>>>>> object KafkaSink {
>>>>>>   def apply(config: Properties): KafkaSink = {
>>>>>>
>>>>>>     val f = () => {
>>>>>>       val producer = new KafkaProducer[Array[Byte], Array[Byte]](config, null, null)
>>>>>>
>>>>>>       sys.addShutdownHook {
>>>>>>         producer.close()
>>>>>>       }
>>>>>>       producer
>>>>>>     }
>>>>>>
>>>>>>     new KafkaSink(f)
>>>>>>   }
>>>>>> }
>>>>>>
>>>>>> Disclaimer: it is inspired by the code at http://allegro.tech/spark-kafka-integration.html.
>>>>>>
>>>>>> The job just sits there; I cannot see any Job Stages being created. Something I want to mention - I am trying to read gzipped files from HDFS - could it be that the streaming context is not able to read *.gz files?
>>>>>>
>>>>>> I am not sure what more details I can provide to help explain my problem.
>>>>>>
>>>>>> --
>>>>>> Regards,
>>>>>> Atul Kulkarni
>>>>>
>>>>
>>>>
>>>> --
>>>> Regards,
>>>> Atul Kulkarni
>>>
>>>
>>
>>
>> --
>> Regards,
>> Atul Kulkarni
>
>
>
> --
> Regards,
> Atul Kulkarni



--
Regards,
Atul Kulkarni
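
A note on the question above about what is happening inside textFileStream: sc.textFile treats its argument as a Hadoop path and expands the glob when the batch job runs, so the existing *.gz files are found and read. ssc.textFileStream instead treats its argument as a directory to monitor and, on each batch interval, picks up only files that newly appear in that directory after the streaming context has started; files that already exist when the job starts are ignored, which would explain a stream that produces no data over old logs. Depending on the Spark version, glob patterns may also only be matched against directories rather than file names. Below is a minimal sketch contrasting the two sources; the drop directory and the application name are placeholders, not the original job.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object FileSourceSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("file-source-sketch"))

    // Batch source: textFile expands the glob at job time, so the existing
    // *.gz files are located and read (gzip is decompressed transparently,
    // but each .gz file becomes a single, unsplittable partition).
    val batchLines = sc.textFile(
      "/data/raw/logs/2015-09-01/home/logs/access_logs/*/2015-09-01/alog.2015-09-01-18-*.gz")
    println(s"batch line count: ${batchLines.count()}")

    // Streaming source: textFileStream monitors a directory and only emits
    // files that newly appear there after ssc.start(); files already present
    // when the job starts are never processed.
    val ssc = new StreamingContext(sc, Seconds(2))
    val streamedLines = ssc.textFileStream("/data/raw/logs/incoming") // hypothetical drop directory
    streamedLines.foreachRDD(rdd => println(s"new lines this batch: ${rdd.count()}"))

    ssc.start()
    ssc.awaitTermination()
  }
}

With this reading, the batch sc.textFile call is the right tool for gzipped logs that are already on HDFS, while textFileStream only helps for files that land in the monitored directory while the stream is running.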
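
If the goal is simply to push the existing gzipped logs to Kafka once, a plain batch job built on the same broadcast KafkaSink wrapper quoted in the thread would sidestep the streaming file-monitoring semantics entirely. This is only a sketch under that assumption: the topic name, broker list, and properties are placeholders, and KafkaSink refers to the class defined above, assumed to be on the classpath.

import java.util.Properties
import org.apache.spark.{SparkConf, SparkContext}

object BatchKafkaPublishSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("batch-kafka-publish"))

    // Placeholder producer configuration; the real broker list, serializers,
    // etc. would come from the job's properties.
    val kafkaProperties = new Properties()
    kafkaProperties.put("bootstrap.servers", "broker1:9092")

    // Broadcast the serializable KafkaSink wrapper from the thread so each
    // executor builds its own KafkaProducer lazily on first use.
    val producerObj = sc.broadcast(KafkaSink(kafkaProperties))
    val topic = "access-logs" // hypothetical topic name

    // textFile accepts the file-level glob, so the existing *.gz logs are read.
    val lines = sc.textFile(
      "/data/raw/logs/2015-09-01/home/logs/access_logs/*/2015-09-01/alog.2015-09-01-18-*.gz")

    // Send from every partition in parallel across the executors.
    lines.foreachPartition { partition =>
      partition.foreach(line => producerObj.value.send(topic, line))
    }

    sc.stop()
  }
}

The lazy producer inside KafkaSink means each executor JVM creates its Kafka connection once, on first use, rather than trying to serialize a producer from the driver.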