Hi,

My HDFS file is stored with a custom data structure, and I want to read it with a SparkContext. So I defined a custom input format:
*1. code of RawDataInputFormat.scala*

import com.kiisoo.aegis.bd.common.hdfs.RDRawDataRecord
import org.apache.hadoop.io.LongWritable
import org.apache.hadoop.mapred._

/**
 * Created by Tony on 3/16/16.
 */
class RawDataInputFormat[LW <: LongWritable, RD <: RDRawDataRecord] extends FileInputFormat {
  override def getRecordReader(split: InputSplit, job: JobConf, reporter: Reporter): RecordReader[LW, RD] = {
    new RawReader(split, job, reporter)
  }
}

*2. code of RawReader.scala*

import com.kiisoo.aegis.bd.common.hdfs.RDRawDataRecord
import org.apache.hadoop.io.{LongWritable, SequenceFile}
import org.apache.hadoop.mapred._

/**
 * Created by Tony on 3/17/16.
 */
class RawReader[LW <: LongWritable, RD <: RDRawDataRecord] extends RecordReader[LW, RD] {

  var reader: SequenceFile.Reader = null
  var currentPos: Long = 0L
  var length: Long = 0L

  def this(split: InputSplit, job: JobConf, reporter: Reporter) {
    this()
    val p = split.asInstanceOf[FileSplit].getPath
    reader = new SequenceFile.Reader(job, SequenceFile.Reader.file(p))
  }

  override def next(key: LW, value: RD): Boolean = {
    val flag = reader.next(key, value)
    currentPos = reader.getPosition()
    flag
  }

  override def getProgress: Float = Math.min(1.0f, currentPos / length.toFloat)

  override def getPos: Long = currentPos

  override def createKey(): LongWritable = {
    new LongWritable()
  }

  override def close(): Unit = {
    reader.close()
  }

  override def createValue(): RDRawDataRecord = {
    new RDRawDataRecord()
  }
}

*3. code of RDRawDataRecord.java*

import com.kiisoo.aegis.common.rawdata.RawDataRecord;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.Writable;

public class RDRawDataRecord implements Writable {

    private String smac;
    private String dmac;
    private int hrssi;
    private int lrssi;
    private long fstamp;
    private long lstamp;
    private long maxstamp;
    private long minstamp;
    private long stamp;

    public void readFields(DataInput in) throws IOException {
        this.smac = in.readUTF();
        this.dmac = in.readUTF();
        this.hrssi = in.readInt();
        this.lrssi = in.readInt();
        this.fstamp = in.readLong();
        this.lstamp = in.readLong();
        this.maxstamp = in.readLong();
        this.minstamp = in.readLong();
        this.stamp = in.readLong();
    }

    public void write(DataOutput out) throws IOException {
        out.writeUTF(StringUtils.isNotBlank(this.smac) ? this.smac : "");
        out.writeUTF(StringUtils.isNotBlank(this.dmac) ? this.dmac : "");
        out.writeInt(this.hrssi);
        out.writeInt(this.lrssi);
        out.writeLong(this.fstamp);
        out.writeLong(this.lstamp);
        out.writeLong(this.maxstamp);
        out.writeLong(this.minstamp);
        out.writeLong(this.stamp);
    }

    // getters and setters omitted
}

*Finally, I run this code:*

val filePath = "hdfs://tony.Liu:9000/wifi-raw-data/wifi-raw-data.1455206402044"
val conf = new SparkConf()
conf.setMaster("local")
conf.setAppName("demo")
val sc = new SparkContext(conf)

val file = sc.hadoopFile[LongWritable, RDRawDataRecord, RawDataInputFormat[LongWritable, RDRawDataRecord]](filePath)
file.foreach(v => {
  println(v._2.getDmac) // attribute of my custom object
})

*I get an error, it says:*

Error:(41, 19) type arguments [org.apache.hadoop.io.LongWritable,com.kiisoo.aegis.bd.common.hdfs.RDRawDataRecord,com.kiisoo.spark.RawDataInputFormat[org.apache.hadoop.io.LongWritable,com.kiisoo.aegis.bd.common.hdfs.RDRawDataRecord]] conform to the bounds of none of the overloaded alternatives of value hadoopFile:
[K, V, F <: org.apache.hadoop.mapred.InputFormat[K,V]](path: String)(implicit km: scala.reflect.ClassTag[K], implicit vm: scala.reflect.ClassTag[V], implicit fm: scala.reflect.ClassTag[F])org.apache.spark.rdd.RDD[(K, V)]
<and>
[K, V, F <: org.apache.hadoop.mapred.InputFormat[K,V]](path: String, minPartitions: Int)(implicit km: scala.reflect.ClassTag[K], implicit vm: scala.reflect.ClassTag[V], implicit fm: scala.reflect.ClassTag[F])org.apache.spark.rdd.RDD[(K, V)]
    val file = sc.hadoopFile[LongWritable, RDRawDataRecord, RawDataInputFormat[LongWritable, RDRawDataRecord]](filePath)
                  ^

*I also tried reading a plain text file with the SparkContext API, sc.hadoopFile[LongWritable, Text, TextInputFormat]("hdfs://xxx......"), and that works.*
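For comparison, this is roughly what my working plain-text variant looks like (the object name and the HDFS path here are just placeholders, not my real ones). It uses TextInputFormat from the old org.apache.hadoop.mapred package, since, as the error message shows, sc.hadoopFile requires F <: org.apache.hadoop.mapred.InputFormat[K,V]:

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.TextInputFormat
import org.apache.spark.{SparkConf, SparkContext}

object TextReadDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("text-demo")
    val sc = new SparkContext(conf)

    // Placeholder path; in the real run I point this at an actual HDFS text file.
    val textPath = "hdfs://localhost:9000/tmp/sample.txt"

    // K = LongWritable (byte offset), V = Text (line), F = old-API TextInputFormat.
    val lines = sc.hadoopFile[LongWritable, Text, TextInputFormat](textPath)
    lines.foreach(pair => println(pair._2.toString))

    sc.stop()
  }
}

This call has the same shape as my failing example; only the value type and the input format class differ.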
*What does this error mean, and how can I fix it?*

Thank you for helping me.

-- Tony :)