I realize I forgot the sbt part:
resolvers += "SnowPlow Repo" at "http://maven.snplow.com/releases/"
libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-core" % "1.3.0",
"com.snowplowanalytics" %% "scala-maxmind-iplookups" % "0.2.0"
)
Otherwise, to process streaming logs I use Logstash with Kafka as input. You can
also set Kafka as output if you need to do some extra computation with Spark.
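
For what it's worth, here is a minimal sketch of reading such a Kafka topic from
Spark Streaming. It assumes Spark 1.3 with "org.apache.spark" %% "spark-streaming-kafka" % "1.3.0"
added to the dependencies above; the broker address and the "nginx-logs" topic
name are only placeholders:

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

val conf = new SparkConf().setAppName("nginx-log-enrichment")
val ssc = new StreamingContext(conf, Seconds(10))

// Kafka broker list and topic are placeholders; adjust them to your setup.
val kafkaParams = Map("metadata.broker.list" -> "localhost:9092")
val topics = Set("nginx-logs")

// Each record is a (key, value) pair; the value holds the raw log line.
val logLines = KafkaUtils
  .createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
  .map(_._2)

logLines.print()

ssc.start()
ssc.awaitTermination()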
On 23/02/2016 15:07, Romain Sagean wrote:
Hi,
I use MaxMind GeoIP with Spark (no streaming). To make it work you should use
mapPartitions. I don't know if something similar exists for Spark Streaming.
My code, for reference:
import com.snowplowanalytics.maxmind.iplookups.IpLookups

def parseIP(ip: String, ipLookups: IpLookups): List[String] = {
  // performLookups returns a tuple; its first element is an Option[IpLocation]
  val location = ipLookups.performLookups(ip)._1
  val countryName = location.map(_.countryName).getOrElse("")
  val city = location.flatMap(_.city).getOrElse("")
  val latitude = location.map(_.latitude.toString).getOrElse("")
  val longitude = location.map(_.longitude.toString).getOrElse("")
  List(countryName, city, latitude, longitude)
}
sc.addFile("/home/your_user/GeoLiteCity.dat")
// load your data into the my_data RDD (one List[String] per log line)
my_data.mapPartitions { rows =>
  // build the non-serializable IpLookups instance once per partition, on the executor
  val ipLookups = IpLookups(geoFile = Some(SparkFiles.get("GeoLiteCity.dat")))
  rows.map { row => row ::: parseIP(row(3), ipLookups) }
}
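
For the streaming case, the same mapPartitions trick should carry over by applying
it per micro-batch with DStream.transform. A rough, untested sketch (reusing the
ssc from the Kafka sketch above; `events` is a hypothetical DStream[List[String]]
of parsed log rows with the client IP at index 3, mirroring the batch code):

import org.apache.spark.SparkFiles
import org.apache.spark.streaming.dstream.DStream
import com.snowplowanalytics.maxmind.iplookups.IpLookups

// ship the GeoIP database to the executors once, at startup
ssc.sparkContext.addFile("/home/your_user/GeoLiteCity.dat")

def enrich(events: DStream[List[String]]): DStream[List[String]] =
  events.transform { rdd =>
    rdd.mapPartitions { rows =>
      // build the non-serializable IpLookups on the executor, once per partition,
      // so it never has to be serialized and shipped from the driver
      val ipLookups = IpLookups(geoFile = Some(SparkFiles.get("GeoLiteCity.dat")))
      rows.map(row => row ::: parseIP(row(3), ipLookups))
    }
  }

The key point is that IpLookups is created inside mapPartitions on the executors,
which is what avoids the "not Serializable" error mentioned below.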
On 23/02/2016 14:28, Zhun Shen wrote:
Hi all,
Currently, I send nginx logs to Kafka, and then I want to use Spark Streaming to
parse the logs and enrich the IP info with the GeoIP libs from MaxMind.
I found this one, https://github.com/Sanoma-CDA/maxmind-geoip2-scala.git, but
Spark Streaming threw an error saying that the lib was not Serializable.
Does anyone know a way to process the IP info in Spark Streaming? Many thanks.