Hi Mick,
attached is a very simple MR job that deletes expired URLs from my
test Cassandra DB. The keyspace looks like this:
Keyspace: Test:
Replication Strategy: org.apache.cassandra.locator.SimpleStrategy
Replication Factor: 2
Column Families:
ColumnFamily: Url2
Columns sorted by: org.apache.cassandra.db.marshal.UTF8Type
Row cache size / save period: 0.0/0
Key cache size / save period: 200000.0/3600
Memtable thresholds: 4.7015625/1003/60
GC grace seconds: 864000
Compaction min/max thresholds: 4/32
Read repair chance: 1.0
Built indexes: []
In the CF the row key is the URL, and each row holds some data. My MR job
needs just "expire_date", which is an int64 timestamp. For now I store it
as a string because I also use Python and C++ to manipulate the data.
For the MR job to run you need a patch I made. You can find it here:
https://issues.apache.org/jira/browse/CASSANDRA-2014
The attached file contains the working version, which clones the key in the
reduce() method. My other approach was:
[code]
context.write(ByteBuffer.wrap(key.getBytes(), 0, key.getLength()),
Collections.singletonList(getMutation(key)));
[/code]
which produces junk keys.
Best regards,
Patrik
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.LongBuffer;
import java.util.*;
import org.apache.cassandra.avro.Mutation;
import org.apache.cassandra.avro.Deletion;
import org.apache.cassandra.avro.SliceRange;
import org.apache.cassandra.hadoop.ColumnFamilyOutputFormat;
import org.apache.cassandra.db.IColumn;
import org.apache.cassandra.hadoop.ColumnFamilyInputFormat;
import org.apache.cassandra.hadoop.ConfigHelper;
import org.apache.cassandra.thrift.SlicePredicate;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class ContextExpirator extends Configured implements Tool
{
/** Keyspace used for both the input and the output column family of the job. */
static final String KEYSPACE = "Test";
/** Column family the job reads its rows from. */
static final String COLUMN_FAMILY = "Url2";
/** Column family the job writes its mutations to. */
static final String OUTPUT_COLUMN_FAMILY = "Url2";
/** Name of the column whose value is read as the row's expiry timestamp. */
static final String COLUMN_VALUE = "expire_date";
/**
 * Command-line entry point.
 *
 * <p>Runs this tool through {@link ToolRunner} so that Hadoop's generic
 * command-line options are parsed, then terminates the JVM with the status
 * code that {@code ToolRunner.run} returned.
 *
 * @param args command-line arguments, passed through to {@link ToolRunner}
 * @throws Exception if the tool itself throws
 */
public static void main(String[] args) throws Exception
{
    // Let ToolRunner handle generic command-line options
    int exitCode = ToolRunner.run(new Configuration(), new ContextExpirator(), args);
    // Propagate the tool's status instead of unconditionally reporting success.
    System.exit(exitCode);
}
public static class UrlFilterMapper
extends Mapper<ByteBuffer, SortedMap<ByteBuffer, IColumn>, Text, NullWritable>
{
private final static NullWritable nic = NullWritable.get();
private ByteBuffer sourceColumn;
private static long now;
protected void setup(org.apache.hadoop.mapreduce.Mapper.Context context)
throws IOException, InterruptedException
{
sourceColumn = ByteBuffer.wrap(COLUMN_VALUE.getBytes());
now = System.currentTimeMillis() / 1000; // convert from ms
}
public void map(ByteBuffer key, SortedMap<ByteBuffer, IColumn> columns, Context context)
throws IOException, InterruptedException
{
IColumn column = columns.get(sourceColumn);
if (column == null) {
return;
}
Text tKey = new Text(ByteBufferUtil.string(key));
Long value = Long.decode(ByteBufferUtil.string(column.value()));
if(now > value) {
context.write(tKey, nic);
}
}
}
public static class RemoveUrlReducer
extends Reducer<Text, NullWritable, ByteBuffer, List<Mutation>>
{
public void reduce(Text key, Iterable<NullWritable> values, Context context)
throws IOException, InterruptedException
{
ByteBuffer bbKey = ByteBufferUtil.clone(ByteBuffer.wrap(key.getBytes(), 0, key.getLength()));
context.write(bbKey, Collections.singletonList(getMutation()));
}
private static Mutation getMutation()
{
Deletion d = new Deletion();
d.timestamp = System.currentTimeMillis();
Mutation m = new Mutation();
m.deletion = d;
return m;
}
}
/**
 * Configures and runs the job: reads the {@code expire_date} column of every
 * row of the input column family, filters rows with {@link UrlFilterMapper}
 * and writes the mutations produced by {@link RemoveUrlReducer} to the output
 * column family.
 *
 * @param args command-line arguments (unused)
 * @return {@code 0} if the job completed successfully, {@code 1} otherwise
 * @throws Exception if the job cannot be configured or submitted
 */
@Override
public int run(String[] args) throws Exception
{
    Job job = new Job(getConf(), "context_expitator");
    job.setJarByClass(ContextExpirator.class);
    Configuration conf = job.getConfiguration();

    // input: rows of the source column family
    job.setInputFormatClass(ColumnFamilyInputFormat.class);
    ConfigHelper.setInputColumnFamily(conf, KEYSPACE, COLUMN_FAMILY);

    // map phase
    job.setMapperClass(UrlFilterMapper.class);
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(NullWritable.class);

    // reduce phase and output
    job.setReducerClass(RemoveUrlReducer.class);
    job.setOutputKeyClass(ByteBuffer.class);
    job.setOutputValueClass(List.class);
    job.setOutputFormatClass(ColumnFamilyOutputFormat.class);
    ConfigHelper.setOutputColumnFamily(conf, KEYSPACE, OUTPUT_COLUMN_FAMILY);

    // cassandra details
    ConfigHelper.setRpcPort(conf, "9160");
    ConfigHelper.setInitialAddress(conf, "127.0.0.1");
    ConfigHelper.setPartitioner(conf, "org.apache.cassandra.dht.RandomPartitioner");

    // Fetch only the expiry column. Encode its name explicitly as UTF-8
    // instead of relying on the platform default charset.
    ByteBuffer expiryColumn =
            ByteBuffer.wrap(COLUMN_VALUE.getBytes(java.nio.charset.Charset.forName("UTF-8")));
    SlicePredicate predicate = new SlicePredicate().setColumn_names(Arrays.asList(expiryColumn));
    ConfigHelper.setInputSlicePredicate(conf, predicate);

    // Report job failure to the caller instead of always returning success.
    return job.waitForCompletion(true) ? 0 : 1;
}
}