Hi Evan,
here is an improved version — thanks for your advice. But the last step,
saveAsTextFile, is still very slow. :(
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import java.net.URL
import java.text.SimpleDateFormat
import com.vividsolutions.jts.geom._
import com.vividsolutions.jts.index.strtree.STRtree
import com.vividsolutions.jts.io._
import org.geotools.data.FileDataStoreFinder
import org.geotools.geometry.jts.{JTSFactoryFinder, ReferencedEnvelope}
import org.opengis.feature.{simple, Feature, FeatureVisitor}
import scala.collection.JavaConverters._
object SimpleApp {

  /**
   * Spatial index over the municipality shapefile, built once per JVM.
   * As a nested object it is initialized lazily on first access inside each
   * executor, so the (non-serializable) STRtree never has to travel with
   * the task closure.
   */
  object geoDataMun {
    private val spatialIndex = new STRtree()

    // Load every feature of the shapefile into the STR-tree, keyed by its
    // envelope and carrying (geometry, state code, municipality code).
    {
      val path = new URL("file:////geoData/MunicipiosLatLon.shp")
      val store = FileDataStoreFinder.getDataStore(path)
      val it = store.getFeatureSource.getFeatures.features()
      try {
        while (it.hasNext) {
          val feature = it.next()
          feature.getDefaultGeometry match {
            case geom: Geometry =>
              val env = geom.getEnvelopeInternal
              if (!env.isNull) {
                // Attributes 1 and 2 are assumed to be the state and
                // municipality identifiers — NOTE(review): confirm against
                // the shapefile schema.
                spatialIndex.insert(env, (geom, feature.getAttribute(1), feature.getAttribute(2)))
              }
            case _ => // null / non-geometry default attribute: skip feature
          }
        }
      } finally {
        it.close() // FeatureIterator holds file handles; must be closed
      }
    }

    /** Candidate features whose envelope intersects `env`. */
    def get(env: Envelope) = spatialIndex.query(env).asScala
  }

  /**
   * Reads a CSV of (user, lat, lon, ISO timestamp) rows, tags each row with
   * its epoch-millis timestamp and the state/municipality codes containing
   * the point, and writes the result back to HDFS.
   */
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("Csv Clipper")
    val sc = new SparkContext(conf)

    val csvPath = "hdfs://m01/user/acoronado/mov/movilidad.csv"
    // No cache(): the RDD is consumed exactly once, so caching only adds
    // memory pressure and serialization overhead.
    val csv = sc.textFile(csvPath)

    // mapPartitions so the GeometryFactory, WKTReader and SimpleDateFormat
    // are built once per partition instead of once per record — this is the
    // dominant per-row cost in the original version and the reason the final
    // save stage appeared slow (save merely triggers this map).
    val clipPoints = csv.mapPartitions { lines =>
      val geometryFactory = JTSFactoryFinder.getGeometryFactory
      val reader = new WKTReader(geometryFactory)
      // SimpleDateFormat is not thread-safe, but a partition iterator is
      // consumed by a single thread, so one instance per partition is fine.
      val dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSZ")

      lines.map { line =>
        val Array(usuario, lat, lon, date) = line.split(",").map(_.trim)

        val point = reader.read("POINT (" + lon + " " + lat + ")")

        // Envelope query gives candidates; an exact intersects() test picks
        // the municipality actually containing the point. ("0","0") marks
        // points falling outside every municipality.
        val (cveEst, cveMun) = geoDataMun
          .get(point.getEnvelopeInternal)
          .collectFirst {
            case (g: Geometry, e: String, m: String) if g.intersects(point) => (e, m)
          }
          .getOrElse(("0", "0"))

        // Normalize a trailing "Z" to an explicit "+0000" offset; rows with
        // unparseable dates get "0" rather than failing the job.
        val time =
          try dateFormat.parse(date.replaceAll("Z$", "+0000")).getTime.toString
          catch { case _: Exception => "0" }

        line + "," + time + "," + cveEst + "," + cveMun
      }
    }

    clipPoints.saveAsTextFile("hdfs://m01/user/acoronado/mov/resultados_movilidad_fast.csv")
  }
}
--
View this message in context:
http://apache-spark-user-list.1001560.n3.nabble.com/Example-of-Geoprocessing-with-Spark-tp14274p14710.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]