Now I have a better version, but the problem is that saveAsTextFile
does not finish the job: in the HDFS directory there is only a partial
temporary file. Can someone tell me what is wrong?
Thanks!!
object SimpleApp {
/** Entry point: reads a CSV of (user, lat, lon, date) rows from HDFS,
  * tags each row with its state/municipality codes via a point-in-polygon
  * spatial join, and writes the enriched rows back to HDFS.
  */
def main(args: Array[String]): Unit = {
  val conf = new SparkConf().setAppName("Csv Clipper")
  val sc = new SparkContext(conf)
  val csvPath = "hdfs://m01/user/acoronado/mov/movilidad_64mb.csv"
  val csv = sc.textFile(csvPath)
  csv.cache()
  val clipPoints = csv.map { line: String =>
    val Array(usuario, lat, lon, date) = line.split(",").map(_.trim)
    val punto = Point(lon.toDouble, lat.toDouble)
    // Coarse pass: find the index polygon containing the point, then use its
    // attribute as a key into the finer municipality index.
    val internal = geoDataExternal.get.find(f => f.geometry intersects punto)
    val (cve_est, cve_mun) = internal match {
      case Some(f: org.geoscript.feature.Feature) =>
        val index = f.getAttribute(1).toString
        val existe = geoDataMun.get(index).find(m => m.geometry intersects punto)
        existe match {
          case Some(m) => (m.getAttribute(1).toString, m.getAttribute(2).toString)
          case None    => ("0", "0")
        }
      case None => ("0", "0")
    }
    // SimpleDateFormat is not thread-safe, so one instance is built per
    // record; "Z" suffix is rewritten to "+0000" so the Z pattern parses it.
    val time =
      try {
        new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSZ")
          .parse(date.replaceAll("Z$", "+0000"))
          .getTime()
          .toString
      } catch {
        case e: Exception => "0" // unparseable date -> sentinel "0"
      }
    line + "," + time + "," + cve_est + "," + cve_mun
  }
  // FIX: the original read "clipPoints.*saveAsTextFile*" -- the '*' are
  // e-mail bold markers and are not valid Scala; they are removed here.
  clipPoints.saveAsTextFile("hdfs://m01/user/acoronado/mov/resultados_movilidad_60.csv")
  println("Spark Clip Exito!!!")
}
/** Municipality polygons plus a CSV-backed spatial index mapping an index
  * cell id to the (state, municipality) code pairs whose polygons fall in
  * that cell.
  * NOTE(review): both /geoData files must exist on every worker node --
  * this object is initialized lazily inside each executor JVM.
  */
object geoDataMun {
  private val shp = Shapefile("/geoData/MunicipiosLatLon.shp")
  val features = shp.getFeatures.toIterable
  // Each CSV row: state code, municipality code, index cell id.
  val result = scala.io.Source.fromFile("/geoData/indice_espacial.csv")
    .getLines()
    .toList map { line: String =>
      val campos = line.split(",").map(_.trim)
      val cve_edo = campos(0)
      val cve_mun = campos(1)
      val index = campos(2)
      scala.collection.immutable.List(index.toInt, (cve_edo, cve_mun))
    }
  // index cell id -> list of (state, municipality) code pairs in that cell.
  val mapaIx = result.groupBy(x => x(0)).mapValues(cves => cves.map(x => x(1)))
  /** Features whose (state, municipality) codes belong to the given index
    * cell. Returns an empty collection for an unknown cell id.
    * FIX: the original `mapaIx(index.toInt)` threw NoSuchElementException
    * for ids absent from the CSV, killing the whole task; `getOrElse`
    * makes the lookup total, matching the caller's None => ("0","0") path.
    */
  def get(index: String) = {
    val cves = mapaIx.getOrElse(index.toInt, Nil)
    features.filter(f =>
      cves.contains((f.getAttribute(1).toString, f.getAttribute(2).toString)))
  }
}
/** Coarse clipping-index shapefile, loaded once per JVM.
  * NOTE(review): /geoData/IndiceRecortado.shp must be present on every
  * worker node -- confirm with the cluster deployment.
  */
object geoDataExternal {
  private val indiceShp = Shapefile("/geoData/IndiceRecortado.shp")
  val features = indiceShp.getFeatures
  /** The full feature collection of the clipping index. */
  def get: FeatureCollection[org.geoscript.feature.Schema, org.geoscript.feature.Feature] =
    features
}
}
The driver log is:
14/09/18 19:27:55 ERROR EndpointWriter: AssociationError [akka.tcp://
[email protected]:44895] -> [akka.tcp://
[email protected]:43942]: Error
[Association failed with [akka.tcp://
[email protected]:43942]] [
akka.remote.EndpointAssociationException: Association failed with
[akka.tcp://[email protected]:43942]
Caused by:
akka.remote.transport.netty.NettyTransport$$anonfun$associate$1$$anon$2:
Connection refused: axaxaxa-cloudera-s05.xxxnetworks.com/10.5.96.42:43942
]
14/09/18 19:27:55 ERROR EndpointWriter: AssociationError [akka.tcp://
[email protected]:44895] -> [akka.tcp://
[email protected]:43942]: Error
[Association failed with [akka.tcp://
[email protected]:43942]] [
akka.remote.EndpointAssociationException: Association failed with
[akka.tcp://[email protected]:43942]
Caused by:
akka.remote.transport.netty.NettyTransport$$anonfun$associate$1$$anon$2:
Connection refused: axaxaxa-cloudera-s05.xxxnetworks.com/10.5.96.42:43942
]
14/09/18 19:27:55 ERROR EndpointWriter: AssociationError [akka.tcp://
[email protected]:44895] -> [akka.tcp://
[email protected]:43942]: Error
[Association failed with [akka.tcp://
[email protected]:43942]] [
akka.remote.EndpointAssociationException: Association failed with
[akka.tcp://[email protected]:43942]
Caused by:
akka.remote.transport.netty.NettyTransport$$anonfun$associate$1$$anon$2:
Connection refused: axaxaxa-cloudera-s05.xxxnetworks.com/10.5.96.42:43942
On Mon, Sep 15, 2014 at 1:30 PM, Abel Coronado Iruegas <
[email protected]> wrote:
> Here an example of a working code that takes a csv with lat lon points and
> intersects with polygons of municipalities of Mexico, generating a new
> version of the file with new attributes.
>
> Do you think that could be improved?
>
> Thanks.
>
> The Code:
>
> import org.apache.spark.SparkContext
> import org.apache.spark.SparkContext._
> import org.apache.spark.SparkConf
> import org.geoscript.feature._
> import org.geoscript.geometry._
> import org.geoscript.geometry.builder._
> import com.vividsolutions.jts._
> import org.geoscript.layer.Shapefile
> import org.geotools.feature.FeatureCollection
> import java.text._
> import java.util._
>
> object SimpleApp {
> def main(args: Array[String]){
> val conf = new SparkConf().setAppName("Csv Clipper")
> val sc = new SparkContext(conf)
> val csvPath =
> "hdfs://x01/user/acoronado/mov/movilidad.csv" //70 Millions of rows
> val csv = sc.textFile(csvPath)
> val clipPoints = csv.map({line: String =>
> val Array(usuario, lat,
> lon, date) = line.split(",").map(_.trim)
> val punto =
> Point(lon.toDouble,lat.toDouble)
> val existe =
> geoData.get.filter(f => f.geometry intersects punto) // Geospatial operation
> var cve_est = "0"
> var cve_mun = "0"
> var time = "0"
> if(!existe.isEmpty){
> val f = existe.take(1)
> val ff = f.toList(0)
> cve_est =
> ff.getAttribute(1).toString //State Code
> cve_mun =
> ff.getAttribute(2).toString // Municipality Code
> time = (new
> SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSZ")).parse(date.replaceAll("Z$",
> "+0000")).getTime().toString()
> }
>
> line+","+time+","+cve_est+","+cve_mun
> })
>
> clipPoints.coalesce(1,true).saveAsTextFile("hdfs://m01/user/acoronado/mov/mov_all.csv")
> println("Spark Clip Exito!!!")
> }
> object geoData {
> private val estatal =
> Shapefile("/geoData/MunicipiosLatLon.shp") //This directory exist in all
> the nodes.
> private val estatalColl = estatal.getFeatures
> def
> get:FeatureCollection[org.geoscript.feature.Schema,org.geoscript.feature.Feature]
> = estatalColl
> }
> }
>