Hi Evan, here is an improved version — thanks for your advice. But the last step, saveAsTextFile, is still very slow. :(
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import java.net.URL
import java.text.SimpleDateFormat
import com.vividsolutions.jts.geom._
import com.vividsolutions.jts.index.strtree.STRtree
import com.vividsolutions.jts.io._
import org.geotools.data.FileDataStoreFinder
import org.geotools.geometry.jts.{JTSFactoryFinder, ReferencedEnvelope}
import org.opengis.feature.{simple, Feature, FeatureVisitor}
import scala.collection.JavaConverters._

/**
 * Clips CSV movement records (usuario, lat, lon, date) against a shapefile of
 * municipalities and appends (epoch-millis, state code, municipality code) to
 * each input line, writing the result back to HDFS.
 */
object SimpleApp {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("Csv Clipper")
    val sc = new SparkContext(conf)

    val csvPath = "hdfs://m01/user/acoronado/mov/movilidad.csv"
    val csv = sc.textFile(csvPath)
    // csv.cache() removed: the RDD is consumed exactly once (by saveAsTextFile),
    // so caching only costs memory without saving any recomputation.

    // Spark is lazy, so all of this work actually executes inside the
    // saveAsTextFile stage — that is why the save looked "slow". The original
    // built a GeometryFactory, a WKTReader, and a SimpleDateFormat for EVERY
    // record; mapPartitions builds them once per partition instead.
    val clipPoints = csv.mapPartitions { lines =>
      val geometryFactory = JTSFactoryFinder.getGeometryFactory()
      // SimpleDateFormat is not thread-safe, but one instance per partition
      // iterator is single-threaded, so this is safe.
      val dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSZ")

      lines.map { line =>
        val Array(usuario, lat, lon, date) = line.split(",").map(_.trim)
        // Build the point directly; formatting and re-parsing a WKT string
        // per record ("POINT (lon lat)") was needless overhead.
        val point = geometryFactory.createPoint(new Coordinate(lon.toDouble, lat.toDouble))

        // The STR-tree query only filters by envelope, so confirm with a real
        // geometry intersection; ("0", "0") when no municipality matches.
        // (The original's `internal match { case l => ...; case _ => ... }`
        // had an unreachable second case: `case l =>` matches everything.)
        val candidates = geoDataMun.get(point.getEnvelopeInternal)
        val (cveEst, cveMun) = candidates.collectFirst {
          case (g: Geometry, e: String, m: String) if g.intersects(point) => (e, m)
        }.getOrElse(("0", "0"))

        // Epoch millis as a string; "0" on unparsable dates (original behavior).
        // "Z" suffix is rewritten to "+0000" because SimpleDateFormat's Z
        // pattern does not accept the literal ISO-8601 'Z'.
        val time =
          try dateFormat.parse(date.replaceAll("Z$", "+0000")).getTime.toString
          catch { case e: Exception => "0" }

        s"$line,$time,$cveEst,$cveMun"
      }
    }

    clipPoints.saveAsTextFile("hdfs://m01/user/acoronado/mov/resultados_movilidad_fast.csv")
  }

  /**
   * Spatial index over the municipalities shapefile. Scala objects initialize
   * lazily once per JVM, so each executor builds the index on first use.
   * NOTE(review): the shapefile must exist at this local path on every
   * executor node — confirm, or distribute it via --files.
   */
  object geoDataMun {
    // Was `var` in the original but never reassigned.
    val spatialIndex = new STRtree()
    val path = new URL("file:////geoData/MunicipiosLatLon.shp")
    val store = FileDataStoreFinder.getDataStore(path)
    val source = store.getFeatureSource()
    val features = source.getFeatures()

    val it = features.features()
    try {
      while (it.hasNext) {
        val feature = it.next()
        feature.getDefaultGeometry match {
          case g: Geometry =>
            val env = g.getEnvelopeInternal
            if (!env.isNull) {
              // Payload: (geometry, state code attr, municipality code attr).
              spatialIndex.insert(env, (g, feature.getAttribute(1), feature.getAttribute(2)))
            }
          case _ => // null or non-Geometry default attribute — skip it
        }
      }
    } finally {
      // GeoTools feature iterators hold file handles; the original leaked this.
      it.close()
    }

    /** Envelope query against the STR-tree; returns candidate payload tuples. */
    def get(env: Envelope) = spatialIndex.query(env).asScala
  }
}

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Example-of-Geoprocessing-with-Spark-tp14274p14710.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org