I am new to Spark. I am trying to build the following pipeline:
Netcat --> Flume --> Spark Streaming (process the Flume data) --> HDFS.

My Flume config file has the following setup:

Source = netcat
Sink = avro sink
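Roughly, the agent definition looks like this (a sketch only; the agent name, hosts, and ports below are placeholders rather than my exact values):

    # sketch of the agent; "a1", bind addresses, and ports are placeholders
    a1.sources = r1
    a1.channels = c1
    a1.sinks = k1

    # netcat source taking test input
    a1.sources.r1.type = netcat
    a1.sources.r1.bind = localhost
    a1.sources.r1.port = 44444
    a1.sources.r1.channels = c1

    # avro sink pointing at the host/port the Spark Flume receiver listens on
    a1.sinks.k1.type = avro
    a1.sinks.k1.hostname = localhost
    a1.sinks.k1.port = 41414
    a1.sinks.k1.channel = c1

    a1.channels.c1.type = memory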

Spark Streaming code:
I am able to print the data from Flume to the console, but I am struggling to write it out as a file. To get at the real data I need to convert each SparkFlumeEvent into an AvroFlumeEvent. JavaRDD.saveAsTextFile() on its own might not work, because the JavaRDD is a collection of SparkFlumeEvent objects. Do I need to convert it into a JavaRDD<AvroFlumeEvent> first? Please share any code examples. Thanks.
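What I think I am after is something along these lines, reusing the flumeStream from my current code below (just a sketch against the Spark 1.x Flume Java API; the HDFS path is a placeholder, not my real one):

    // extract the payload of each SparkFlumeEvent as a String
    JavaDStream<String> lines = flumeStream.map(new Function<SparkFlumeEvent, String>() {
      @Override
      public String call(SparkFlumeEvent flumeEvent) {
        // the user-level data is the body of the wrapped Avro event
        return new String(flumeEvent.event().getBody().array());
      }
    });

    // write each non-empty batch to its own HDFS directory
    lines.foreachRDD(new Function2<JavaRDD<String>, Time, Void>() {
      @Override
      public Void call(JavaRDD<String> rdd, Time time) {
        if (rdd.count() > 0) {
          // saveAsTextFile writes a directory of part files; the prefix is a placeholder
          rdd.saveAsTextFile("hdfs:///tmp/flume-output-" + time.milliseconds());
        }
        return null;
      }
    });

The map keeps the payload extraction on the executors, so saveAsTextFile writes in a distributed way instead of collecting everything to the driver. Is that the right direction?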

My current code:

    // imports needed for this snippet (at the top of the file)
    import java.nio.ByteBuffer;
    import java.util.Iterator;
    import java.util.List;

    import org.apache.flume.source.avro.AvroFlumeEvent;
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.function.Function;
    import org.apache.spark.api.java.function.Function2;
    import org.apache.spark.streaming.Duration;
    import org.apache.spark.streaming.Time;
    import org.apache.spark.streaming.api.java.JavaDStream;
    import org.apache.spark.streaming.api.java.JavaStreamingContext;
    import org.apache.spark.streaming.flume.FlumeUtils;
    import org.apache.spark.streaming.flume.SparkFlumeEvent;

    // one batch every 2 seconds
    Duration batchInterval = new Duration(2000);
    SparkConf sparkConf = new SparkConf().setAppName("JavaFlumeEventCount");
    JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, batchInterval);
    // receiver-based Flume stream; Flume's avro sink must point at this host/port
    JavaDStream<SparkFlumeEvent> flumeStream = FlumeUtils.createStream(ssc, host, port);

    // count() just builds another DStream; it does not trigger anything by itself
    flumeStream.count();

    flumeStream.foreachRDD(new Function2<JavaRDD<SparkFlumeEvent>, Time, Void>() {
      @Override
      public Void call(JavaRDD<SparkFlumeEvent> events, Time time) throws Exception {
        // this writes the SparkFlumeEvent objects' string form (not the payload)
        // into a directory named "output.txt"; reusing the same path will fail
        // once the directory already exists
        events.saveAsTextFile("output.txt");
        return null;
      }
    });

    /*flumeStream.count().map(new Function<Long, String>() {
      @Override
      public String call(Long in) {
        return "Received " + in + " flume events.";
      }
    }).print();*/

    flumeStream.foreachRDD(new Function<JavaRDD<SparkFlumeEvent>, Void>() {
      @Override
      public Void call(JavaRDD<SparkFlumeEvent> eventsData) throws Exception {
        String logRecord = null;
        // collect() pulls the whole batch back to the driver
        List<SparkFlumeEvent> events = eventsData.collect();
        Iterator<SparkFlumeEvent> batchedEvents = events.iterator();

        long t1 = System.currentTimeMillis();
        AvroFlumeEvent avroEvent = null;
        ByteBuffer bytePayload = null;

        // all the user-level data is carried as the payload of the Flume event
        while (batchedEvents.hasNext()) {
          SparkFlumeEvent flumeEvent = batchedEvents.next();
          avroEvent = flumeEvent.event();
          bytePayload = avroEvent.getBody();
          logRecord = new String(bytePayload.array());

          System.out.println(">>>>>>>>LOG RECORD = " + logRecord);

          // TODO: this is where I was trying to write the data to HDFS, but it fails
        }
        System.out.println("Processed this batch in: "
            + (System.currentTimeMillis() - t1) / 1000 + " seconds");
        return null;
      }
    });

    // nothing runs until the streaming context is started
    ssc.start();
    ssc.awaitTermination();
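For the failing HDFS write, the only thing I have come up with is the plain Hadoop FileSystem API, roughly like this (a sketch, assuming the cluster's Hadoop configuration is on the classpath; the output path is a placeholder). It would go inside the foreachRDD call in place of the println loop:

    // imports (top of the file):
    // import org.apache.hadoop.conf.Configuration;
    // import org.apache.hadoop.fs.FSDataOutputStream;
    // import org.apache.hadoop.fs.FileSystem;
    // import org.apache.hadoop.fs.Path;

    Configuration hadoopConf = new Configuration();
    FileSystem fs = FileSystem.get(hadoopConf);
    // one file per batch; the path is a placeholder
    Path outPath = new Path("/tmp/flume-batch-" + System.currentTimeMillis());
    FSDataOutputStream out = fs.create(outPath);
    while (batchedEvents.hasNext()) {
      String record = new String(batchedEvents.next().event().getBody().array());
      out.write((record + "\n").getBytes("UTF-8"));
    }
    out.close();

This only works for small batches because everything has already been collected to the driver; the saveAsTextFile sketch above keeps the write distributed, which is probably the better option.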
