Hi everyone,

first off, I'm new to Scala, so please bear with me; I could not find a
solution on the web or in the Flink documentation. I'm having trouble
converting a DataSet[(LongWritable, Text)] into a DataSet of a custom case
class. I got it to work, but only in a way that feels too verbose for
Scala:


import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.api.scala._

import org.apache.hadoop.io.LongWritable
import org.apache.hadoop.io.Text

case class Ranking(pageUrl: String, pageRank: Int, avgDuration: Int)

val rankingsInput: DataSet[Ranking] =
  env.readHadoopFile(inputFormat, classOf[LongWritable], classOf[Text], rankingsInputPath, job)
    .map[Ranking](new MapFunction[(LongWritable, Text), Ranking] {
      override def map(value: (LongWritable, Text)) = {
        val splits = value._2.toString.split(",")
        new Ranking(splits(0), splits(1).toInt, splits(2).toInt)
      }
    })


Is there a simpler way of doing this? All other variants I've tried fail
with type information errors.
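
For reference, this is roughly the kind of plain-Scala variant I would have
expected to work (just a sketch; it reuses the same env, inputFormat, job,
rankingsInputPath and Ranking case class as above, and relies on the implicit
TypeInformation that the org.apache.flink.api.scala._ import should provide):


import org.apache.flink.api.scala._

// Same Hadoop source as above, but mapped with a Scala lambda instead of
// an anonymous MapFunction.
val rankingsInput: DataSet[Ranking] =
  env.readHadoopFile(inputFormat, classOf[LongWritable], classOf[Text], rankingsInputPath, job)
    .map { value =>
      val splits = value._2.toString.split(",")
      Ranking(splits(0), splits(1).toInt, splits(2).toInt)
    }


Variants more or less like this are what give me the type information errors.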

Thanks in advance!
Robert

-- 
My GPG Key ID: 336E2680
