Slight update:

The following code, using a plain SparkContext, works with wildcard file
paths in hard-coded strings, but it does not work when the path is a value
parsed out of the program arguments, as above:

val sc = new SparkContext(sparkConf)
val zipFileTextRDD = sc.textFile(
  "/data/raw/logs/2015-09-01/home/logs/access_logs/*/2015-09-01/alog.2015-09-01-18-*.gz",
  100)

// Identity map: pass each line through unchanged and write it back out.
zipFileTextRDD.map { text =>
  text
}.saveAsTextFile("TextFileForGZips")
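For reference, here is roughly how the argument-driven version looks on my
side; the argument handling is simplified, and args(0) below stands in for
my actual option parsing:

import org.apache.spark.{SparkConf, SparkContext}

object WildcardFromArgs {
  def main(args: Array[String]): Unit = {
    // args(0) is a stand-in for the parsed program argument, e.g. the same
    // wildcard path as in the hard-coded string above.
    val inputGlob = args(0)

    val sparkConf = new SparkConf().setAppName("WildcardFromArgs")
    val sc = new SparkContext(sparkConf)

    // sc.textFile accepts Hadoop glob patterns, so in principle this should
    // behave the same as the hard-coded call above.
    sc.textFile(inputGlob, 100).saveAsTextFile("TextFileForGZips")
  }
}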

The code using the Spark StreamingContext does not work even with this
hard-coded path:

val ssc = new StreamingContext(sparkConf, Seconds(2))

val zipFileDStreams =
  ssc.textFileStream("/data/raw/logs/2015-09-01/home/logs/access_logs/*/2015-09-01/alog.2015-09-01-18-*.gz")

zipFileDStreams.foreachRDD { rdd =>
  rdd.map { logLineText =>
    println(logLineText)
    logLineText
    //producerObj.value.send(topics, logLineText)
  }.saveAsTextFile("TextFileForGZips")
}

I am not sure whether the DStream created from textFileStream is the
problem, or something else.
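
One thing I am wondering about, based on the Spark Streaming documentation:
textFileStream monitors a directory and only picks up files that appear in
it after the stream starts, so a glob that points all the way down to
individual file names may never match anything new. A minimal sketch of
what I would try instead, pointing it at a landing directory (the directory
name below is just illustrative):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val sparkConf = new SparkConf().setAppName("DirectoryStreamSketch")
val ssc = new StreamingContext(sparkConf, Seconds(2))

// Monitor a directory; only files created in or moved into it after
// ssc.start() are picked up. "/data/raw/logs/incoming" is a hypothetical
// landing directory, not my real path.
val lines = ssc.textFileStream("/data/raw/logs/incoming")

// saveAsTextFiles writes one timestamped output directory per batch,
// so successive batches do not collide on the same output path.
lines.saveAsTextFiles("TextFileForGZips")

ssc.start()
ssc.awaitTermination()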

This is something I am not able to explain. Any clarity on what is really
happening inside would help me understand the behavior so that I don't make
such a mistake again.

Regards,
Atul.



On Fri, Sep 11, 2015 at 11:32 AM, Atul Kulkarni <atulskulka...@gmail.com>
wrote:

> Folks,
>
> Any help on this?
>
> Regards,
> Atul.
>
>
> On Fri, Sep 11, 2015 at 8:39 AM, Atul Kulkarni <atulskulka...@gmail.com>
> wrote:
>
>> Hi Raghavendra,
>>
>> Thanks for your answers. I am passing 10 executors and I am not sure if
>> that is the problem. It is still hung.
>>
>> Regards,
>> Atul.
>>
>>
>> On Fri, Sep 11, 2015 at 12:40 AM, Raghavendra Pandey <
>> raghavendra.pan...@gmail.com> wrote:
>>
>>> You can pass the number of executors via the command line option
>>> --num-executors. You need more than 2 executors to make Spark Streaming
>>> work.
>>>
>>> For more details on command line option, please go through
>>> http://spark.apache.org/docs/latest/running-on-yarn.html.
>>>
>>>
>>> On Fri, Sep 11, 2015 at 10:52 AM, Atul Kulkarni <atulskulka...@gmail.com
>>> > wrote:
>>>
>>>> I am submitting the job with yarn-cluster mode.
>>>>
>>>> spark-submit --master yarn-cluster ...
>>>>
>>>> On Thu, Sep 10, 2015 at 7:50 PM, Raghavendra Pandey <
>>>> raghavendra.pan...@gmail.com> wrote:
>>>>
>>>>> What is the value of the Spark master conf? By default it is local, which
>>>>> means only one thread can run, and that is why your job is stuck.
>>>>> Specify local[*] to make the thread pool equal to the number of cores...
>>>>>
>>>>> Raghav
>>>>> On Sep 11, 2015 6:06 AM, "Atul Kulkarni" <atulskulka...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi Folks,
>>>>>>
>>>>>> Below is the code I have for a Spark-based Kafka producer, intended to
>>>>>> take advantage of multiple executors reading files in parallel on my
>>>>>> cluster, but I am stuck: the program is not making any progress.
>>>>>>
>>>>>> Below is my scrubbed code:
>>>>>>
>>>>>> val sparkConf = new SparkConf().setAppName(applicationName)
>>>>>> val ssc = new StreamingContext(sparkConf, Seconds(2))
>>>>>>
>>>>>> val producerObj = ssc.sparkContext.broadcast(KafkaSink(kafkaProperties))
>>>>>>
>>>>>> val zipFileDStreams = ssc.textFileStream(inputFiles)
>>>>>> zipFileDStreams.foreachRDD {
>>>>>>   rdd =>
>>>>>>     rdd.foreachPartition(
>>>>>>       partition => {
>>>>>>         partition.foreach{
>>>>>>           case (logLineText) =>
>>>>>>             println(logLineText)
>>>>>>             producerObj.value.send(topics, logLineText)
>>>>>>         }
>>>>>>       }
>>>>>>     )
>>>>>> }
>>>>>>
>>>>>> ssc.start()
>>>>>> ssc.awaitTermination()
>>>>>>
>>>>>> ssc.stop()
>>>>>>
>>>>>> The code for KafkaSink is as follows.
>>>>>>
>>>>>> class KafkaSink(createProducer: () => KafkaProducer[Array[Byte], Array[Byte]]) extends Serializable {
>>>>>>
>>>>>>   lazy val producer = createProducer()
>>>>>>   val logParser = new LogParser()
>>>>>>
>>>>>>   def send(topic: String, value: String): Unit = {
>>>>>>
>>>>>>     val logLineBytes = Bytes.toBytes(logParser.avroEvent(value.split("\t")).toString)
>>>>>>     producer.send(new ProducerRecord[Array[Byte], Array[Byte]](topic, logLineBytes))
>>>>>>   }
>>>>>> }
>>>>>>
>>>>>> object KafkaSink {
>>>>>>   def apply(config: Properties): KafkaSink = {
>>>>>>
>>>>>>     val f = () => {
>>>>>>       val producer = new KafkaProducer[Array[Byte], Array[Byte]](config, null, null)
>>>>>>
>>>>>>       sys.addShutdownHook {
>>>>>>         producer.close()
>>>>>>       }
>>>>>>       producer
>>>>>>     }
>>>>>>
>>>>>>     new KafkaSink(f)
>>>>>>   }
>>>>>> }
>>>>>>
>>>>>> Disclaimer: it is based on the code inspired by
>>>>>> http://allegro.tech/spark-kafka-integration.html.
>>>>>>
>>>>>> The job just sits there; I cannot see any job stages being created.
>>>>>> Something I want to mention: I am trying to read gzipped files from HDFS
>>>>>> - could it be that the streaming context is not able to read *.gz files?
>>>>>>
>>>>>>
>>>>>> I am not sure what more details I can provide to help explain my
>>>>>> problem.
>>>>>>
>>>>>>
>>>>>> --
>>>>>> Regards,
>>>>>> Atul Kulkarni
>>>>>>
>>>>>
>>>>
>>>>
>>>> --
>>>> Regards,
>>>> Atul Kulkarni
>>>>
>>>
>>>
>>
>>
>> --
>> Regards,
>> Atul Kulkarni
>>
>
>
>
> --
> Regards,
> Atul Kulkarni
>



-- 
Regards,
Atul Kulkarni
