package spark.pcap.run;

import java.util.Arrays;
import java.util.regex.Pattern;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

import scala.Tuple2;
import spark.pcap.io.PcapReceiver;

/**
 * Thread that stops a Spark streaming context when it runs.
 * {@code TestPcapSpark.process()} registers an instance as a JVM shutdown
 * hook.
 */
final class ClosePcapSpark extends Thread
{

	/** Streaming context that {@link #run()} stops. */
	private final JavaStreamingContext context;

	/**
	 * Creates the thread; nothing is stopped until {@link #run()} executes.
	 *
	 * @param jssc
	 *            streaming context to stop when this thread runs
	 */
	public ClosePcapSpark(JavaStreamingContext jssc)
	{
		context = jssc;
	}

	/**
	 * Stops the streaming context and then reports the shutdown on standard
	 * output.
	 */
	@Override
	public void run()
	{
		context.stop();
		System.out.println("spark streaming service closed...");
	}

}

/**
 * Spark Streaming driver that reads records from a {@link PcapReceiver},
 * splits them into lines (called "flows" in this code) and prints, for every
 * batch, how many times each distinct line occurred.
 */
public class TestPcapSpark
{

	/** Separator used to break a received record into individual lines. */
	private static final Pattern LINE = Pattern.compile("\n");

	/** Batch interval of the streaming context, in seconds. */
	private static final int DURATION = 30;

	/**
	 * Echoes each received record to standard output between two separator
	 * rules and splits it on newlines.
	 */
	private static final class RecordSplitter
			implements FlatMapFunction<String, String>
	{
		private static final long serialVersionUID = -8861675761992411194L;

		public Iterable<String> call(String line) throws Exception
		{
			final String rule = "--------------------------------";
			System.out.println(rule);
			System.out.println(line);
			System.out.println(rule);

			String[] pieces = LINE.split(line);
			return Arrays.asList(pieces);
		}
	}

	/** Pairs every flow with an initial count of one. */
	private static final class FlowTagger
			implements PairFunction<String, String, Integer>
	{
		private static final long serialVersionUID = -562094525045614076L;

		public Tuple2<String, Integer> call(String flow) throws Exception
		{
			Tuple2<String, Integer> tagged = new Tuple2<String, Integer>(flow, 1);
			return tagged;
		}
	}

	/** Adds two partial counts that belong to the same flow. */
	private static final class CountAdder
			implements Function2<Integer, Integer, Integer>
	{
		private static final long serialVersionUID = -213226948395801204L;

		public Integer call(Integer f1, Integer f2) throws Exception
		{
			return f1 + f2;
		}
	}

	/**
	 * Builds the streaming pipeline, registers a shutdown hook that stops the
	 * streaming context, starts the context and blocks until it terminates.
	 */
	public static void process()
	{
		SparkConf sparkConf = new SparkConf().setAppName("PacketProcessing");
		JavaStreamingContext streamingContext = new JavaStreamingContext(
				sparkConf, Durations.seconds(DURATION));

		// receiver -> lines -> (line, 1) -> (line, count)
		JavaDStream<String> received = streamingContext
				.receiverStream(new PcapReceiver());
		JavaDStream<String> flows = received.flatMap(new RecordSplitter());
		JavaPairDStream<String, Integer> ones = flows
				.mapToPair(new FlowTagger());
		JavaPairDStream<String, Integer> totals = ones
				.reduceByKey(new CountAdder());

		totals.print();

		// Hook is registered before start() so that a JVM shutdown
		// (e.g. Ctrl+C) stops the streaming service.
		Thread stopHook = new ClosePcapSpark(streamingContext);
		Runtime.getRuntime().addShutdownHook(stopHook);

		streamingContext.start();
		streamingContext.awaitTermination();
	}

	/**
	 * Program entry point; runs {@link #process()}.
	 *
	 * @param args
	 *            command-line arguments (unused)
	 * @throws Exception
	 *             declared for callers; nothing in this method throws a
	 *             checked exception itself
	 */
	public static void main(String[] args) throws Exception
	{
		process();
	}

}
