I am learning how to upload binary data to HBase using MapReduce. Here are the steps I am following, assuming my binary file is testlist:
(1) write a SequenceFileWrite.java to read the local testlist file and save a sequence file to HDFS
(2) write a MapReduce program to read the generated sequence file and generate an HFile
(3) bulk-import this HFile into HBase
I am stuck at step (2), where I keep getting an exception. I am absolutely new to Hadoop/HBase; the code is posted below, and any comments or suggestions are appreciated!
SequenceFileWrite.java looks like this:
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.SequenceFile;

public class SequenceFileWrite {
    public static void main(String[] args) throws IOException {
        String uri = args[1];              // HDFS path for the sequence file
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        Path path = new Path(uri);
        File infile = new File(args[0]);   // local testlist file

        SequenceFile.Writer writer = null;
        FileInputStream fin = null;
        try {
            writer = SequenceFile.createWriter(fs, conf, path,
                    BytesWritable.class, BytesWritable.class);
            fin = new FileInputStream(infile);
            for (int i = 0; i < 10; ++i) {
                // Each record is a 2-byte key followed by a 2-byte value.
                byte[] keybuf = new byte[2];
                byte[] valbuf = new byte[2];
                if (fin.read(keybuf) != 2 || fin.read(valbuf) != 2) {
                    break; // stop on a short read instead of writing stale bytes
                }
                BytesWritable key = new BytesWritable();
                BytesWritable value = new BytesWritable();
                key.set(keybuf, 0, 2);
                value.set(valbuf, 0, 2);
                writer.append(key, value);
            }
        } finally {
            IOUtils.closeStream(fin);
            IOUtils.closeStream(writer);
        }
    }
}
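As an aside on the loop above: FileInputStream.read(byte[]) is not guaranteed to fill the buffer, so DataInputStream.readFully is the usual way to read fixed-size records. A stdlib-only sketch of the same 2-byte-key / 2-byte-value loop (the in-memory stream here is just a stand-in for testlist):

```java
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class RecordReadSketch {
    // Reads as many 2-byte-key / 2-byte-value records as the stream holds.
    static List<byte[][]> readRecords(DataInputStream in) throws IOException {
        List<byte[][]> records = new ArrayList<>();
        while (true) {
            byte[] keybuf = new byte[2];
            byte[] valbuf = new byte[2];
            try {
                in.readFully(keybuf);   // throws EOFException on a short read
                in.readFully(valbuf);
            } catch (EOFException eof) {
                break;                  // clean end of input; a partial record is dropped
            }
            records.add(new byte[][] { keybuf, valbuf });
        }
        return records;
    }

    public static void main(String[] args) throws IOException {
        // 9 bytes: two full records plus one trailing byte that is dropped
        byte[] data = { 1, 2, 3, 4, 5, 6, 7, 8, 9 };
        List<byte[][]> records =
                readRecords(new DataInputStream(new ByteArrayInputStream(data)));
        System.out.println(records.size()); // prints 2
    }
}
```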
And my mapper looks like this:
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.mapreduce.Mapper;

public class HBaseTkrHdrMapper
        extends Mapper<BytesWritable, BytesWritable, ImmutableBytesWritable, KeyValue> {

    int tipOffSeconds = 0;
    String tableName = "";

    @Override
    protected void setup(Context context)
            throws IOException, InterruptedException {
        Configuration c = context.getConfiguration();
        tipOffSeconds = c.getInt("epoch.seconds.tipoff", 0);
        tableName = c.get("hbase.table.mrtest");
    }

    @Override
    protected void map(BytesWritable key, BytesWritable value, Context context)
            throws IOException, InterruptedException {
        ImmutableBytesWritable hkey =
                new ImmutableBytesWritable(key.getBytes());
        KeyValue hval = new KeyValue(value.getBytes()); // line 41 in my file, where the exception is thrown
        context.write(hkey, hval);
    }
}
Driver code is as follows:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class Driver {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        args = new GenericOptionsParser(conf, args).getRemainingArgs();

        @SuppressWarnings("deprecation")
        Job job = new Job(conf, "Bulk Import");
        job.setJarByClass(HBaseTkrHdrMapper.class);
        job.setMapperClass(HBaseTkrHdrMapper.class);
        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
        job.setMapOutputValueClass(KeyValue.class);
        job.setInputFormatClass(SequenceFileInputFormat.class);

        HTable hTable = new HTable(conf, args[2]);
        // Auto-configure partitioner and reducer
        HFileOutputFormat.configureIncrementalLoad(job, hTable);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.waitForCompletion(true);
    }
}
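For completeness, my plan for step (3) is just the bulk-load tool once the job has produced the HFiles; the paths and table name below are placeholders for my own:

```shell
# Load the HFiles the job wrote under <output-dir> into <table-name>;
# class name as shipped in the hbase-server jar of this era.
hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles <output-dir> <table-name>
```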
The exception I got is:
Error: java.lang.IllegalArgumentException: offset (0) + length (4) exceed the capacity of the array: 3
    at org.apache.hadoop.hbase.util.Bytes.explainWrongLengthOrOffset(Bytes.java:602)
    at org.apache.hadoop.hbase.util.Bytes.toInt(Bytes.java:751)
    at org.apache.hadoop.hbase.util.Bytes.toInt(Bytes.java:737)
    at org.apache.hadoop.hbase.KeyValue.getLength(KeyValue.java:972)
    at org.apache.hadoop.hbase.KeyValue.<init>(KeyValue.java:276)
    at org.apache.hadoop.hbase.KeyValue.<init>(KeyValue.java:265)
    at com.bloomberg.tickerplant.hbase.HBaseTkrHdrMapper.map(HBaseTkrHdrMapper.java:41)
    at com.bloomberg.tickerplant.hbase.HBaseTkrHdrMapper.map(HBaseTkrHdrMapper.java:23)
    at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:145)
    at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:763)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:339)
    at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:162)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:415)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1491)
    at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:157)
Exception in thread "main" java.io.IOException: org.apache.hadoop.ipc.RemoteException(java.lang.NullPointerException): java.lang.NullPointerException
    at org.apache.hadoop.mapreduce.v2.hs.HistoryClientService$HSClientProtocolHandler.getTaskAttemptCompletionEvents(HistoryClientService.java:269)
    at org.apache.hadoop.mapreduce.v2.api.impl.pb.service.MRClientProtocolPBServiceImpl.getTaskAttemptCompletionEvents(MRClientProtocolPBServiceImpl.java:173)
    at org.apache.hadoop.yarn.proto.MRClientProtocol$MRClientProtocolService$2.callBlockingMethod(MRClientProtocol.java:283)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:585)
    at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:928)
    at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2053)
    at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2049)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:415)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1491)
    at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2047)
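From the numbers in the first trace, my own guess is that new KeyValue(value.getBytes()) is the problem: that constructor seems to expect a fully serialized KeyValue whose first 4 bytes are a length int, while getBytes() hands back the raw backing array of the BytesWritable, which (if I read the Hadoop source right) grows to size * 3 / 2 = 3 bytes when I set 2 bytes, hence "length (4) exceed the capacity of the array: 3". A stdlib-only sketch of that arithmetic, with my assumed growth rule marked as such:

```java
import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;
import java.util.Arrays;

public class KeyValueGuess {
    public static void main(String[] args) {
        // My assumption: BytesWritable.set(buf, 0, 2) grows the backing
        // array to size * 3 / 2, so a 2-byte value gets a 3-byte array.
        int size = 2;
        byte[] backing = Arrays.copyOf(new byte[] { 0x11, 0x22 }, size * 3 / 2);
        System.out.println(backing.length); // 3, matching the trace

        // KeyValue(byte[]) starts by reading a 4-byte length int at offset 0,
        // which cannot work on a 3-byte array:
        try {
            ByteBuffer.wrap(backing).getInt();
        } catch (BufferUnderflowException e) {
            System.out.println(e.getClass().getSimpleName()); // BufferUnderflowException
        }
    }
}
```

If that guess is right, I probably need to build the KeyValue from row/family/qualifier/value parts instead of raw bytes, but I would welcome confirmation.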