Hi, I checked the dependencies and fixed the bug. It works well on Spark but not on Spark Streaming, so I think I still need to find another way to do it.
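One direction I may try, though I have not verified it yet: DStream.transform exposes each micro-batch as a plain RDD, so Romain's mapPartitions trick should carry over to streaming, with IpLookups constructed on the executor once per partition instead of being serialized from the driver. A rough sketch, assuming the same scala-maxmind-iplookups 0.2.0 API as in the quoted code below (logLines and the naive extractIp are placeholders for my Kafka stream and nginx parsing):

  import com.snowplowanalytics.maxmind.iplookups.IpLookups
  import org.apache.spark.SparkFiles
  import org.apache.spark.streaming.dstream.DStream

  // ship the database to every executor once, from the driver
  sc.addFile("/home/your_user/GeoLiteCity.dat")

  // placeholder: assumes the client IP is the first space-separated field
  def extractIp(line: String): String = line.split(" ")(0)

  // logLines: DStream[String] of raw nginx lines from Kafka (placeholder name)
  val enriched = logLines.transform { rdd =>
    rdd.mapPartitions { lines =>
      // constructed here, on the executor, so IpLookups is never serialized
      val ipLookups = IpLookups(geoFile = Some(SparkFiles.get("GeoLiteCity.dat")))
      lines.map { line =>
        val country = ipLookups.performLookups(extractIp(line))._1
          .map(_.countryName).getOrElse("")
        (line, country)
      }
    }
  }

Rebuilding IpLookups per partition on every batch is not free; a lazily initialized singleton on each executor would amortize the cost, but the sketch above stays closest to the code quoted below.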
> On Feb 26, 2016, at 2:47 PM, Zhun Shen <shenzhunal...@gmail.com> wrote:
>
> Hi,
>
> thanks for your advice. I tried your method. I use Gradle to manage my Scala
> code, and 'com.snowplowanalytics:scala-maxmind-iplookups:0.2.0' was imported
> via Gradle.
>
> spark version: 1.6.0
> scala: 2.10.4
> scala-maxmind-iplookups: 0.2.0
>
> I ran my test and got the error below:
>
> java.lang.NoClassDefFoundError: scala/collection/JavaConversions$JMapWrapperLike
>     at com.snowplowanalytics.maxmind.iplookups.IpLookups$.apply(IpLookups.scala:53)
>
>> On Feb 24, 2016, at 1:10 AM, romain sagean <romain.sag...@hupi.fr> wrote:
>>
>> I realize I forgot the sbt part:
>>
>> resolvers += "SnowPlow Repo" at "http://maven.snplow.com/releases/"
>>
>> libraryDependencies ++= Seq(
>>   "org.apache.spark" %% "spark-core" % "1.3.0",
>>   "com.snowplowanalytics" %% "scala-maxmind-iplookups" % "0.2.0"
>> )
>>
>> Otherwise, to process streaming logs I use logstash with kafka as input. You
>> can set kafka as output if you need to do some extra calculation with Spark.
>>
>> On 23/02/2016 15:07, Romain Sagean wrote:
>>>
>>> Hi,
>>> I use maxmind geoip with Spark (no streaming). To make it work you should
>>> use mapPartitions. I don't know if something similar exists for Spark
>>> Streaming.
>>>
>>> My code, for reference:
>>>
>>> def parseIP(ip: String, ipLookups: IpLookups): List[String] = {
>>>   val lookupResult = ipLookups.performLookups(ip)
>>>   val countryName = lookupResult._1.map(_.countryName).getOrElse("")
>>>   val city = lookupResult._1.map(_.city).getOrElse(None).getOrElse("")
>>>   val latitude = lookupResult._1.map(_.latitude).getOrElse(None).toString
>>>   val longitude = lookupResult._1.map(_.longitude).getOrElse(None).toString
>>>   List(countryName, city, latitude, longitude)
>>> }
>>>
>>> sc.addFile("/home/your_user/GeoLiteCity.dat")
>>>
>>> // load your data into the my_data RDD
>>>
>>> my_data.mapPartitions { rows =>
>>>   val ipLookups = IpLookups(geoFile = Some(SparkFiles.get("GeoLiteCity.dat")))
>>>   rows.map { row => row ::: parseIP(row(3), ipLookups) }
>>> }
>>>
>>> On 23/02/2016 14:28, Zhun Shen wrote:
>>>>
>>>> Hi all,
>>>>
>>>> Currently I send nginx logs to Kafka, and I want to use Spark Streaming to
>>>> parse the logs and enrich the IP info with the geoip libs from Maxmind.
>>>>
>>>> I found https://github.com/Sanoma-CDA/maxmind-geoip2-scala.git, but Spark
>>>> Streaming threw an error saying the lib was not Serializable.
>>>>
>>>> Does anyone know a way to process the IP info in Spark Streaming? Many
>>>> thanks.
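P.S. for the archives, in case anyone else hits the NoClassDefFoundError above: if I read the Scala changelogs right, scala/collection/JavaConversions$JMapWrapperLike belongs to an older Scala binary layout (the wrappers were reorganized in 2.10), so the error usually means some jar on the classpath was built against a different Scala version than the runtime. My guess is the un-suffixed Gradle coordinate: sbt's %% appends the _2.10 suffix automatically, while 'com.snowplowanalytics:scala-maxmind-iplookups:0.2.0' does not name one. A consistent sbt setup might look like this (spark-streaming added for the Kafka part; versions as in my environment above):

  scalaVersion := "2.10.4"

  resolvers += "SnowPlow Repo" at "http://maven.snplow.com/releases/"

  libraryDependencies ++= Seq(
    "org.apache.spark"      %% "spark-core"               % "1.6.0",
    "org.apache.spark"      %% "spark-streaming"          % "1.6.0",
    "com.snowplowanalytics" %% "scala-maxmind-iplookups"  % "0.2.0"
  )

In Gradle the suffix has to be spelled out by hand, e.g. scala-maxmind-iplookups_2.10, assuming such an artifact exists in the SnowPlow repo for 0.2.0.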