I also tried that before, but then in the RawReader.next(key, value) method the call to reader.next gives an error; it says: Type Mismatch.
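If the reader was changed in the same way, i.e. to class RawReader[LongWritable, RDRawDataRecord], those names become unbounded type parameters that shadow the Hadoop classes, so key and value are no longer Writable and SequenceFile.Reader.next(Writable, Writable) rejects them, which would explain a type mismatch. Below is a minimal sketch of a reader that uses the concrete types and no type parameters at all (it assumes the same RDRawDataRecord class and a SequenceFile layout; it is not the exact code from the thread):

import com.kiisoo.aegis.bd.common.hdfs.RDRawDataRecord
import org.apache.hadoop.io.{LongWritable, SequenceFile}
import org.apache.hadoop.mapred._

// Reader declared against the concrete Hadoop types, so reader.next(key, value)
// receives two Writables and type-checks.
class RawReader(split: InputSplit, job: JobConf)
    extends RecordReader[LongWritable, RDRawDataRecord] {

  private val fileSplit  = split.asInstanceOf[FileSplit]
  private val reader     = new SequenceFile.Reader(job, SequenceFile.Reader.file(fileSplit.getPath))
  private val end        = fileSplit.getStart + fileSplit.getLength  // only used for a rough progress estimate
  private var currentPos = 0L

  override def next(key: LongWritable, value: RDRawDataRecord): Boolean = {
    val more = reader.next(key, value)   // both arguments are Writable here
    currentPos = reader.getPosition
    more
  }

  override def createKey(): LongWritable      = new LongWritable()
  override def createValue(): RDRawDataRecord = new RDRawDataRecord()
  override def getPos: Long                   = currentPos
  override def getProgress: Float             = math.min(1.0f, currentPos.toFloat / end)
  override def close(): Unit                  = reader.close()
}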
On Fri, Mar 18, 2016 at 12:53 AM, Benyi Wang <bewang.t...@gmail.com> wrote:

> I would say change
>
> class RawDataInputFormat[LW <: LongWritable, RD <: RDRawDataRecord] extends FileInputFormat
>
> to
>
> class RawDataInputFormat[LongWritable, RDRawDataRecord] extends FileInputFormat
>
>
> On Thu, Mar 17, 2016 at 9:48 AM, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:
>
>> Hi Tony,
>>
>> Is
>>
>> com.kiisoo.aegis.bd.common.hdfs.RDRawDataRecord
>>
>> one of your own packages?
>>
>> Sounds like it is the one throwing the error.
>>
>> HTH
>>
>> Dr Mich Talebzadeh
>>
>> LinkedIn: https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>
>> http://talebzadehmich.wordpress.com
>>
>>
>> On 17 March 2016 at 15:21, Tony Liu <tony.liu0...@gmail.com> wrote:
>>
>>> Hi,
>>> My HDFS file is stored with custom data structures, and I want to read it with a SparkContext object, so I defined an input format:
>>>
>>> *1. code of RawDataInputFormat.scala*
>>>
>>> import com.kiisoo.aegis.bd.common.hdfs.RDRawDataRecord
>>> import org.apache.hadoop.io.LongWritable
>>> import org.apache.hadoop.mapred._
>>>
>>> /**
>>>  * Created by Tony on 3/16/16.
>>>  */
>>> class RawDataInputFormat[LW <: LongWritable, RD <: RDRawDataRecord] extends FileInputFormat {
>>>
>>>   override def getRecordReader(split: InputSplit, job: JobConf, reporter: Reporter): RecordReader[LW, RD] = {
>>>     new RawReader(split, job, reporter)
>>>   }
>>>
>>> }
>>>
>>> *2. code of RawReader.scala*
>>>
>>> import com.kiisoo.aegis.bd.common.hdfs.RDRawDataRecord
>>> import org.apache.hadoop.io.{LongWritable, SequenceFile}
>>> import org.apache.hadoop.mapred._
>>>
>>> /**
>>>  * Created by Tony on 3/17/16.
>>>  */
>>> class RawReader[LW <: LongWritable, RD <: RDRawDataRecord] extends RecordReader[LW, RD] {
>>>
>>>   var reader: SequenceFile.Reader = null
>>>   var currentPos: Long = 0L
>>>   var length: Long = 0L
>>>
>>>   def this(split: InputSplit, job: JobConf, reporter: Reporter) {
>>>     this()
>>>     val p = (split.asInstanceOf[FileSplit]).getPath
>>>     reader = new SequenceFile.Reader(job, SequenceFile.Reader.file(p))
>>>   }
>>>
>>>   override def next(key: LW, value: RD): Boolean = {
>>>     val flag = reader.next(key, value)
>>>     currentPos = reader.getPosition()
>>>     flag
>>>   }
>>>
>>>   override def getProgress: Float = Math.min(1.0f, currentPos / length.toFloat)
>>>
>>>   override def getPos: Long = currentPos
>>>
>>>   override def createKey(): LongWritable = {
>>>     new LongWritable()
>>>   }
>>>
>>>   override def close(): Unit = {
>>>     reader.close()
>>>   }
>>>
>>>   override def createValue(): RDRawDataRecord = {
>>>     new RDRawDataRecord()
>>>   }
>>> }
>>>
>>> *3. code of RDRawDataRecord.scala*
>>>
>>> import com.kiisoo.aegis.common.rawdata.RawDataRecord;
>>> import java.io.DataInput;
>>> import java.io.DataOutput;
>>> import java.io.IOException;
>>> import org.apache.commons.lang.StringUtils;
>>> import org.apache.hadoop.io.Writable;
>>>
>>> public class RDRawDataRecord implements Writable {
>>>     private String smac;
>>>     private String dmac;
>>>     private int hrssi;
>>>     private int lrssi;
>>>     private long fstamp;
>>>     private long lstamp;
>>>     private long maxstamp;
>>>     private long minstamp;
>>>     private long stamp;
>>>
>>>     public void readFields(DataInput in) throws IOException {
>>>         this.smac = in.readUTF();
>>>         this.dmac = in.readUTF();
>>>         this.hrssi = in.readInt();
>>>         this.lrssi = in.readInt();
>>>         this.fstamp = in.readLong();
>>>         this.lstamp = in.readLong();
>>>         this.maxstamp = in.readLong();
>>>         this.minstamp = in.readLong();
>>>         this.stamp = in.readLong();
>>>     }
>>>
>>>     public void write(DataOutput out) throws IOException {
>>>         out.writeUTF(StringUtils.isNotBlank(this.smac) ? this.smac : "");
>>>         out.writeUTF(StringUtils.isNotBlank(this.dmac) ? this.dmac : "");
>>>         out.writeInt(this.hrssi);
>>>         out.writeInt(this.lrssi);
>>>         out.writeLong(this.fstamp);
>>>         out.writeLong(this.lstamp);
>>>         out.writeLong(this.maxstamp);
>>>         out.writeLong(this.minstamp);
>>>         out.writeLong(this.stamp);
>>>     }
>>>
>>>     /**
>>>      * getters and setters omitted
>>>      */
>>> }
>>>
>>> *Finally, I run it with this code:*
>>>
>>> val filePath = "hdfs://tony.Liu:9000/wifi-raw-data/wifi-raw-data.1455206402044"
>>> val conf = new SparkConf()
>>> conf.setMaster("local")
>>> conf.setAppName("demo")
>>> val sc = new SparkContext(conf)
>>> val file = sc.hadoopFile[LongWritable, RDRawDataRecord, RawDataInputFormat[LongWritable, RDRawDataRecord]](filePath)
>>> file.foreach(v => {
>>>   println(v._2.getDmac) // attribute of the custom object
>>> })
>>>
>>> *I get an error; it says:*
>>>
>>> Error:(41, 19) type arguments [org.apache.hadoop.io.LongWritable,com.kiisoo.aegis.bd.common.hdfs.RDRawDataRecord,com.kiisoo.spark.RawDataInputFormat[org.apache.hadoop.io.LongWritable,com.kiisoo.aegis.bd.common.hdfs.RDRawDataRecord]]
>>> conform to the bounds of none of the overloaded alternatives of value hadoopFile:
>>> [K, V, F <: org.apache.hadoop.mapred.InputFormat[K,V]](path: String)(implicit km: scala.reflect.ClassTag[K], implicit vm: scala.reflect.ClassTag[V], implicit fm: scala.reflect.ClassTag[F])org.apache.spark.rdd.RDD[(K, V)]
>>> <and>
>>> [K, V, F <: org.apache.hadoop.mapred.InputFormat[K,V]](path: String, minPartitions: Int)(implicit km: scala.reflect.ClassTag[K], implicit vm: scala.reflect.ClassTag[V], implicit fm: scala.reflect.ClassTag[F])org.apache.spark.rdd.RDD[(K, V)]
>>>     val file = sc.hadoopFile[LongWritable, RDRawDataRecord, RawDataInputFormat[LongWritable, RDRawDataRecord]](filePath)
>>>                   ^
>>>
>>> *I also tried reading a text file with the SparkContext API 'sc.hadoopFile[LongWritable, Text, TextInputFormat]("hdfs://xxx......")', and that works.*
>>> *What does this error mean, and how can I fix it?*
>>>
>>> Thank you for helping me.
>>>
>>> --
>>> Tony
>>> :)
>>
>>
>

--
:)
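For completeness, the original bounds error at sc.hadoopFile most likely comes from RawDataInputFormat extending FileInputFormat without type arguments, so RawDataInputFormat[LongWritable, RDRawDataRecord] is not a subtype of InputFormat[LongWritable, RDRawDataRecord] as the bound F <: InputFormat[K, V] requires. Here is a sketch of a non-generic input format plus driver call, under the same assumptions as the reader sketch above (the two-argument RawReader constructor comes from that sketch, not from the original code):

import com.kiisoo.aegis.bd.common.hdfs.RDRawDataRecord
import org.apache.hadoop.io.LongWritable
import org.apache.hadoop.mapred._
import org.apache.spark.{SparkConf, SparkContext}

// Input format fixed to the concrete key/value types, so that
// RawDataInputFormat <: InputFormat[LongWritable, RDRawDataRecord]
// and the bound F <: InputFormat[K, V] on hadoopFile is satisfied.
class RawDataInputFormat extends FileInputFormat[LongWritable, RDRawDataRecord] {

  override def getRecordReader(split: InputSplit, job: JobConf, reporter: Reporter)
      : RecordReader[LongWritable, RDRawDataRecord] =
    new RawReader(split, job)  // the concrete-typed reader sketched earlier, not the original three-argument constructor
}

object Demo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("demo")
    val sc   = new SparkContext(conf)

    // No type arguments on RawDataInputFormat any more; K, V, F line up exactly.
    val file = sc.hadoopFile[LongWritable, RDRawDataRecord, RawDataInputFormat](
      "hdfs://tony.Liu:9000/wifi-raw-data/wifi-raw-data.1455206402044")

    file.foreach(v => println(v._2.getDmac))
    sc.stop()
  }
}

Declaring the key and value types once, on the classes themselves, keeps the InputFormat, the RecordReader, and the hadoopFile call consistent.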