I realize I forgot the sbt part:

resolvers += "SnowPlow Repo" at "http://maven.snplow.com/releases/"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "1.3.0",
  "com.snowplowanalytics"  %% "scala-maxmind-iplookups"  % "0.2.0"
)

Otherwise, to process streaming logs I use Logstash with Kafka as input. You can set Kafka as output instead if you need to do some extra computation with Spark.

On 23/02/2016 15:07, Romain Sagean wrote:
Hi,
I use MaxMind GeoIP with Spark (no streaming). To make it work you should use mapPartitions. I don't know if something similar exists for Spark Streaming.

My code, for reference:

import com.snowplowanalytics.maxmind.iplookups.IpLookups
import org.apache.spark.SparkFiles

// performLookups returns a tuple whose first element is Option[IpLocation]
def parseIP(ip: String, ipLookups: IpLookups): List[String] = {
  val location = ipLookups.performLookups(ip)._1
  val countryName = location.map(_.countryName).getOrElse("")
  val city = location.flatMap(_.city).getOrElse("")
  val latitude = location.map(_.latitude.toString).getOrElse("")
  val longitude = location.map(_.longitude.toString).getOrElse("")
  List(countryName, city, latitude, longitude)
}
// ship the GeoIP database to the executors
sc.addFile("/home/your_user/GeoLiteCity.dat")

// load your data into the my_data RDD (each row a List[String] with the IP in column 3)

my_data.mapPartitions { rows =>
  // IpLookups is not serializable, so build one instance per partition on the executor
  val ipLookups = IpLookups(geoFile = Some(SparkFiles.get("GeoLiteCity.dat")))
  rows.map { row => row ::: parseIP(row(3), ipLookups) }
}
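
That said, DStream does expose a mapPartitions method too, so the same trick (build the non-serializable IpLookups inside each partition) should carry over to Spark Streaming. A rough, untested sketch, assuming Spark 1.3 with the direct Kafka API (extra dependency "org.apache.spark" %% "spark-streaming-kafka" % "1.3.0"); the broker address, topic name and space-separated log format below are placeholders:

import kafka.serializer.StringDecoder
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

val ssc = new StreamingContext(sc, Seconds(10))

// placeholders: point these at your own Kafka broker and topic
val kafkaParams = Map("metadata.broker.list" -> "localhost:9092")
val lines = KafkaUtils
  .createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, Set("nginx"))
  .map(_._2)

val enriched = lines.mapPartitions { rows =>
  // same as above: IpLookups is created on the executor, never serialized from the driver
  val ipLookups = IpLookups(geoFile = Some(SparkFiles.get("GeoLiteCity.dat")))
  rows.map { line =>
    val fields = line.split(' ').toList // assumes a space-separated nginx log with the IP as the first field
    fields ::: parseIP(fields.head, ipLookups)
  }
}

enriched.print()
ssc.start()
ssc.awaitTermination()

The sc.addFile("/home/your_user/GeoLiteCity.dat") call still has to run before the streaming job starts, same as in the batch version above.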

On 23/02/2016 14:28, Zhun Shen wrote:
Hi all,

Currently, I send nginx logs to Kafka, and then I want to use Spark Streaming to parse the logs and enrich the IP info with the GeoIP libs from MaxMind.

I found this one: https://github.com/Sanoma-CDA/maxmind-geoip2-scala.git, but Spark Streaming threw an error saying that the lib was not Serializable.

Does anyone know a way to process the IP info in Spark Streaming? Many thanks.


