I am new to Spark. I am trying to build the following pipeline:
Netcat --> Flume --> Spark Streaming (process Flume data) --> HDFS.
My Flume config file has the following setup:
Source = netcat
Sink = avrosink
Spark Streaming code:
I am able to print the data from Flume to the console, but I am struggling to
write it to a file. To get at the real data I need to convert each
SparkFlumeEvent to an AvroFlumeEvent.
JavaRDD.saveAsTextFile() might not work, because the JavaRDD is a collection
of SparkFlumeEvent. Do I need to convert it into a JavaRDD<AvroFlumeEvent>
first? Please share any code examples. Thanks.
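One possible direction, offered as a sketch rather than a confirmed fix: instead of converting to a collection of AvroFlumeEvent, each event body could be decoded to a String first, so that the text written out is the payload and not the event object's toString(). The helper below uses only the JDK. The class name FlumePayloadDecoder and the choice of UTF-8 are assumptions for illustration; the real encoding depends on what is sent through netcat.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical helper (not part of Spark or Flume): turns an event body into text.
final class FlumePayloadDecoder {

    // Decodes only the readable bytes of the buffer (position..limit).
    // Assumption: the payload is UTF-8 encoded text.
    static String decode(ByteBuffer body) {
        // duplicate() shares the content but has its own position and limit,
        // so the caller's buffer is left untouched.
        ByteBuffer view = body.duplicate();
        byte[] bytes = new byte[view.remaining()];
        view.get(bytes);
        return new String(bytes, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        ByteBuffer body = ByteBuffer.wrap("hello flume".getBytes(StandardCharsets.UTF_8));
        System.out.println(decode(body));
    }
}
```

Unlike new String(buffer.array()), this does not pick up bytes outside the buffer's position and limit, and it does not depend on the buffer having an accessible backing array. In the streaming job it could presumably be applied with something like flumeStream.map(e -> FlumePayloadDecoder.decode(e.event().getBody())), which would give a stream of String that is straightforward to save as text.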
Code:
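import java.nio.ByteBuffer;
import java.util.Iterator;
import java.util.List;
import org.apache.flume.source.avro.AvroFlumeEvent;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.flume.FlumeUtils;
import org.apache.spark.streaming.flume.SparkFlumeEvent;

Duration batchInterval = new Duration(2000);
SparkConf sparkConf = new SparkConf().setAppName("JavaFlumeEventCount");
JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, batchInterval);
JavaDStream<SparkFlumeEvent> flumeStream =
    FlumeUtils.createStream(ssc, host, port);
flumeStream.count();
// foreachRDD takes a Function<JavaRDD<T>, Void> (one RDD per batch).
flumeStream.foreachRDD(new Function<JavaRDD<SparkFlumeEvent>, Void>() {
    @Override
    public Void call(JavaRDD<SparkFlumeEvent> events) throws Exception {
        // Writes the events as they are, i.e. still as SparkFlumeEvent objects.
        events.saveAsTextFile("output.txt");
        return null;
    }
});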
/*flumeStream.count().map(new Function<Long, String>() {
@Override
public String call(Long in) {
return "Received " + in + " flume events.";
}
}).print();*/
flumeStream.foreach(new Function<JavaRDD<SparkFlumeEvent>, Void>() {
    @Override
    public Void call(JavaRDD<SparkFlumeEvent> eventsData) throws Exception {
        String logRecord = null;
        // collect() pulls the whole batch back to the driver.
        List<SparkFlumeEvent> events = eventsData.collect();
        Iterator<SparkFlumeEvent> batchedEvents = events.iterator();
        long t1 = System.currentTimeMillis();
        AvroFlumeEvent avroEvent = null;
        ByteBuffer bytePayload = null;
        // All the user-level data is carried as the payload of the Flume event.
        while (batchedEvents.hasNext()) {
            SparkFlumeEvent flumeEvent = batchedEvents.next();
            avroEvent = flumeEvent.event();
            bytePayload = avroEvent.getBody();
            logRecord = new String(bytePayload.array());
            System.out.println(">>>>>>>>LOG RECORD = " + logRecord);
            // ?? I was trying to write the data to HDFS here, but it fails.
        }
        System.out.println("Processed this batch in: "
            + (System.currentTimeMillis() - t1) / 1000 + " seconds");
        return null;
    }
});
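
A possibly related point on the write itself: saveAsTextFile() creates a directory at the given path and fails if that path already exists, so a fixed name such as "output.txt" can only work for one batch. A minimal sketch of deriving a distinct path per batch is below. The class BatchOutputPath and the example prefix are hypothetical; the naming pattern mirrors the "prefix-TIME_IN_MS[.suffix]" convention that Spark's own saveAsTextFiles uses on DStreams.

```java
// Hypothetical helper (not part of Spark): builds one output path per batch.
final class BatchOutputPath {

    // Returns "prefix-batchTimeMs" or "prefix-batchTimeMs.suffix".
    static String forBatch(String prefix, long batchTimeMs, String suffix) {
        String base = prefix + "-" + batchTimeMs;
        return suffix.isEmpty() ? base : base + "." + suffix;
    }

    public static void main(String[] args) {
        // Example prefix only; substitute the real HDFS location.
        System.out.println(forBatch("hdfs:///tmp/flume/events", 2000L, "txt"));
    }
}
```

The batch time could come from the two-argument form of foreachRDD, which passes a Time alongside the RDD (time.milliseconds()), or from System.currentTimeMillis() as a fallback.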