Greetings,

I'm trying to backport CopyTable to HBase 0.20.6.
In other words, the challenge is to write a job that would copy data from
one HTable on cluster A to another HTable on cluster B.

I'm able to copy HTable to another HTable on the same cluster, but I can not
find a way to point to the second cluster on the reduce phase.

Here is the code :

package org.myorg.sandbox;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class CopyHTable extends Configured implements Tool {
    static class CopyMapper
    extends TableMapper<ImmutableBytesWritable, Put> {
        @Override
        public void map(ImmutableBytesWritable row, Result result, Context
context)
        throws IOException {

            byte[] family = Bytes.toBytes("data");
            byte[] qual01 = Bytes.toBytes("year");
            byte[] qual02 = Bytes.toBytes("temperature");
            byte[] key = result.getRow();
            byte[] val01 = result.getValue(family, qual01);
            byte[] val02 = result.getValue(family, qual02);

            Put put = new Put(key);
            put.add(family, qual01, val01);
            put.add(family, qual02, val02);

            try {
                context.write(new ImmutableBytesWritable(row), put);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = new HBaseConfiguration();
        String tableName = "temp";

        Job job = new Job(conf);
        job.setJarByClass(CopyHTable.class);

        Scan s= new Scan();
        s.addFamily(Bytes.toBytes("data"));

        TableMapReduceUtil.initTableMapperJob(tableName, s,
                CopyMapper.class, Text.class,
                IntWritable.class, job);

        job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, "temp");
        /* no luck with:
        job.getConfiguration().set("hbase.zookeeper.quorum", "nodeb");
        */
        job.setOutputFormatClass(TableOutputFormat.class);
        job.setOutputKeyClass(ImmutableBytesWritable.class);
        job.setOutputValueClass(Writable.class);

        job.setNumReduceTasks(0);

        if (job.waitForCompletion(true)) return 0;
        else return 1;
    }

    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new CopyHTable(), args);
        System.exit(exitCode);
    }
}

Reply via email to