Hi everyone, I'm trying to run the following from Zeppelin:
```
// One case class per stage of the pipeline
case class Movie(movieId: Long, title: String, genres: String)
case class MovieWithGenresAndYear(movieId: Long, title: String, genres: List[String], year: Integer)
case class MovieExploded(movieId: Long, title: String, genres: List[String])
case class MovieAggregate(year: Int, count: Long)

import spark.implicits._

val df = spark
  .read
  .option("header", "true")      // movies.csv starts with a header row: movieId,title,genres
  .option("inferSchema", "true") // read movieId as a number so .as[Movie] can map it to Long
  .csv("/home/finkel/Downloads/ml-latest/movies.csv")
  .as[Movie]
  // split the pipe-separated genres column into a list
  .map(it => MovieExploded(it.movieId, it.title, it.genres.split('|').map(_.trim).toList))
  // pull the release year out of titles such as "Toy Story (1995)"
  .map {
    case MovieExploded(movieId, title, genres) =>
      if (!title.matches("\"?.*\\(\\d{4}\\)\\s*\"?"))
        MovieWithGenresAndYear(movieId, title, genres, null) // no year in the title
      else {
        val lastOpen = title.lastIndexOf('(')
        val year = title.substring(lastOpen + 1).replace(")", "").replace("\"", "").trim.toInt
        MovieWithGenresAndYear(movieId, title.substring(0, lastOpen), genres, year)
      }
  }
  .filter(_.year != null)
  // count movies per release year
  .groupByKey(_.year)
  .mapGroups((k, v) => MovieAggregate(k, v.size))
  .show(300, false)
```
movies.csv is from the MovieLens latest dataset (ml-latest).
The query hangs indefinitely (I waited for an hour).
I can reproduce this in both 0.9 and 0.2.
The very same query completes almost instantly in spark-shell.
Any ideas on potential causes?
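To rule out the row-level parsing as the cause of the hang, the same per-row logic can be run in plain Scala with no Spark or Zeppelin involved. This is a minimal sketch, assuming Scala 2.12/2.13; the `TitleParsing` object and the `parseTitle` helper are hypothetical names, and the sample rows are written by hand for illustration rather than read from the dataset.

```scala
// Plain-Scala copy of the per-row logic from the query (no Spark needed).
// TitleParsing and parseTitle are hypothetical names used only for this check.
object TitleParsing {
  case class MovieWithGenresAndYear(movieId: Long, title: String, genres: List[String], year: Integer)

  // Same genre split, regex and substring logic as the two .map calls in the query
  def parseTitle(movieId: Long, title: String, genres: String): MovieWithGenresAndYear = {
    val genreList = genres.split('|').map(_.trim).toList
    if (!title.matches("\"?.*\\(\\d{4}\\)\\s*\"?"))
      MovieWithGenresAndYear(movieId, title, genreList, null) // no "(yyyy)" suffix
    else {
      val lastOpen = title.lastIndexOf('(')
      val year = title.substring(lastOpen + 1).replace(")", "").replace("\"", "").trim.toInt
      MovieWithGenresAndYear(movieId, title.substring(0, lastOpen), genreList, year)
    }
  }

  def main(args: Array[String]): Unit = {
    // Hand-written sample rows (hypothetical, not loaded from movies.csv)
    val rows = Seq(
      (1L, "Toy Story (1995)", "Adventure|Animation|Children|Comedy|Fantasy"),
      (2L, "Some Untitled Movie", "(no genres listed)")
    )
    val parsed = rows.map { case (id, t, g) => parseTitle(id, t, g) }
    // Count titles per year on a local collection, mirroring groupByKey/mapGroups
    val perYear = parsed
      .filter(_.year != null)
      .groupBy(_.year)
      .map { case (y, ms) => (y.intValue, ms.size.toLong) }
    parsed.foreach(println)
    println(perYear)
  }
}
```

If this finishes immediately on sample rows, the parsing itself is unlikely to be what hangs, which points toward the Zeppelin/Spark interpreter setup rather than the query logic.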
--
Regards,
Pasha
Big Data Tools @ JetBrains
