Mind sharing the error you are getting?

On 28 Oct 2015 03:53, "Balachandar R.A." <balachandar...@gmail.com> wrote:
> Hello,
>
> I have developed a Hadoop-based solution that processes a binary file,
> using the classic Hadoop MR technique. The binary file is about 10 GB and
> divided into 73 HDFS blocks, and the business logic, written as a map
> process, operates on each of these 73 blocks. We have developed a
> CustomInputFormat and CustomRecordReader in Hadoop that return a key
> (IntWritable) and a value (BytesWritable) to the map function. The value
> is simply the contents of an HDFS block (binary data). The business logic
> knows how to read this data.
>
> Now, I would like to port this code to Spark. I am a beginner in Spark
> and could run simple examples (word count, the pi example), but I could
> not find a straightforward example for processing binary files in Spark.
> I see two possible solutions for this use case. The first is to avoid the
> custom input format and record reader: find a method (approach) in Spark
> that creates an RDD for those HDFS blocks, then use a map-like method
> that feeds the HDFS block contents to the business logic. If that is not
> possible, I would like to reuse the custom input format and custom record
> reader via methods such as the Hadoop API, HadoopRDD, etc.
>
> My problem: I do not know whether the first approach is possible or not.
> If it is, can anyone please provide some pointers to examples? I tried
> the second approach but was highly unsuccessful. Here is the code snippet
> I used:
>
> import org.apache.hadoop.io.{BytesWritable, IntWritable}
> import org.apache.spark.{SparkConf, SparkContext}
>
> object Driver {
>   def myFunc(key: IntWritable, content: BytesWritable) = {
>     println("my business logic")
>     // printing key and content here; the value size is 0
>   }
>
>   def main(args: Array[String]) {
>     // create a Spark context
>     val conf = new SparkConf()
>       .setAppName("Dummy")
>       .setMaster("spark://<host>:7077")
>     val sc = new SparkContext(conf)
>     val rd = sc.newAPIHadoopFile("hdfs:///user/name/MyDataFile.dat",
>       classOf[RandomAccessInputFormat], classOf[IntWritable],
>       classOf[BytesWritable])
>     val count = rd.map(x => myFunc(x._1, x._2)).collect()
>   }
> }
>
> Can someone tell me where I am going wrong here? I think I am not using
> the API the right way but have failed to find documentation/usage
> examples.
>
> Thanks in advance
>
> - bala
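In the meantime, here is a minimal sketch of your second approach, with two
pitfalls fixed that commonly bite here. It assumes your
RandomAccessInputFormat extends the new-API (org.apache.hadoop.mapreduce)
FileInputFormat[IntWritable, BytesWritable] and is on the classpath; a byte
count stands in for the real business logic. First, Hadoop RecordReaders
reuse the same Writable instances for every record (the SparkContext scaladoc
warns about this), so copy the bytes out before holding on to them. Second,
println inside map runs on the executors, not the driver, so you will not
see its output on the driver console.

import org.apache.hadoop.io.{BytesWritable, IntWritable}
import org.apache.spark.{SparkConf, SparkContext}

object BinaryBlocks {
  def main(args: Array[String]): Unit = {
    // master is supplied via spark-submit rather than hard-coded here
    val conf = new SparkConf().setAppName("BinaryBlocks")
    val sc = new SparkContext(conf)

    // one (IntWritable, BytesWritable) record per HDFS block, as produced
    // by your custom input format
    val blocks = sc.newAPIHadoopFile(
      "hdfs:///user/name/MyDataFile.dat",
      classOf[RandomAccessInputFormat],
      classOf[IntWritable],
      classOf[BytesWritable])

    // the RecordReader reuses the Writables, so take a defensive copy of
    // the payload before doing anything further with it
    val results = blocks.map { case (key, value) =>
      val payload: Array[Byte] = value.copyBytes()
      (key.get(), payload.length) // stand-in for the real business logic
    }

    // bring only the small per-block summaries back to the driver
    results.collect().foreach(println)
    sc.stop()
  }
}

If the value size is still 0 with something like this, the problem is most
likely inside the custom RecordReader itself (for example, the BytesWritable
never being filled before getCurrentValue is called), which is why the exact
error and the RecordReader code would help. As for your first approach: I am
not aware of a built-in way in Spark to get one record per HDFS block of a
single binary file without an InputFormat; sc.binaryFiles and
sc.binaryRecords (Spark 1.2+) operate per file and per fixed-length record,
respectively, not per block.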