The only real noise I see is the anonymous MapFunction class, which can be
replaced by a plain Scala function literal, like this:
import org.apache.flink.api.scala._ // brings the implicit TypeInformation into scope
import org.apache.hadoop.io.{LongWritable, Text}

case class Ranking(pageUrl: String, pageRank: Int, avgDuration: Int)

val rankingsInput: DataSet[Ranking] =
  env.readHadoopFile(inputFormat, classOf[LongWritable], classOf[Text],
      rankingsInputPath, job).map[Ranking] {
    (value: (LongWritable, Text)) => {
      val Array(name, n, m) = value._2.toString.split(",")
      Ranking(name, n.toInt, m.toInt) // no "new" needed for case classes
    }
  }
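If you want to test the parsing logic without a Flink environment, one option is to pull the lambda body out into a plain function and pass that to map. A minimal sketch; the object and helper names (RankingParsing, parseRanking) and the sample lines are only illustrative, and the Flink call in the comment is untested:

```scala
// Same case class as in the snippet above.
case class Ranking(pageUrl: String, pageRank: Int, avgDuration: Int)

object RankingParsing {
  // Hypothetical helper: turns one CSV line "url,rank,duration" into a Ranking.
  // Throws a MatchError if the line does not have exactly three fields,
  // and a NumberFormatException if rank or duration are not integers.
  def parseRanking(line: String): Ranking = {
    val Array(name, n, m) = line.split(",")
    Ranking(name, n.toInt, m.toInt)
  }

  def main(args: Array[String]): Unit = {
    // In the Flink job this could be used as (untested):
    //   env.readHadoopFile(...).map { (value: (LongWritable, Text)) =>
    //     parseRanking(value._2.toString)
    //   }
    val lines = List("http://example.org,10,42", "http://example.com,3,7")
    lines.map(parseRanking).foreach(println)
  }
}
```

Keeping the parsing in an ordinary function also means it can be unit tested on plain strings.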
As you may have noticed, I've also destructured the array returned by split
in the first line of the function instead of indexing into it. To destructure
the (LongWritable, Text) input tuple just as concisely, you can use the Scala
API extensions [1] (which won't be available before 1.1, I suppose).
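As I understand the extensions page [1], a plain `map { case (id, value) => ... }` is rejected on a DataSet because the compiler cannot fully infer the argument types of the pattern-matching anonymous function. The extensions add methods such as mapWith that accept it. On ordinary Scala collections the same pattern works out of the box. The sketch below uses a List[(Long, String)] as a stand-in for the (LongWritable, Text) pairs so it runs without Flink or Hadoop. The Flink variant in the comment assumes Flink 1.1 or later and is untested:

```scala
case class Ranking(pageUrl: String, pageRank: Int, avgDuration: Int)

object TupleDestructuring {
  // With the Flink Scala API extensions (untested here) this would read roughly:
  //   import org.apache.flink.api.scala.extensions._
  //   env.readHadoopFile(...).mapWith { case (_, text) =>
  //     val Array(name, n, m) = text.toString.split(",")
  //     Ranking(name, n.toInt, m.toInt)
  //   }
  // Here a List of (offset, line) pairs stands in for DataSet[(LongWritable, Text)].
  def toRankings(input: List[(Long, String)]): List[Ranking] =
    input.map { case (_, line) =>              // destructure the tuple, ignoring the offset
      val Array(name, n, m) = line.split(",")  // destructure the split array
      Ranking(name, n.toInt, m.toInt)
    }

  def main(args: Array[String]): Unit =
    toRankings(List((0L, "http://example.org,10,42"))).foreach(println)
}
```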
Since you're parsing textual data, it may also make sense to handle
malformed input. Here is an example that uses flatMap, which can emit zero or
one record per input line, to do so:
import scala.util.{Try, Success, Failure} // needed to work with the "functional" Try
import org.apache.flink.util.Collector

case class Ranking(pageUrl: String, pageRank: Int, avgDuration: Int)

val rankingsInput: DataSet[Ranking] =
  env.readHadoopFile(inputFormat, classOf[LongWritable], classOf[Text],
      rankingsInputPath, job).flatMap[Ranking] {
    (value: (LongWritable, Text), out: Collector[Ranking]) => {
      Try {
        val Array(name, n, m) = value._2.toString.split(",") // MatchError if array size != 3
        Ranking(name, n.toInt, m.toInt) // NumberFormatException if n or m are not numbers
      } match {
        case Success(ranking) => out.collect(ranking) // emit only well-formed records
        case Failure(exception) => // deal with malformed input, perhaps log; nothing is emitted
      }
    }
  }
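An alternative that avoids the Collector is to have the function return a collection, so that a malformed line becomes an empty result. The sketch below wraps the parsing in Try, converts it to an Option, and writes a message to stderr for bad lines. The helper name parseRankingOpt and the sample lines are hypothetical, stderr stands in for a proper logger, and the Flink call in the comment (the flatMap overload taking a function that returns a collection) is untested:

```scala
import scala.util.{Failure, Success, Try}

case class Ranking(pageUrl: String, pageRank: Int, avgDuration: Int)

object SafeParsing {
  // Hypothetical helper: returns None for malformed lines instead of throwing.
  def parseRankingOpt(line: String): Option[Ranking] =
    Try {
      val Array(name, n, m) = line.split(",") // MatchError if not exactly 3 fields
      Ranking(name, n.toInt, m.toInt)         // NumberFormatException if not numbers
    } match {
      case Success(ranking) => Some(ranking)
      case Failure(e) =>
        // Report and skip the line; a real job would probably use a logger.
        System.err.println(s"Skipping malformed line '$line': $e")
        None
    }

  // In Flink this could be used as (untested):
  //   .flatMap { (value: (LongWritable, Text)) => parseRankingOpt(value._2.toString).toList }
  def main(args: Array[String]): Unit = {
    val lines = List("http://example.org,10,42", "broken line", "http://example.com,x,7")
    lines.flatMap(line => parseRankingOpt(line).toList).foreach(println)
  }
}
```

Only the first sample line survives. The other two are reported on stderr and dropped.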
Feel free to ask me for any clarification on the snippets [2] I posted;
I'll gladly help you further if you need it.
One last note: I'm not a Shapeless user, but I believe it has some very handy
constructs to move back and forth between tuples and case classes (please
take this with a grain of salt, though).
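For a flat tuple like the one here, plain Scala may already be enough without Shapeless. The case class constructor can be eta-expanded and tupled to go from tuple to case class, and a pattern match on the case class goes the other way. A small sketch; the object and member names are only illustrative:

```scala
case class Ranking(pageUrl: String, pageRank: Int, avgDuration: Int)

object TupleConversions {
  // Tuple -> case class: eta-expand the companion's apply, then adapt it to take a tuple.
  val fromTuple: ((String, Int, Int)) => Ranking = (Ranking.apply _).tupled

  // Case class -> tuple: use the case class extractor in a pattern match.
  def toTuple(r: Ranking): (String, Int, Int) = r match {
    case Ranking(url, rank, duration) => (url, rank, duration)
  }

  def main(args: Array[String]): Unit = {
    val r = fromTuple(("http://example.org", 10, 42))
    println(r)          // Ranking(http://example.org,10,42)
    println(toTuple(r)) // (http://example.org,10,42)
  }
}
```

Shapeless becomes more interesting when this has to work generically for many case classes rather than for one known shape.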
[1]: https://ci.apache.org/projects/flink/flink-docs-master/apis/scala_api_extensions.html#accept-partial-functions
[2]: I didn't test them, so caution is advisable ;)
On Wed, May 4, 2016 at 2:00 PM, Robert Schmidtke wrote:
> Hi everyone,
>
> first up, I'm new to Scala, so please bear with me, but I could not find
> any solution on the web or the Flink documentation. I'm having trouble
> converting a DataSet[(LongWritable, Text)] to a DataSet of a custom case
> class. I got it to work, however in a way that I feel is too verbose for
> Scala:
>
>
> import org.apache.flink.api.common.functions.MapFunction
> import org.apache.flink.api.scala._
>
> import org.apache.hadoop.io.LongWritable
> import org.apache.hadoop.io.Text
>
> case class Ranking(pageUrl: String, pageRank: Int, avgDuration: Int)
> val rankingsInput: DataSet[Ranking] =
>   env.readHadoopFile(inputFormat, classOf[LongWritable], classOf[Text],
>       rankingsInputPath, job).map[Ranking](new MapFunction[(LongWritable, Text), Ranking] {
>     override def map(value: (LongWritable, Text)) = {
>       val splits = value._2.toString.split(",")
>       new Ranking(splits(0), splits(1).toInt, splits(2).toInt)
>     }
>   })
>
>
> Is there a simpler way of doing this? All other variants I've tried yield
> some type information errors.
>
> Thanks in advance!
> Robert
>
> --
> My GPG Key ID: 336E2680
>
--
BR,
Stefano Baghino
Software Engineer @ Radicalbit