Greetings,
I'm trying to backport CopyTable to HBase 0.20.6.
In other words, the challenge is to write a job that would copy data from
one HTable on cluster A to another HTable on cluster B.
I'm able to copy an HTable to another HTable on the same cluster, but I cannot
find a way to point the output (write) side of the job at the second cluster.
Here is the code:
package org.myorg.sandbox;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
/**
 * Map-only MapReduce job that copies the "data:year" and "data:temperature"
 * cells of every row in the source HTable into a target HTable via
 * {@link TableOutputFormat}. Intended as a CopyTable backport for HBase 0.20.6.
 */
public class CopyHTable extends Configured implements Tool {

    /**
     * Reads each row of the source table and emits a {@link Put} carrying the
     * copied cells, keyed by the row key. Output is written by TableOutputFormat.
     */
    static class CopyMapper extends TableMapper<ImmutableBytesWritable, Put> {

        // Hoisted out of map(): these byte[] constants never change per record.
        private static final byte[] FAMILY = Bytes.toBytes("data");
        private static final byte[] YEAR = Bytes.toBytes("year");
        private static final byte[] TEMPERATURE = Bytes.toBytes("temperature");

        /**
         * @param row     the row key of the current input row
         * @param result  the scanned cells for that row
         * @param context MapReduce context used to emit the Put
         * @throws IOException on write failure, or if interrupted while writing
         */
        @Override
        public void map(ImmutableBytesWritable row, Result result, Context context)
                throws IOException {
            Put put = new Put(result.getRow());
            boolean hasCells = false;

            // Guard against rows that lack one of the cells: getValue() returns
            // null in that case, and adding a null value would corrupt the Put.
            byte[] year = result.getValue(FAMILY, YEAR);
            if (year != null) {
                put.add(FAMILY, YEAR, year);
                hasCells = true;
            }
            byte[] temperature = result.getValue(FAMILY, TEMPERATURE);
            if (temperature != null) {
                put.add(FAMILY, TEMPERATURE, temperature);
                hasCells = true;
            }
            if (!hasCells) {
                return; // nothing to copy for this row; an empty Put is invalid
            }

            try {
                // 'row' is already an ImmutableBytesWritable; no need to re-wrap.
                context.write(row, put);
            } catch (InterruptedException e) {
                // Don't swallow the interrupt: restore the flag and fail the
                // task instead of silently dropping this row.
                Thread.currentThread().interrupt();
                throw new IOException("Interrupted while writing row to output table", e);
            }
        }
    }

    /**
     * Configures and runs the copy job.
     *
     * @param args unused; table names are currently hard-coded to "temp"
     * @return 0 on success, 1 on job failure
     */
    @Override
    public int run(String[] args) throws Exception {
        // Wrap the Tool's configuration so command-line -D overrides supplied
        // through ToolRunner actually reach the job (the original discarded
        // getConf() and built a fresh configuration).
        Configuration conf = new HBaseConfiguration(getConf());
        String tableName = "temp";

        Job job = new Job(conf, "CopyHTable " + tableName);
        job.setJarByClass(CopyHTable.class);

        Scan scan = new Scan();
        scan.addFamily(Bytes.toBytes("data"));

        // BUG FIX: the mapper emits <ImmutableBytesWritable, Put>, so those are
        // the map output classes (the original wrongly passed Text/IntWritable).
        TableMapReduceUtil.initTableMapperJob(tableName, scan,
                CopyMapper.class, ImmutableBytesWritable.class,
                Put.class, job);

        job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, tableName);
        // NOTE(review): to write to a *different* cluster, the output side needs
        // its own ZooKeeper quorum. Setting "hbase.zookeeper.quorum" on the job
        // configuration (the commented-out attempt) also redirects the input
        // scanner, which is why it failed. Later HBase versions solve this with
        // TableOutputFormat.QUORUM_ADDRESS ("hbase.mapred.output.quorum");
        // backport that key in TableOutputFormat alongside CopyTable — TODO
        // confirm against the 0.20.6 TableOutputFormat source.
        job.setOutputFormatClass(TableOutputFormat.class);
        job.setOutputKeyClass(ImmutableBytesWritable.class);
        // Declare the concrete value type actually written, not the Writable
        // marker interface.
        job.setOutputValueClass(Put.class);
        job.setNumReduceTasks(0); // map-only copy; no shuffle needed

        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new CopyHTable(), args);
        System.exit(exitCode);
    }
}