Here's a piece of code. In your case, you are missing the call() method inside the map function.
import java.util.Iterator;
import java.util.List;
import org.apache.commons.configuration.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.rdd.NewHadoopRDD;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import com.google.common.collect.Lists;
import scala.Function1;
import scala.Tuple2;
import scala.collection.JavaConversions;
import scala.collection.Seq;
import scala.reflect.ClassTag;

public class SparkHBaseMain {

    @SuppressWarnings("deprecation")
    public static void main(String[] arg) {
        try {
            // Jars that need to be shipped to the executors
            List<String> jars = Lists.newArrayList(
                    "/home/akhld/Desktop/tools/spark-9/jars/spark-assembly-0.9.0-incubating-hadoop2.3.0-mr1-cdh5.0.0.jar",
                    "/home/akhld/Downloads/sparkhbasecode/hbase-server-0.96.0-hadoop2.jar",
                    "/home/akhld/Downloads/sparkhbasecode/hbase-protocol-0.96.0-hadoop2.jar",
                    "/home/akhld/Downloads/sparkhbasecode/hbase-hadoop2-compat-0.96.0-hadoop2.jar",
                    "/home/akhld/Downloads/sparkhbasecode/hbase-common-0.96.0-hadoop2.jar",
                    "/home/akhld/Downloads/sparkhbasecode/hbase-client-0.96.0-hadoop2.jar",
                    "/home/akhld/Downloads/sparkhbasecode/htrace-core-2.02.jar");

            SparkConf spconf = new SparkConf();
            spconf.setMaster("local");
            spconf.setAppName("SparkHBase");
            spconf.setSparkHome("/home/akhld/Desktop/tools/spark-9");
            spconf.setJars(jars.toArray(new String[jars.size()]));
            spconf.set("spark.executor.memory", "1g");

            final JavaSparkContext sc = new JavaSparkContext(spconf);

            // Point the TableInputFormat at the HBase table to read
            org.apache.hadoop.conf.Configuration conf = HBaseConfiguration.create();
            conf.addResource("/home/akhld/Downloads/sparkhbasecode/hbase-site.xml");
            conf.set(TableInputFormat.INPUT_TABLE, "blogposts");

            NewHadoopRDD<ImmutableBytesWritable, Result> rdd =
                    new NewHadoopRDD<ImmutableBytesWritable, Result>(
                            JavaSparkContext.toSparkContext(sc),
                            TableInputFormat.class,
                            ImmutableBytesWritable.class,
                            Result.class,
                            conf);

            JavaRDD<Tuple2<ImmutableBytesWritable, Result>> jrdd = rdd.toJavaRDD();

            ForEachFunction f = new ForEachFunction();
            JavaRDD<Iterator<String>> retrdd = jrdd.map(f);

            // count() is an action, so this is what forces the map (and ForEachFunction.call) to run
            System.out.println("Count =>" + retrdd.count());

        } catch (Exception e) {
            e.printStackTrace();
            System.out.println("Craaaashed : " + e);
        }
    }

    @SuppressWarnings("serial")
    private static class ForEachFunction extends Function<Tuple2<ImmutableBytesWritable, Result>, Iterator<String>> {
        public Iterator<String> call(Tuple2<ImmutableBytesWritable, Result> test) {
            Result tmp = (Result) test._2;
            // Read all versions of the post:title column for this row
            List<KeyValue> kvl = tmp.getColumn("post".getBytes(), "title".getBytes());
            for (KeyValue kl : kvl) {
                String sb = new String(kl.getValue());
                System.out.println("Value :" + sb);
            }
            return null;
        }
    }
}
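If you would rather have the values back on the driver instead of printed inside the function, a small variation like the one below should also work. This is just an untested sketch, assuming the same "blogposts" table and a post:title column; the titles variable name is only illustrative.

// Return the post:title value from the map function instead of printing it,
// then bring the results back to the driver with collect().
// Reuses jrdd (JavaRDD<Tuple2<ImmutableBytesWritable, Result>>) from the code above.
JavaRDD<String> titles = jrdd.map(
        new Function<Tuple2<ImmutableBytesWritable, Result>, String>() {
            public String call(Tuple2<ImmutableBytesWritable, Result> row) {
                byte[] value = row._2.getValue(Bytes.toBytes("post"), Bytes.toBytes("title"));
                // null when the row has no post:title cell
                return value == null ? null : Bytes.toString(value);
            }
        });

// collect() is an action, so this is what actually runs call() on every row
for (String title : titles.collect()) {
    System.out.println("Value : " + title);
}

Keep in mind that on a real cluster the System.out.println calls inside call() end up in the executor logs rather than the driver console, which is another reason to collect the values first.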
Hope it helps.

Thanks
Best Regards

On Fri, Aug 1, 2014 at 4:44 PM, Madabhattula Rajesh Kumar <mrajaf...@gmail.com> wrote:

> Hi Akhil,
>
> Thank you for your response. I'm facing the below issues.
>
> I'm not able to print the values. Am I missing anything? Could you please
> look into this issue?
>
> JavaPairRDD<ImmutableBytesWritable, Result> hBaseRDD = sc.newAPIHadoopRDD(
>         conf,
>         TableInputFormat.class,
>         ImmutableBytesWritable.class,
>         Result.class);
>
> System.out.println(" ROWS COUNT = " + hBaseRDD.count());
>
> JavaRDD R = hBaseRDD.map(
>         new Function<Tuple2<ImmutableBytesWritable, Result>, Iterator<String>>() {
>             public Iterator<String> call(Tuple2<ImmutableBytesWritable, Result> test) {
>                 Result tmp = (Result) test._2;
>
>                 System.out.println("Inside ");
>
>                 // List<KeyValue> kvl = tmp.getColumn("post".getBytes(), "title".getBytes());
>                 for (KeyValue kl : tmp.raw()) {
>                     String sb = new String(kl.getValue());
>                     System.out.println(sb);
>                 }
>                 return null;
>             }
>         });
>
> Output:
>
> ROWS COUNT = 8
>
> It is not printing the "Inside" statement either, so I think it is not going
> into this function.
>
> Could you please help me with this issue?
>
> Thank you for your support and help.
>
> Regards,
> Rajesh
>
>
> On Fri, Aug 1, 2014 at 12:17 PM, Akhil Das <ak...@sigmoidanalytics.com> wrote:
>
>> You can use a map function like the following and do whatever you want
>> with the Result:
>>
>> Function<Tuple2<ImmutableBytesWritable, Result>, Iterator<String>>{
>>     public Iterator<String> call(Tuple2<ImmutableBytesWritable, Result> test) {
>>         Result tmp = (Result) test._2;
>>         List<KeyValue> kvl = tmp.getColumn("post".getBytes(), "title".getBytes());
>>         for (KeyValue kl : kvl) {
>>             String sb = new String(kl.getValue());
>>             System.out.println(sb);
>>         }
>>
>> Thanks
>> Best Regards
>>
>>
>> On Thu, Jul 31, 2014 at 10:19 PM, Madabhattula Rajesh Kumar <
>> mrajaf...@gmail.com> wrote:
>>
>>> Hi Team,
>>>
>>> I'm using the below code to read a table from HBase:
>>>
>>> Configuration conf = HBaseConfiguration.create();
>>> conf.set(TableInputFormat.INPUT_TABLE, "table1");
>>>
>>> JavaPairRDD hBaseRDD = sc.newAPIHadoopRDD(
>>>         conf,
>>>         TableInputFormat.class,
>>>         ImmutableBytesWritable.class,
>>>         Result.class);
>>>
>>> I got hBaseRDD, but I'm not able to read the column values from it.
>>>
>>> Could you please let me know how to read the column values from
>>> hBaseRDD?
>>>
>>> Thank you for your help.
>>>
>>> Regards,
>>> Rajesh
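By the way, the reason "Inside" never prints in the snippet above is that map() is a lazy transformation: hBaseRDD.count() only evaluates the source RDD, and since no action is ever invoked on the mapped RDD R, its call() method never runs. Triggering an action on R itself should be enough, for example (a rough sketch, assuming R as defined in the quoted message; the "MAPPED COUNT" label is only illustrative):

// R is the JavaRDD produced by hBaseRDD.map(...) in the quoted message above.
// count() is an action on the mapped RDD, so it forces call(), and therefore
// the System.out.println("Inside ") statements, to run for every row.
System.out.println("MAPPED COUNT = " + R.count());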