I am new to Spark. I am trying to build the following pipeline: Netcat --> Flume --> Spark Streaming (processing the Flume data) --> HDFS.
My Flume configuration file has the following setup: Source = netcat, Sink = avro sink.

With the Spark Streaming code below I am able to print the data coming from Flume to the console, but I am struggling to write it to a file. To get at the real data I need to convert each SparkFlumeEvent to an AvroFlumeEvent. I suspect JavaRDD.saveAsTextFile() might not work, because my JavaRDD is a collection of SparkFlumeEvent objects. Do I need to convert it into a JavaRDD<AvroFlumeEvent>? Please share any code examples. Thanks.

Code:

    import java.nio.ByteBuffer;
    import java.util.Iterator;
    import java.util.List;

    import org.apache.flume.source.avro.AvroFlumeEvent;
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.function.Function;
    import org.apache.spark.api.java.function.Function2;
    import org.apache.spark.streaming.Duration;
    import org.apache.spark.streaming.Time;
    import org.apache.spark.streaming.api.java.JavaDStream;
    import org.apache.spark.streaming.api.java.JavaStreamingContext;
    import org.apache.spark.streaming.flume.FlumeUtils;
    import org.apache.spark.streaming.flume.SparkFlumeEvent;

    Duration batchInterval = new Duration(2000);
    SparkConf sparkConf = new SparkConf().setAppName("JavaFlumeEventCount");
    JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, batchInterval);
    JavaDStream<SparkFlumeEvent> flumeStream = FlumeUtils.createStream(ssc, host, port);
    flumeStream.count(); // only returns a new DStream; nothing is computed without an output operation

    // Attempt 1: save each batch directly (the second type parameter of Function2 is the batch Time)
    flumeStream.foreachRDD(new Function2<JavaRDD<SparkFlumeEvent>, Time, Void>() {
        @Override
        public Void call(JavaRDD<SparkFlumeEvent> events, Time time) throws Exception {
            events.saveAsTextFile("output.txt");
            return null;
        }
    });

    /*flumeStream.count().map(new Function<Long, String>() {
        @Override
        public String call(Long in) {
            return "Received " + in + " flume events.";
        }
    }).print();*/

    // Attempt 2: collect each batch to the driver and print the event payloads
    flumeStream.foreachRDD(new Function<JavaRDD<SparkFlumeEvent>, Void>() {
        @Override
        public Void call(JavaRDD<SparkFlumeEvent> eventsData) throws Exception {
            String logRecord = null;
            List<SparkFlumeEvent> events = eventsData.collect();
            Iterator<SparkFlumeEvent> batchedEvents = events.iterator();
            long t1 = System.currentTimeMillis();
            AvroFlumeEvent avroEvent = null;
            ByteBuffer bytePayload = null;
            // All the user-level data is carried as the payload (body) of the Flume event
            while (batchedEvents.hasNext()) {
                SparkFlumeEvent flumeEvent = batchedEvents.next();
                avroEvent = flumeEvent.event();
                bytePayload = avroEvent.getBody();
                logRecord = new String(bytePayload.array());
                System.out.println(">>>>>>>>LOG RECORD = " + logRecord);
                // ?? I was trying to write the data to HDFS here, but it fails...
            }
            System.out.println("Processed this batch in: "
                    + (System.currentTimeMillis() - t1) / 1000 + " seconds");
            return null;
        }
    });

    ssc.start();            // nothing runs until the streaming context is started
    ssc.awaitTermination();
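One possible approach, sketched under assumptions rather than given as the definitive answer: there is no need for a JavaRDD<AvroFlumeEvent>. Instead, map each SparkFlumeEvent to its body as a String on the executors (instead of collect()-ing every batch to the driver), then save each batch's RDD of strings to its own directory. Two details in the code above matter here. ByteBuffer.array() returns the whole backing array, ignoring the buffer's position and limit (and throws for buffers that are not array-backed), so copying only the remaining() bytes is safer. Also, saveAsTextFile treats its argument as a directory, so writing every batch to the fixed path "output.txt" makes the batches collide. The class below is hypothetical (FlumeBatchHelpers, decodeBody and batchOutputPath are illustrative names, not Spark or Flume APIs). It assumes the netcat payload is UTF-8 text, and the Spark wiring shown in its comment assumes the Spark 2.x Java API (foreachRDD with a VoidFunction2) plus the spark-streaming-flume artifact. The hdfs:// prefix is a made-up example.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical helpers (not part of Spark or Flume) for turning Flume event
// bodies into text lines and naming one output directory per batch.
//
// Intended use inside the streaming job, assuming the Spark 2.x Java API and
// the spark-streaming-flume artifact (the HDFS prefix is a made-up example):
//
//   JavaDStream<String> lines = flumeStream.map(
//       e -> FlumeBatchHelpers.decodeBody(e.event().getBody()));
//   lines.foreachRDD((rdd, time) -> rdd.saveAsTextFile(
//       FlumeBatchHelpers.batchOutputPath("hdfs:///user/me/flume-out",
//                                         time.milliseconds())));
final class FlumeBatchHelpers {

    private FlumeBatchHelpers() {}

    // Copies only the readable bytes (position..limit) of the body and decodes
    // them as UTF-8. Unlike body.array(), this ignores unused parts of the
    // backing array and does not depend on the platform default charset.
    static String decodeBody(ByteBuffer body) {
        ByteBuffer view = body.duplicate(); // leave the caller's position untouched
        byte[] bytes = new byte[view.remaining()];
        view.get(bytes);
        return new String(bytes, StandardCharsets.UTF_8);
    }

    // Builds a distinct directory name per batch, following the same
    // "prefix-TIME_IN_MS" pattern that DStream.saveAsTextFiles uses
    // (without its optional suffix).
    static String batchOutputPath(String prefix, long batchTimeMillis) {
        return prefix + "-" + batchTimeMillis;
    }

    public static void main(String[] args) {
        // A buffer whose readable window is only part of its backing array.
        ByteBuffer buf = ByteBuffer.wrap(
                "XXhello flumeYY".getBytes(StandardCharsets.UTF_8));
        buf.position(2);
        buf.limit(13);
        System.out.println(new String(buf.array(), StandardCharsets.UTF_8)); // XXhello flumeYY
        System.out.println(decodeBody(buf));                                 // hello flume
        System.out.println(batchOutputPath("flume-out", 1000L));             // flume-out-1000
    }
}
```

Because the mapping runs on the executors, each batch is written in parallel by saveAsTextFile rather than funnelled through the driver. Checking rdd.isEmpty() before saving is one way to avoid creating a new, empty directory for every two-second batch with no data.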