I am learning how to upload binary data to HBase using MapReduce. Here
are the steps I am following, assuming my binary file is testlist:
(1) wrote SequenceFileWrite.java to read the local testlist file and save
a sequence file to HDFS
(2) wrote a MapReduce program to read the generated sequence file and
generate an HFile
(3) bulk import this HFile into HBase
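
For step (3) my plan is to use HBase's completebulkload tool, roughly like
this (the output path and table name here are just placeholders for my setup):

```
hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles /path/to/hfile/output mytable
```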

I am stuck at step (2), where I keep getting an exception. I am absolutely
new to Hadoop/HBase; the code is posted below, and any comments or
suggestions are appreciated!

SequenceFileWrite.java looks like this:

public class SequenceFileWrite {
  public static void main(String[] args) throws IOException {
    String uri = args[1];
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    Path path = new Path(uri);
    File infile = new File(args[0]);
    SequenceFile.Writer writer = null;
    DataInputStream fin = null;
    try {
      writer = SequenceFile.createWriter(fs, conf, path,
          BytesWritable.class, BytesWritable.class);
      fin = new DataInputStream(new FileInputStream(infile));
      BytesWritable key = new BytesWritable();
      BytesWritable value = new BytesWritable();
      for (int i = 0; i < 10; ++i) {
        byte[] keybuf = new byte[2];
        byte[] valbuf = new byte[2];
        fin.readFully(keybuf);  // readFully fills the whole buffer; plain read() may not
        fin.readFully(valbuf);
        key.set(keybuf, 0, 2);
        value.set(valbuf, 0, 2);
        writer.append(key, value);
      }
    } finally {
      IOUtils.closeStream(fin);
      IOUtils.closeStream(writer);
    }
  }
}
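
One thing I was unsure about in the writer is reading fixed-width records:
InputStream.read(buf) is not guaranteed to fill the buffer, which is why I
use DataInputStream.readFully above. A tiny stand-alone sketch of the
2-byte-key/2-byte-value record reading I am attempting (plain java.io, no
Hadoop; class and method names are just mine for illustration):

```java
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.util.Arrays;

public class FixedWidthRecords {
    // Reads one record consisting of a 2-byte key followed by a 2-byte value.
    // readFully either fills the buffer completely or throws EOFException,
    // unlike read(), which may return after fewer bytes.
    public static byte[][] readRecord(DataInputStream in) throws IOException {
        byte[] key = new byte[2];
        byte[] val = new byte[2];
        in.readFully(key);
        in.readFully(val);
        return new byte[][] { key, val };
    }

    public static void main(String[] args) throws IOException {
        byte[] data = { 1, 2, 3, 4 };
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
        byte[][] rec = readRecord(in);
        System.out.println(Arrays.toString(rec[0]) + " " + Arrays.toString(rec[1]));
        // prints: [1, 2] [3, 4]
    }
}
```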

And my mapper is like this:

public class HBaseTkrHdrMapper
    extends Mapper<BytesWritable, BytesWritable, ImmutableBytesWritable, KeyValue> {

  int tipOffSeconds = 0;
  String tableName = "";

  ImmutableBytesWritable hKey = new ImmutableBytesWritable();
  KeyValue kv;

  @Override
  protected void setup(Context context) throws IOException, InterruptedException {
    Configuration c = context.getConfiguration();
    tipOffSeconds   = c.getInt("epoch.seconds.tipoff", 0);
    tableName       = c.get("hbase.table.mrtest");
  }

  @Override
  protected void map(BytesWritable key, BytesWritable value, Context context)
      throws IOException, InterruptedException {
    ImmutableBytesWritable hkey = new ImmutableBytesWritable(key.getBytes());
    KeyValue               hval = new KeyValue(value.getBytes());  // line 41 in the stack trace below
    context.write(hkey, hval);
  }
}
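
From poking at the HBase source, my suspicion is that KeyValue(byte[])
expects an already-serialized KeyValue whose first bytes are a 4-byte length
prefix, not raw cell bytes, and also that BytesWritable.getBytes() returns
the padded backing array (which would explain the capacity of 3). I am not
sure this is right, but here is a stand-alone sketch (plain java.nio, no
HBase; names are mine) of what parsing a 4-byte length prefix from a
too-short buffer does:

```java
import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;

public class LengthPrefixDemo {
    // Reads a 4-byte big-endian int length prefix from the start of a buffer,
    // mimicking what a constructor for a serialized record would do.
    public static int readLengthPrefix(byte[] buf) {
        return ByteBuffer.wrap(buf).getInt();  // throws if buf.length < 4
    }

    public static void main(String[] args) {
        byte[] tooShort = { 9, 9, 9 };  // 3 bytes, like my padded value buffer
        try {
            readLengthPrefix(tooShort);
            System.out.println("parsed ok");
        } catch (BufferUnderflowException e) {
            System.out.println("too short to hold a 4-byte length prefix");
        }
    }
}
```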

Driver code is as follows:

public class Driver {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    args = new GenericOptionsParser(conf, args).getRemainingArgs();

    @SuppressWarnings("deprecation")
    Job job = new Job(conf, "Bulk Import");
    job.setJarByClass(HBaseTkrHdrMapper.class);

    job.setMapperClass(HBaseTkrHdrMapper.class);
    job.setMapOutputKeyClass(ImmutableBytesWritable.class);
    job.setMapOutputValueClass(KeyValue.class);
    job.setInputFormatClass(SequenceFileInputFormat.class);

    HTable hTable = new HTable(conf, args[2]);

    // Auto-configures the partitioner and reducer for the bulk load
    HFileOutputFormat.configureIncrementalLoad(job, hTable);

    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    boolean ok = job.waitForCompletion(true);
    hTable.close();
    System.exit(ok ? 0 : 1);
  }
}


The exception I got is:

Error: java.lang.IllegalArgumentException: offset (0) + length (4) exceed the capacity of the array: 3
        at org.apache.hadoop.hbase.util.Bytes.explainWrongLengthOrOffset(Bytes.java:602)
        at org.apache.hadoop.hbase.util.Bytes.toInt(Bytes.java:751)
        at org.apache.hadoop.hbase.util.Bytes.toInt(Bytes.java:737)
        at org.apache.hadoop.hbase.KeyValue.getLength(KeyValue.java:972)
        at org.apache.hadoop.hbase.KeyValue.<init>(KeyValue.java:276)
        at org.apache.hadoop.hbase.KeyValue.<init>(KeyValue.java:265)
        at com.bloomberg.tickerplant.hbase.HBaseTkrHdrMapper.map(HBaseTkrHdrMapper.java:41)
        at com.bloomberg.tickerplant.hbase.HBaseTkrHdrMapper.map(HBaseTkrHdrMapper.java:23)
        at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:145)
        at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:763)
        at org.apache.hadoop.mapred.MapTask.run(MapTask.java:339)
        at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:162)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:415)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1491)
        at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:157)

Exception in thread "main" java.io.IOException: org.apache.hadoop.ipc.RemoteException(java.lang.NullPointerException): java.lang.NullPointerException
        at org.apache.hadoop.mapreduce.v2.hs.HistoryClientService$HSClientProtocolHandler.getTaskAttemptCompletionEvents(HistoryClientService.java:269)
        at org.apache.hadoop.mapreduce.v2.api.impl.pb.service.MRClientProtocolPBServiceImpl.getTaskAttemptCompletionEvents(MRClientProtocolPBServiceImpl.java:173)
        at org.apache.hadoop.yarn.proto.MRClientProtocol$MRClientProtocolService$2.callBlockingMethod(MRClientProtocol.java:283)
        at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:585)
        at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:928)
        at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2053)
        at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2049)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:415)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1491)
        at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2047)
