import java.util.regex.Pattern;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.mllib.clustering.StreamingKMeans;
import org.apache.spark.mllib.linalg.Vector;
import org.apache.spark.mllib.linalg.Vectors;
import org.apache.spark.mllib.regression.LabeledPoint;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

import scala.Tuple2;

/**
 * Demo driver that trains a Spark MLlib {@link StreamingKMeans} model on one text
 * stream and prints cluster predictions for a second, labeled text stream.
 *
 * <p>Both streams carry comma-separated numeric lines. Training lines contain only
 * feature values; test lines contain a label in the first column followed by the
 * feature values.
 */
public class StreamingKmeans {

	static SparkConf conf;
	static JavaStreamingContext scc;

	/**
	 * Parses {@code tokens[from..]} as doubles.
	 *
	 * @param tokens the split fields of one input line
	 * @param from   index of the first token to parse; must not exceed {@code tokens.length}
	 * @return an array of length {@code tokens.length - from} holding the parsed values
	 * @throws NumberFormatException if any token is not a parsable double
	 */
	private static double[] parseDoubles(String[] tokens, int from) {
		double[] values = new double[tokens.length - from];
		for (int i = from; i < tokens.length; ++i) {
			values[i - from] = Double.parseDouble(tokens[i]);
		}
		return values;
	}

	/** Converts a comma-separated line of numbers into a dense feature vector. */
	private static class ParsePoint implements Function<String, Vector> {
		private static final long serialVersionUID = 1L;
		private static final Pattern COMMA = Pattern.compile(",");

		/**
		 * @param line comma-separated numeric fields, all of which are features
		 * @return a dense vector with one component per field
		 * @throws NumberFormatException if a field is not a parsable double
		 */
		@Override
		public Vector call(String line) {
			return Vectors.dense(parseDoubles(COMMA.split(line), 0));
		}
	}

	/**
	 * Converts a comma-separated line into a {@link LabeledPoint}: the first field is
	 * the label and the remaining fields are the features.
	 */
	private static class LabeledParsePoint implements Function<String, LabeledPoint> {
		private static final long serialVersionUID = 1L;
		private static final Pattern COMMA = Pattern.compile(",");

		/**
		 * @param line comma-separated numeric fields, label first
		 * @return a labeled point whose feature vector has one component per field
		 *         after the label
		 * @throws IllegalArgumentException if the line yields no fields at all
		 * @throws NumberFormatException    if a field is not a parsable double
		 */
		@Override
		public LabeledPoint call(String line) {
			String[] tok = COMMA.split(line);
			if (tok.length == 0) {
				throw new IllegalArgumentException("No label found in line: \"" + line + "\"");
			}
			double label = Double.parseDouble(tok[0]);
			// Features start at column 1. The array is sized to exclude the label column
			// so that no placeholder 0.0 ends up as the first feature component.
			double[] features = parseDoubles(tok, 1);
			return new LabeledPoint(label, Vectors.dense(features));
		}
	}

	/**
	 * Builds the streaming context, wires the training and prediction pipelines, and
	 * blocks until the streaming context terminates.
	 *
	 * @param args command-line arguments (unused)
	 */
	public static void main(String[] args) {
		System.setProperty("hadoop.home.dir", "c:\\winutil\\");
		conf = new SparkConf().setAppName("appName").setMaster("local");
		scc = new JavaStreamingContext(conf, Durations.seconds(5));

		// NOTE(review): Spark documents textFileStream's argument as a directory that is
		// monitored for new files; both paths below look like individual files - confirm
		// that data actually arrives on these streams.
		JavaDStream<String> lines = scc.textFileStream("./src/main/resources/CustomerDemography.csv");
		JavaDStream<Vector> trainingData = lines.map(new ParsePoint());

		JavaDStream<String> testLines = scc.textFileStream("./src/main/resources/testdata.csv");
		JavaDStream<LabeledPoint> testData = testLines.map(new LabeledParsePoint());

		// Three clusters; setRandomCenters is called with dimension 4, weight 2.0 and
		// seed 4, so training and test feature vectors are expected to be 4-dimensional.
		StreamingKMeans model = new StreamingKMeans()
				.setK(3)
				.setDecayFactor(1.0)
				.setRandomCenters(4, 2.0, 4L);
		model.trainOn(trainingData);

		// Key each test point by its label so the printed output pairs the label with
		// the predicted cluster.
		JavaPairDStream<Double, Vector> predictionSet = testData.mapToPair(
				new PairFunction<LabeledPoint, Double, Vector>() {
					private static final long serialVersionUID = 1L;

					@Override
					public Tuple2<Double, Vector> call(LabeledPoint lpoint) throws Exception {
						return new Tuple2<Double, Vector>(lpoint.label(), lpoint.features());
					}
				});
		model.predictOnValues(predictionSet).print();

		scc.start();
		scc.awaitTermination();
	}
}