Hi,
   My HDFS file is store with custom data structures. I want to read it
with SparkContext object.So I define a formatting object:

*1. code of RawDataInputFormat.scala*

import com.kiisoo.aegis.bd.common.hdfs.RDRawDataRecord
import org.apache.hadoop.io.LongWritable
import org.apache.hadoop.mapred._

/**
  * Created by Tony on 3/16/16.
  */
class RawDataInputFormat[LW <: LongWritable, RD <: RDRawDataRecord]
extends FileInputFormat {

  override def getRecordReader(split: InputSplit, job: JobConf,
reporter: Reporter): RecordReader[LW, RD] = {
    new RawReader(split, job, reporter)
  }

}

*2. code of RawReader.scala*

import com.kiisoo.aegis.bd.common.hdfs.RDRawDataRecord
import org.apache.hadoop.io.{LongWritable, SequenceFile}
import org.apache.hadoop.mapred._

/**
  * Created by Tony on 3/17/16.
  */
class RawReader[LW <: LongWritable, RD <: RDRawDataRecord] extends
RecordReader[LW, RD] {

  var reader: SequenceFile.Reader = null
  var currentPos: Long = 0L
  var length: Long = 0L

  def this(split: InputSplit, job: JobConf, reporter: Reporter) {
    this()
    val p = (split.asInstanceOf[FileSplit]).getPath
    reader = new SequenceFile.Reader(job, SequenceFile.Reader.file(p))
  }

  override def next(key: LW, value: RD): Boolean = {
    val flag = reader.next(key, value)
    currentPos = reader.getPosition()
    flag
  }

  override def getProgress: Float = Math.min(1.0f, currentPos / length.toFloat)

  override def getPos: Long = currentPos

  override def createKey(): LongWritable = {
    new LongWritable()
  }

  override def close(): Unit = {
    reader.close()
  }

  override def createValue(): RDRawDataRecord = {
    new RDRawDataRecord()
  }
}

*3. code of RDRawDataRecord.scala*

import com.kiisoo.aegis.common.rawdata.RawDataRecord;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.Writable;

public class RDRawDataRecord implements Writable {
    private String smac;
    private String dmac;
    private int hrssi;
    private int lrssi;
    private long fstamp;
    private long lstamp;
    private long maxstamp;
    private long minstamp;
    private long stamp;

    public void readFields(DataInput in) throws IOException {
        this.smac = in.readUTF();
        this.dmac = in.readUTF();
        this.hrssi = in.readInt();
        this.lrssi = in.readInt();
        this.fstamp = in.readLong();
        this.lstamp = in.readLong();
        this.maxstamp = in.readLong();
        this.minstamp = in.readLong();
        this.stamp = in.readLong();
    }

    public void write(DataOutput out) throws IOException {
        out.writeUTF(StringUtils.isNotBlank(this.smac)?this.smac:"");
        out.writeUTF(StringUtils.isNotBlank(this.dmac)?this.dmac:"");
        out.writeInt(this.hrssi);
        out.writeInt(this.lrssi);
        out.writeLong(this.fstamp);
        out.writeLong(this.lstamp);
        out.writeLong(this.maxstamp);
        out.writeLong(this.minstamp);
        out.writeLong(this.stamp);
    }

    */** *

        *ignore getter setter*

*    **/*

}

*At last, I use this code to run*:

val filePath = "hdfs://tony.Liu:9000/wifi-raw-data/wifi-raw-data.1455206402044"
val conf = new SparkConf()
conf.setMaster("local")
conf.setAppName("demo")
val sc = new SparkContext(conf)
val file = sc.hadoopFile[LongWritable, RDRawDataRecord,
RawDataInputFormat[LongWritable, RDRawDataRecord]](filePath)
file.foreach(v => {
  println(v._2.getDmac) // Attribute of custom objects
})

*I get an error, it says:*

Error:(41, 19) type arguments
[org.apache.hadoop.io.LongWritable,com.kiisoo.aegis.bd.common.hdfs.RDRawDataRecord,com.kiisoo.spark.RawDataInputFormat[org.apache.hadoop.io.LongWritable,com.kiisoo.aegis.bd.common.hdfs.RDRawDataRecord]]
conform to the bounds of none of the overloaded alternatives of
 value hadoopFile: [K, V, F <:
org.apache.hadoop.mapred.InputFormat[K,V]](path: String)(implicit km:
scala.reflect.ClassTag[K], implicit vm: scala.reflect.ClassTag[V],
implicit fm: scala.reflect.ClassTag[F])org.apache.spark.rdd.RDD[(K,
V)] <and> [K, V, F <: org.apache.hadoop.mapred.InputFormat[K,V]](path:
String, minPartitions: Int)(implicit km: scala.reflect.ClassTag[K],
implicit vm: scala.reflect.ClassTag[V], implicit fm:
scala.reflect.ClassTag[F])org.apache.spark.rdd.RDD[(K, V)]
    val file = sc.hadoopFile[LongWritable, RDRawDataRecord,
RawDataInputFormat[LongWritable, RDRawDataRecord]](filePath)
                  ^



*I also try read the text file with SparkContext AIP
'sc.hadoopFile[LongWritable, Text, TextInputFormat]("hdfs://xxx......")',
It works.*
*This error is what does this mean? How to fix this error?*

Thank you for help me.

--
Tony
:)

Reply via email to