I tried that before, but then the call to reader.next inside the
RawReader.next(key, value) method fails to compile. The error says: Type Mismatch.
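
One possible reading of the Type Mismatch, sketched with stand-in types rather than the real Hadoop classes: in `class RawReader[LongWritable, RDRawDataRecord]` the bracketed names are new, unbounded type parameters that shadow the imported classes, so the compiler no longer knows that `key` and `value` are Writable when they are passed to `reader.next`. Dropping the type parameters from the class and putting the concrete types on the parent (`RecordReader[LongWritable, RDRawDataRecord]`, and likewise `FileInputFormat[LongWritable, RDRawDataRecord]`) might be what is needed. `FakeSequenceReader`, `ShadowedReader`, `ConcreteReader`, and `Demo` are hypothetical names used only for this illustration.

```scala
// Stand-ins for the Hadoop types (assumptions, so the sketch compiles without Hadoop).
trait Writable
class LongWritable extends Writable
class RDRawDataRecord extends Writable
trait RecordReader[K, V] { def next(key: K, value: V): Boolean }

// Stand-in for SequenceFile.Reader: its next() accepts any pair of Writables.
class FakeSequenceReader(var remaining: Int) {
  def next(key: Writable, value: Writable): Boolean = {
    val hasNext = remaining > 0
    if (hasNext) remaining -= 1
    hasNext
  }
}

// Here LongWritable and RDRawDataRecord are fresh, unbounded type parameters
// that shadow the classes above, so any types can be plugged in.
class ShadowedReader[LongWritable, RDRawDataRecord] {
  // Calling reader.next(key, value) would not compile in this class:
  // the parameters are not known to be Writable.
  def describe(key: LongWritable, value: RDRawDataRecord): String = s"$key -> $value"
}

// No type parameters on the class; the concrete types go on the parent instead.
class ConcreteReader(reader: FakeSequenceReader)
    extends RecordReader[LongWritable, RDRawDataRecord] {
  override def next(key: LongWritable, value: RDRawDataRecord): Boolean =
    reader.next(key, value)
}

object Demo {
  def main(args: Array[String]): Unit = {
    // This compiles, which shows that "LongWritable" in ShadowedReader is only a parameter name.
    val shadowed = new ShadowedReader[String, Int]()
    println(shadowed.describe("k", 1)) // prints "k -> 1"

    val concrete = new ConcreteReader(new FakeSequenceReader(1))
    println(concrete.next(new LongWritable, new RDRawDataRecord)) // prints "true"
    println(concrete.next(new LongWritable, new RDRawDataRecord)) // prints "false"
  }
}
```

With the concrete types fixed on the parent, the input format would no longer take type arguments at the call site, so the Spark call would presumably become `sc.hadoopFile[LongWritable, RDRawDataRecord, RawDataInputFormat](filePath)`.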

On Fri, Mar 18, 2016 at 12:53 AM, Benyi Wang <bewang.t...@gmail.com> wrote:

> I would say change
>
> class RawDataInputFormat[LW <: LongWritable, RD <: RDRawDataRecord] extends 
> FileInputFormat
>
> to
>
> class RawDataInputFormat[LongWritable, RDRawDataRecord] extends 
> FileInputFormat
>
>
> On Thu, Mar 17, 2016 at 9:48 AM, Mich Talebzadeh <
> mich.talebza...@gmail.com> wrote:
>
>> Hi Tony,
>>
>> Is
>>
>> com.kiisoo.aegis.bd.common.hdfs.RDRawDataRecord
>>
>> One of your own packages?
>>
>> Sounds like it is one throwing the error
>>
>> HTH
>>
>>
>> Dr Mich Talebzadeh
>>
>>
>>
>> LinkedIn
>> https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>
>>
>>
>> http://talebzadehmich.wordpress.com
>>
>>
>>
>> On 17 March 2016 at 15:21, Tony Liu <tony.liu0...@gmail.com> wrote:
>>
>>> Hi,
>>>    My HDFS file is stored with custom data structures, and I want to read it
>>> with a SparkContext object, so I defined a custom input format:
>>>
>>> *1. code of RawDataInputFormat.scala*
>>>
>>> import com.kiisoo.aegis.bd.common.hdfs.RDRawDataRecord
>>> import org.apache.hadoop.io.LongWritable
>>> import org.apache.hadoop.mapred._
>>>
>>> /**
>>>   * Created by Tony on 3/16/16.
>>>   */
>>> class RawDataInputFormat[LW <: LongWritable, RD <: RDRawDataRecord] extends 
>>> FileInputFormat {
>>>
>>>   override def getRecordReader(split: InputSplit, job: JobConf, reporter: 
>>> Reporter): RecordReader[LW, RD] = {
>>>     new RawReader(split, job, reporter)
>>>   }
>>>
>>> }
>>>
>>> *2. code of RawReader.scala*
>>>
>>> import com.kiisoo.aegis.bd.common.hdfs.RDRawDataRecord
>>> import org.apache.hadoop.io.{LongWritable, SequenceFile}
>>> import org.apache.hadoop.mapred._
>>>
>>> /**
>>>   * Created by Tony on 3/17/16.
>>>   */
>>> class RawReader[LW <: LongWritable, RD <: RDRawDataRecord] extends 
>>> RecordReader[LW, RD] {
>>>
>>>   var reader: SequenceFile.Reader = null
>>>   var currentPos: Long = 0L
>>>   var length: Long = 0L
>>>
>>>   def this(split: InputSplit, job: JobConf, reporter: Reporter) {
>>>     this()
>>>     val p = (split.asInstanceOf[FileSplit]).getPath
>>>     reader = new SequenceFile.Reader(job, SequenceFile.Reader.file(p))
>>>   }
>>>
>>>   override def next(key: LW, value: RD): Boolean = {
>>>     val flag = reader.next(key, value)
>>>     currentPos = reader.getPosition()
>>>     flag
>>>   }
>>>
>>>   override def getProgress: Float = Math.min(1.0f, currentPos / 
>>> length.toFloat)
>>>
>>>   override def getPos: Long = currentPos
>>>
>>>   override def createKey(): LongWritable = {
>>>     new LongWritable()
>>>   }
>>>
>>>   override def close(): Unit = {
>>>     reader.close()
>>>   }
>>>
>>>   override def createValue(): RDRawDataRecord = {
>>>     new RDRawDataRecord()
>>>   }
>>> }
>>>
>>> *3. code of RDRawDataRecord.scala*
>>>
>>> import com.kiisoo.aegis.common.rawdata.RawDataRecord;
>>> import java.io.DataInput;
>>> import java.io.DataOutput;
>>> import java.io.IOException;
>>> import org.apache.commons.lang.StringUtils;
>>> import org.apache.hadoop.io.Writable;
>>>
>>> public class RDRawDataRecord implements Writable {
>>>     private String smac;
>>>     private String dmac;
>>>     private int hrssi;
>>>     private int lrssi;
>>>     private long fstamp;
>>>     private long lstamp;
>>>     private long maxstamp;
>>>     private long minstamp;
>>>     private long stamp;
>>>
>>>     public void readFields(DataInput in) throws IOException {
>>>         this.smac = in.readUTF();
>>>         this.dmac = in.readUTF();
>>>         this.hrssi = in.readInt();
>>>         this.lrssi = in.readInt();
>>>         this.fstamp = in.readLong();
>>>         this.lstamp = in.readLong();
>>>         this.maxstamp = in.readLong();
>>>         this.minstamp = in.readLong();
>>>         this.stamp = in.readLong();
>>>     }
>>>
>>>     public void write(DataOutput out) throws IOException {
>>>         out.writeUTF(StringUtils.isNotBlank(this.smac)?this.smac:"");
>>>         out.writeUTF(StringUtils.isNotBlank(this.dmac)?this.dmac:"");
>>>         out.writeInt(this.hrssi);
>>>         out.writeInt(this.lrssi);
>>>         out.writeLong(this.fstamp);
>>>         out.writeLong(this.lstamp);
>>>         out.writeLong(this.maxstamp);
>>>         out.writeLong(this.minstamp);
>>>         out.writeLong(this.stamp);
>>>     }
>>>
>>>     /** getters and setters omitted */
>>>
>>> }
>>>
>>> *Finally, I run it with this code*:
>>>
>>> val filePath = 
>>> "hdfs://tony.Liu:9000/wifi-raw-data/wifi-raw-data.1455206402044"
>>> val conf = new SparkConf()
>>> conf.setMaster("local")
>>> conf.setAppName("demo")
>>> val sc = new SparkContext(conf)
>>> val file = sc.hadoopFile[LongWritable, RDRawDataRecord, 
>>> RawDataInputFormat[LongWritable, RDRawDataRecord]](filePath)
>>> file.foreach(v => {
>>>   println(v._2.getDmac) // Attribute of custom objects
>>> })
>>>
>>> *I get an error that says:*
>>>
>>> Error:(41, 19) type arguments 
>>> [org.apache.hadoop.io.LongWritable,com.kiisoo.aegis.bd.common.hdfs.RDRawDataRecord,com.kiisoo.spark.RawDataInputFormat[org.apache.hadoop.io.LongWritable,com.kiisoo.aegis.bd.common.hdfs.RDRawDataRecord]]
>>>  conform to the bounds of none of the overloaded alternatives of
>>>  value hadoopFile: [K, V, F <: 
>>> org.apache.hadoop.mapred.InputFormat[K,V]](path: String)(implicit km: 
>>> scala.reflect.ClassTag[K], implicit vm: scala.reflect.ClassTag[V], implicit 
>>> fm: scala.reflect.ClassTag[F])org.apache.spark.rdd.RDD[(K, V)] <and> [K, V, 
>>> F <: org.apache.hadoop.mapred.InputFormat[K,V]](path: String, 
>>> minPartitions: Int)(implicit km: scala.reflect.ClassTag[K], implicit vm: 
>>> scala.reflect.ClassTag[V], implicit fm: 
>>> scala.reflect.ClassTag[F])org.apache.spark.rdd.RDD[(K, V)]
>>>     val file = sc.hadoopFile[LongWritable, RDRawDataRecord, 
>>> RawDataInputFormat[LongWritable, RDRawDataRecord]](filePath)
>>>                   ^
>>>
>>>
>>>
>>> *I also tried reading a text file with the SparkContext API
>>> 'sc.hadoopFile[LongWritable, Text, TextInputFormat]("hdfs://xxx......")',
>>> and it works.*
>>> *What does this error mean, and how can I fix it?*
>>>
>>> Thank you for your help.
>>>
>>> --
>>> Tony
>>> :)
>>>
>>
>>
>


-- 
:)
