Hi,

I am using Cassandra 0.8.2 with Hadoop 0.20.2. My application read a file
and then write about 2.5 million records
to Cassandra. I used ColumnFamilyOutputFormat to write to Cassandra. My
Cassandra cluster has three nodes with
one Hadoop task tracker on each node. The wired problem is that I only saw
one map and one reducer tasks and job only took
53 seconds to finish. Seems the data are not actually written to Cassandra.

Here is status from Hadoop web admin:

User: hadoop
Job Name: SalesRankWriter
Job File:
hdfs://xxxxx:54310/hadoop/tmp/mapred/system/job_201108051329_0060/job.xml
Job Setup: Successful
Status: Succeeded
Started at: Wed Aug 10 15:24:43 EDT 2011
Finished at: Wed Aug 10 15:25:36 EDT 2011
Finished in: 52sec
Job Cleanup: Successful
Kind % Complete Num Tasks Pending Running Complete Killed Failed/Killed
Task Attempts
map 100.00%
1 0 0 1 0 0 / 0
reduce 100.00%
1 0 0 1 0 0 / 0

Counter Map Reduce Total
Job Counters Launched reduce tasks 0 0 1
Launched map tasks 0 0 1
Data-local map tasks 0 0 1
FileSystemCounters FILE_BYTES_READ 50,698,700 50,698,646 101,397,346
HDFS_BYTES_READ 56,149,360 0 56,149,360
FILE_BYTES_WRITTEN 101,397,378 50,698,646 152,096,024
Map-Reduce Framework Reduce input groups 0 2,534,932 2,534,932
Combine output records 0 0 0
Map input records 2,534,932 0 2,534,932
Reduce shuffle bytes 0 0 0
Reduce output records 0 2,534,932 2,534,932
Spilled Records 5,069,864 2,534,932 7,604,796
Map output bytes 45,628,776 0 45,628,776
Map output records 2,534,932 0 2,534,932
Combine input records 0 0 0
Reduce input records 0 2,534,932 2,534,932

and the log for the mapper

 2011-08-10 15:24:48,717 INFO org.apache.hadoop.metrics.jvm.JvmMetrics:
Initializing JVM Metrics with processName=MAP, sessionId=
2011-08-10 15:24:48,857 INFO org.apache.hadoop.mapred.MapTask: io.sort.mb =
100 2011-08-10 15:24:48,917 INFO org.apache.hadoop.mapred.MapTask: data
buffer = 79691776/99614720 2011-08-10 15:24:48,917 INFO
org.apache.hadoop.mapred.MapTask: record buffer = 262144/327680 2011-08-10
15:24:49,760 INFO org.apache.hadoop.mapred.MapTask: Spilling map output:
record full = true 2011-08-10 15:24:49,760 INFO
org.apache.hadoop.mapred.MapTask: bufstart = 0; bufend = 4718592; bufvoid =
99614720 2011-08-10 15:24:49,760 INFO org.apache.hadoop.mapred.MapTask:
kvstart = 0; kvend = 262144; length = 327680 2011-08-10 15:24:50,364 INFO
org.apache.hadoop.mapred.MapTask: Finished spill 0 2011-08-10 15:24:50,707
INFO org.apache.hadoop.mapred.MapTask: Spilling map output: record full =
true 2011-08-10 15:24:50,707 INFO org.apache.hadoop.mapred.MapTask: bufstart
= 4718592; bufend = 9437166; bufvoid = 99614720 2011-08-10 15:24:50,707 INFO
org.apache.hadoop.mapred.MapTask: kvstart = 262144; kvend = 196607; length =
327680 2011-08-10 15:24:51,238 INFO org.apache.hadoop.mapred.MapTask:
Finished spill 1 2011-08-10 15:24:51,583 INFO
org.apache.hadoop.mapred.MapTask: Spilling map output: record full = true
2011-08-10 15:24:51,583 INFO org.apache.hadoop.mapred.MapTask: bufstart =
9437166; bufend = 14155740; bufvoid = 99614720 2011-08-10 15:24:51,583 INFO
org.apache.hadoop.mapred.MapTask: kvstart = 196607; kvend = 131070; length =
327680 2011-08-10 15:24:52,084 INFO org.apache.hadoop.mapred.MapTask:
Finished spill 2 2011-08-10 15:24:52,433 INFO
org.apache.hadoop.mapred.MapTask: Spilling map output: record full = true
2011-08-10 15:24:52,433 INFO org.apache.hadoop.mapred.MapTask: bufstart =
14155740; bufend = 18874314; bufvoid = 99614720 2011-08-10 15:24:52,433 INFO
org.apache.hadoop.mapred.MapTask: kvstart = 131070; kvend = 65533; length =
327680 2011-08-10 15:24:52,877 INFO org.apache.hadoop.mapred.MapTask:
Finished spill 3 2011-08-10 15:24:53,216 INFO
org.apache.hadoop.mapred.MapTask: Spilling map output: record full = true
2011-08-10 15:24:53,216 INFO org.apache.hadoop.mapred.MapTask: bufstart =
18874314; bufend = 23592906; bufvoid = 99614720 2011-08-10 15:24:53,216 INFO
org.apache.hadoop.mapred.MapTask: kvstart = 65533; kvend = 327677; length =
327680 2011-08-10 15:24:53,660 INFO org.apache.hadoop.mapred.MapTask:
Finished spill 4 2011-08-10 15:24:54,010 INFO
org.apache.hadoop.mapred.MapTask: Spilling map output: record full = true
2011-08-10 15:24:54,010 INFO org.apache.hadoop.mapred.MapTask: bufstart =
23592906; bufend = 28311480; bufvoid = 99614720 2011-08-10 15:24:54,010 INFO
org.apache.hadoop.mapred.MapTask: kvstart = 327677; kvend = 262140; length =
327680 2011-08-10 15:24:54,447 INFO org.apache.hadoop.mapred.MapTask:
Finished spill 5 2011-08-10 15:24:54,793 INFO
org.apache.hadoop.mapred.MapTask: Spilling map output: record full = true
2011-08-10 15:24:54,793 INFO org.apache.hadoop.mapred.MapTask: bufstart =
28311480; bufend = 33030054; bufvoid = 99614720 2011-08-10 15:24:54,793 INFO
org.apache.hadoop.mapred.MapTask: kvstart = 262140; kvend = 196603; length =
327680 2011-08-10 15:24:55,237 INFO org.apache.hadoop.mapred.MapTask:
Finished spill 6 2011-08-10 15:24:55,564 INFO
org.apache.hadoop.mapred.MapTask: Spilling map output: record full = true
2011-08-10 15:24:55,564 INFO org.apache.hadoop.mapred.MapTask: bufstart =
33030054; bufend = 37748628; bufvoid = 99614720 2011-08-10 15:24:55,564 INFO
org.apache.hadoop.mapred.MapTask: kvstart = 196603; kvend = 131066; length =
327680 2011-08-10 15:24:55,981 INFO org.apache.hadoop.mapred.MapTask:
Finished spill 7 2011-08-10 15:24:56,434 INFO
org.apache.hadoop.mapred.MapTask: Spilling map output: record full = true
2011-08-10 15:24:56,434 INFO org.apache.hadoop.mapred.MapTask: bufstart =
37748628; bufend = 42467202; bufvoid = 99614720 2011-08-10 15:24:56,434 INFO
org.apache.hadoop.mapred.MapTask: kvstart = 131066; kvend = 65529; length =
327680 2011-08-10 15:24:56,848 INFO org.apache.hadoop.mapred.MapTask:
Finished spill 8 2011-08-10 15:24:57,051 INFO
org.apache.hadoop.mapred.MapTask: Starting flush of map output 2011-08-10
15:24:57,282 INFO org.apache.hadoop.mapred.MapTask: Finished spill 9
2011-08-10 15:24:57,291 INFO org.apache.hadoop.mapred.Merger: Merging 10
sorted segments 2011-08-10 15:24:57,297 INFO
org.apache.hadoop.mapred.Merger: Down to the last merge-pass, with 10
segments left of total size: 50698660 bytes 2011-08-10 15:24:59,552 INFO
org.apache.hadoop.mapred.TaskRunner:
Task:attempt_201108051329_0060_m_000000_0 is done. And is in the process of
commiting 2011-08-10 15:24:59,555 INFO org.apache.hadoop.mapred.TaskRunner:
Task 'attempt_201108051329_0060_m_000000_0' done.

and the log for the reducer

2011-08-10 15:25:00,835 INFO org.apache.hadoop.metrics.jvm.JvmMetrics:
Initializing JVM Metrics with processName=SHUFFLE, sessionId= 2011-08-10
15:25:01,005 INFO org.apache.hadoop.mapred.ReduceTask: ShuffleRamManager:
MemoryLimit=140699232, MaxSingleShuffleLimit=35174808 2011-08-10
15:25:01,022 INFO org.apache.hadoop.mapred.ReduceTask:
attempt_201108051329_0060_r_000000_0 Thread started: Thread for merging
on-disk files 2011-08-10 15:25:01,022 INFO
org.apache.hadoop.mapred.ReduceTask: attempt_201108051329_0060_r_000000_0
Thread started: Thread for merging in memory files 2011-08-10 15:25:01,022
INFO org.apache.hadoop.mapred.ReduceTask:
attempt_201108051329_0060_r_000000_0 Thread waiting: Thread for merging
on-disk files 2011-08-10 15:25:01,024 INFO
org.apache.hadoop.mapred.ReduceTask: attempt_201108051329_0060_r_000000_0
Need another 1 map output(s) where 0 is already in progress 2011-08-10
15:25:01,024 INFO org.apache.hadoop.mapred.ReduceTask:
attempt_201108051329_0060_r_000000_0 Thread started: Thread for polling Map
Completion Events 2011-08-10 15:25:01,037 INFO
org.apache.hadoop.mapred.ReduceTask: attempt_201108051329_0060_r_000000_0
Scheduled 0 outputs (0 slow hosts and0 dup hosts) 2011-08-10 15:25:01,038
INFO org.apache.hadoop.mapred.ReduceTask:
attempt_201108051329_0060_r_000000_0: Got 1 new map-outputs 2011-08-10
15:25:06,039 INFO org.apache.hadoop.mapred.ReduceTask:
attempt_201108051329_0060_r_000000_0 Scheduled 1 outputs (0 slow hosts and0
dup hosts) 2011-08-10 15:25:06,121 INFO org.apache.hadoop.mapred.ReduceTask:
header: attempt_201108051329_0060_m_000000_0, compressed len: 50698646,
decompressed len: 50698642 2011-08-10 15:25:06,121 INFO
org.apache.hadoop.mapred.ReduceTask: Shuffling 50698642 bytes (50698646 raw
bytes) into Local-FS from attempt_201108051329_0060_m_000000_0 2011-08-10
15:25:06,314 INFO org.apache.hadoop.mapred.ReduceTask: Read 50698646 bytes
from map-output for attempt_201108051329_0060_m_000000_0 2011-08-10
15:25:06,315 INFO org.apache.hadoop.mapred.ReduceTask:
attempt_201108051329_0060_r_000000_0 Thread waiting: Thread for merging
on-disk files 2011-08-10 15:25:07,055 INFO
org.apache.hadoop.mapred.ReduceTask: GetMapEventsThread exiting 2011-08-10
15:25:07,055 INFO org.apache.hadoop.mapred.ReduceTask: getMapsEventsThread
joined. 2011-08-10 15:25:07,056 INFO org.apache.hadoop.mapred.ReduceTask:
Closed ram manager 2011-08-10 15:25:07,056 INFO
org.apache.hadoop.mapred.ReduceTask: Interleaved on-disk merge complete: 1
files left. 2011-08-10 15:25:07,056 INFO
org.apache.hadoop.mapred.ReduceTask: In-memory merge complete: 0 files left.
2011-08-10 15:25:07,061 INFO org.apache.hadoop.mapred.ReduceTask: Merging 1
files, 50698646 bytes from disk 2011-08-10 15:25:07,062 INFO
org.apache.hadoop.mapred.ReduceTask: Merging 0 segments, 0 bytes from memory
into reduce 2011-08-10 15:25:07,065 INFO org.apache.hadoop.mapred.Merger:
Merging 1 sorted segments 2011-08-10 15:25:07,072 INFO
org.apache.hadoop.mapred.Merger: Down to the last merge-pass, with 1
segments left of total size: 50698642 bytes 2011-08-10 15:25:30,126 INFO
org.apache.hadoop.mapred.TaskRunner:
Task:attempt_201108051329_0060_r_000000_0 is done. And is in the process of
commiting 2011-08-10 15:25:30,129 INFO org.apache.hadoop.mapred.TaskRunner:
Task 'attempt_201108051329_0060_r_000000_0' done.

My code is similar to the word count example:

    public int run(String[] args) throws Exception {
...
        getConf().set(CONF_COLUMN_NAME, columnName);

        Job job4 = new Job(getConf(), "SalesRankWriter");
        job4.setJarByClass(SalesRankLoader.class);
        job4.setMapperClass(RankUpdateMapper.class);
        job4.setReducerClass(RankUpdateReducer.class);
        job4.setMapOutputKeyClass(Text.class);
        job4.setMapOutputValueClass(IntWritable.class);
        job4.setOutputKeyClass(ByteBuffer.class);
        job4.setOutputValueClass(List.class);
        job4.setOutputFormatClass(ColumnFamilyOutputFormat.class);
        job4.setInputFormatClass(TextInputFormat.class);
        FileInputFormat.addInputPath(job4, new Path(prePath));

        ConfigHelper.setOutputColumnFamily(job4.getConfiguration(),
KEYSPACE, columnFamily);
        ConfigHelper.setRpcPort(job4.getConfiguration(), "9260");
        ConfigHelper.setInitialAddress(job4.getConfiguration(),
"dnjsrcha01");
        ConfigHelper.setPartitioner(job4.getConfiguration(),
"org.apache.cassandra.dht.RandomPartitioner");

        job4.waitForCompletion(true);
...
}

where the mapper and reducer are defined as:

public static class RankUpdateMapper extends Mapper<LongWritable, Text,
Text, IntWritable> { public void map(LongWritable key, Text value, Context
context) throws IOException, InterruptedException { String line =
value.toString(); StringTokenizer tokenizer = new StringTokenizer(line);
String ean = tokenizer.nextToken(); int rank =
Integer.parseInt(tokenizer.nextToken()); context.write(new Text(ean), new
IntWritable(rank)); } } public static class RankUpdateReducer extends
Reducer<Text, IntWritable, ByteBuffer, List<Mutation>> { private ByteBuffer
outputKey; protected void setup(org.apache.hadoop.mapreduce.Reducer.Context
context) throws IOException, InterruptedException { outputKey =
ByteBufferUtil.bytes(context.getConfiguration().get(CONF_COLUMN_NAME)); }
public void reduce(Text key, Iterable<IntWritable> values, Context context)
throws IOException, InterruptedException { context.write(outputKey,
Collections.singletonList(getMutation(key,
values.iterator().next().get()))); } private static Mutation
getMutation(Text key, int value) { Column c = new Column();
c.setName(ByteBuffer.wrap(Arrays.copyOf(key.getBytes(), key.getLength())));
c.setValue(ByteBufferUtil.bytes(String.valueOf(value)));
c.setTimestamp(System.currentTimeMillis() * 1000); Mutation m = new
Mutation(); m.setColumn_or_supercolumn(new ColumnOrSuperColumn());
m.column_or_supercolumn.setColumn(c); return m; } }

Any thing wrong here?

Thanks,

John

Reply via email to