Hi, I am using Cassandra 0.8.2 with Hadoop 0.20.2. My application reads a file and then writes about 2.5 million records to Cassandra, using ColumnFamilyOutputFormat. My Cassandra cluster has three nodes, with one Hadoop task tracker on each node. The weird problem is that I only see one map task and one reduce task, and the job took only 53 seconds to finish. It seems the data is not actually written to Cassandra.
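One thing I noticed while writing this up: the HDFS_BYTES_READ counter below is only about 54 MB, which is less than one default 64 MB HDFS block, so maybe a single map task is actually expected, and a single reduce task is just the mapred.reduce.tasks default of 1? Quick back-of-envelope arithmetic using the counter values from the job page (the split-size formula here is simplified; the real FileInputFormat logic also considers mapred.min.split.size):

```python
# Counter values copied from the Hadoop job page below.
hdfs_bytes_read = 56_149_360       # HDFS_BYTES_READ (total map input)
map_output_bytes = 45_628_776      # Map output bytes
map_output_records = 2_534_932     # Map output records

# Simplified split count: with the default dfs.block.size of 64 MB, a
# single ~54 MB input file produces one split, hence one map task.
block_size = 64 * 1024 * 1024
splits = max(1, -(-hdfs_bytes_read // block_size))  # ceiling division
print(splits)  # -> 1

# Sanity check on the counters: average map output record size.
print(map_output_bytes / map_output_records)  # -> 18.0 bytes per record
```

So the single mapper and short runtime may be normal for this input size; the real question is why nothing shows up in Cassandra.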
Here is the status from the Hadoop web admin:

    User: hadoop
    Job Name: SalesRankWriter
    Job File: hdfs://xxxxx:54310/hadoop/tmp/mapred/system/job_201108051329_0060/job.xml
    Job Setup: Successful
    Status: Succeeded
    Started at: Wed Aug 10 15:24:43 EDT 2011
    Finished at: Wed Aug 10 15:25:36 EDT 2011
    Finished in: 52sec
    Job Cleanup: Successful

    Kind     % Complete   Num Tasks   Pending   Running   Complete   Killed   Failed/Killed Task Attempts
    map      100.00%      1           0         0         1          0        0 / 0
    reduce   100.00%      1           0         0         1          0        0 / 0

    Counter                        Map           Reduce        Total
    Job Counters
      Launched reduce tasks        0             0             1
      Launched map tasks           0             0             1
      Data-local map tasks         0             0             1
    FileSystemCounters
      FILE_BYTES_READ              50,698,700    50,698,646    101,397,346
      HDFS_BYTES_READ              56,149,360    0             56,149,360
      FILE_BYTES_WRITTEN           101,397,378   50,698,646    152,096,024
    Map-Reduce Framework
      Reduce input groups          0             2,534,932     2,534,932
      Combine output records       0             0             0
      Map input records            2,534,932     0             2,534,932
      Reduce shuffle bytes         0             0             0
      Reduce output records        0             2,534,932     2,534,932
      Spilled Records              5,069,864     2,534,932     7,604,796
      Map output bytes             45,628,776    0             45,628,776
      Map output records           2,534,932     0             2,534,932
      Combine input records        0             0             0
      Reduce input records         0             2,534,932     2,534,932

and the log for the mapper:

    2011-08-10 15:24:48,717 INFO org.apache.hadoop.metrics.jvm.JvmMetrics: Initializing JVM Metrics with processName=MAP, sessionId=
    2011-08-10 15:24:48,857 INFO org.apache.hadoop.mapred.MapTask: io.sort.mb = 100
    2011-08-10 15:24:48,917 INFO org.apache.hadoop.mapred.MapTask: data buffer = 79691776/99614720
    2011-08-10 15:24:48,917 INFO org.apache.hadoop.mapred.MapTask: record buffer = 262144/327680
    2011-08-10 15:24:49,760 INFO org.apache.hadoop.mapred.MapTask: Spilling map output: record full = true
    2011-08-10 15:24:49,760 INFO org.apache.hadoop.mapred.MapTask: bufstart = 0; bufend = 4718592; bufvoid = 99614720
    2011-08-10 15:24:49,760 INFO org.apache.hadoop.mapred.MapTask: kvstart = 0; kvend = 262144; length = 327680
    2011-08-10 15:24:50,364 INFO org.apache.hadoop.mapred.MapTask: Finished spill 0
    2011-08-10 15:24:50,707 INFO org.apache.hadoop.mapred.MapTask: Spilling map output: record full = true
    2011-08-10 15:24:50,707 INFO org.apache.hadoop.mapred.MapTask: bufstart = 4718592; bufend = 9437166; bufvoid = 99614720
    2011-08-10 15:24:50,707 INFO org.apache.hadoop.mapred.MapTask: kvstart = 262144; kvend = 196607; length = 327680
    2011-08-10 15:24:51,238 INFO org.apache.hadoop.mapred.MapTask: Finished spill 1
    2011-08-10 15:24:51,583 INFO org.apache.hadoop.mapred.MapTask: Spilling map output: record full = true
    2011-08-10 15:24:51,583 INFO org.apache.hadoop.mapred.MapTask: bufstart = 9437166; bufend = 14155740; bufvoid = 99614720
    2011-08-10 15:24:51,583 INFO org.apache.hadoop.mapred.MapTask: kvstart = 196607; kvend = 131070; length = 327680
    2011-08-10 15:24:52,084 INFO org.apache.hadoop.mapred.MapTask: Finished spill 2
    2011-08-10 15:24:52,433 INFO org.apache.hadoop.mapred.MapTask: Spilling map output: record full = true
    2011-08-10 15:24:52,433 INFO org.apache.hadoop.mapred.MapTask: bufstart = 14155740; bufend = 18874314; bufvoid = 99614720
    2011-08-10 15:24:52,433 INFO org.apache.hadoop.mapred.MapTask: kvstart = 131070; kvend = 65533; length = 327680
    2011-08-10 15:24:52,877 INFO org.apache.hadoop.mapred.MapTask: Finished spill 3
    2011-08-10 15:24:53,216 INFO org.apache.hadoop.mapred.MapTask: Spilling map output: record full = true
    2011-08-10 15:24:53,216 INFO org.apache.hadoop.mapred.MapTask: bufstart = 18874314; bufend = 23592906; bufvoid = 99614720
    2011-08-10 15:24:53,216 INFO org.apache.hadoop.mapred.MapTask: kvstart = 65533; kvend = 327677; length = 327680
    2011-08-10 15:24:53,660 INFO org.apache.hadoop.mapred.MapTask: Finished spill 4
    2011-08-10 15:24:54,010 INFO org.apache.hadoop.mapred.MapTask: Spilling map output: record full = true
    2011-08-10 15:24:54,010 INFO org.apache.hadoop.mapred.MapTask: bufstart = 23592906; bufend = 28311480; bufvoid = 99614720
    2011-08-10 15:24:54,010 INFO org.apache.hadoop.mapred.MapTask: kvstart = 327677; kvend = 262140; length = 327680
    2011-08-10 15:24:54,447 INFO org.apache.hadoop.mapred.MapTask: Finished spill 5
    2011-08-10 15:24:54,793 INFO org.apache.hadoop.mapred.MapTask: Spilling map output: record full = true
    2011-08-10 15:24:54,793 INFO org.apache.hadoop.mapred.MapTask: bufstart = 28311480; bufend = 33030054; bufvoid = 99614720
    2011-08-10 15:24:54,793 INFO org.apache.hadoop.mapred.MapTask: kvstart = 262140; kvend = 196603; length = 327680
    2011-08-10 15:24:55,237 INFO org.apache.hadoop.mapred.MapTask: Finished spill 6
    2011-08-10 15:24:55,564 INFO org.apache.hadoop.mapred.MapTask: Spilling map output: record full = true
    2011-08-10 15:24:55,564 INFO org.apache.hadoop.mapred.MapTask: bufstart = 33030054; bufend = 37748628; bufvoid = 99614720
    2011-08-10 15:24:55,564 INFO org.apache.hadoop.mapred.MapTask: kvstart = 196603; kvend = 131066; length = 327680
    2011-08-10 15:24:55,981 INFO org.apache.hadoop.mapred.MapTask: Finished spill 7
    2011-08-10 15:24:56,434 INFO org.apache.hadoop.mapred.MapTask: Spilling map output: record full = true
    2011-08-10 15:24:56,434 INFO org.apache.hadoop.mapred.MapTask: bufstart = 37748628; bufend = 42467202; bufvoid = 99614720
    2011-08-10 15:24:56,434 INFO org.apache.hadoop.mapred.MapTask: kvstart = 131066; kvend = 65529; length = 327680
    2011-08-10 15:24:56,848 INFO org.apache.hadoop.mapred.MapTask: Finished spill 8
    2011-08-10 15:24:57,051 INFO org.apache.hadoop.mapred.MapTask: Starting flush of map output
    2011-08-10 15:24:57,282 INFO org.apache.hadoop.mapred.MapTask: Finished spill 9
    2011-08-10 15:24:57,291 INFO org.apache.hadoop.mapred.Merger: Merging 10 sorted segments
    2011-08-10 15:24:57,297 INFO org.apache.hadoop.mapred.Merger: Down to the last merge-pass, with 10 segments left of total size: 50698660 bytes
    2011-08-10 15:24:59,552 INFO org.apache.hadoop.mapred.TaskRunner: Task:attempt_201108051329_0060_m_000000_0 is done. And is in the process of commiting
    2011-08-10 15:24:59,555 INFO org.apache.hadoop.mapred.TaskRunner: Task 'attempt_201108051329_0060_m_000000_0' done.

and the log for the reducer:

    2011-08-10 15:25:00,835 INFO org.apache.hadoop.metrics.jvm.JvmMetrics: Initializing JVM Metrics with processName=SHUFFLE, sessionId=
    2011-08-10 15:25:01,005 INFO org.apache.hadoop.mapred.ReduceTask: ShuffleRamManager: MemoryLimit=140699232, MaxSingleShuffleLimit=35174808
    2011-08-10 15:25:01,022 INFO org.apache.hadoop.mapred.ReduceTask: attempt_201108051329_0060_r_000000_0 Thread started: Thread for merging on-disk files
    2011-08-10 15:25:01,022 INFO org.apache.hadoop.mapred.ReduceTask: attempt_201108051329_0060_r_000000_0 Thread started: Thread for merging in memory files
    2011-08-10 15:25:01,022 INFO org.apache.hadoop.mapred.ReduceTask: attempt_201108051329_0060_r_000000_0 Thread waiting: Thread for merging on-disk files
    2011-08-10 15:25:01,024 INFO org.apache.hadoop.mapred.ReduceTask: attempt_201108051329_0060_r_000000_0 Need another 1 map output(s) where 0 is already in progress
    2011-08-10 15:25:01,024 INFO org.apache.hadoop.mapred.ReduceTask: attempt_201108051329_0060_r_000000_0 Thread started: Thread for polling Map Completion Events
    2011-08-10 15:25:01,037 INFO org.apache.hadoop.mapred.ReduceTask: attempt_201108051329_0060_r_000000_0 Scheduled 0 outputs (0 slow hosts and0 dup hosts)
    2011-08-10 15:25:01,038 INFO org.apache.hadoop.mapred.ReduceTask: attempt_201108051329_0060_r_000000_0: Got 1 new map-outputs
    2011-08-10 15:25:06,039 INFO org.apache.hadoop.mapred.ReduceTask: attempt_201108051329_0060_r_000000_0 Scheduled 1 outputs (0 slow hosts and0 dup hosts)
    2011-08-10 15:25:06,121 INFO org.apache.hadoop.mapred.ReduceTask: header: attempt_201108051329_0060_m_000000_0, compressed len: 50698646, decompressed len: 50698642
    2011-08-10 15:25:06,121 INFO org.apache.hadoop.mapred.ReduceTask: Shuffling 50698642 bytes (50698646 raw bytes) into Local-FS from attempt_201108051329_0060_m_000000_0
    2011-08-10 15:25:06,314 INFO org.apache.hadoop.mapred.ReduceTask: Read 50698646 bytes from map-output for attempt_201108051329_0060_m_000000_0
    2011-08-10 15:25:06,315 INFO org.apache.hadoop.mapred.ReduceTask: attempt_201108051329_0060_r_000000_0 Thread waiting: Thread for merging on-disk files
    2011-08-10 15:25:07,055 INFO org.apache.hadoop.mapred.ReduceTask: GetMapEventsThread exiting
    2011-08-10 15:25:07,055 INFO org.apache.hadoop.mapred.ReduceTask: getMapsEventsThread joined.
    2011-08-10 15:25:07,056 INFO org.apache.hadoop.mapred.ReduceTask: Closed ram manager
    2011-08-10 15:25:07,056 INFO org.apache.hadoop.mapred.ReduceTask: Interleaved on-disk merge complete: 1 files left.
    2011-08-10 15:25:07,056 INFO org.apache.hadoop.mapred.ReduceTask: In-memory merge complete: 0 files left.
    2011-08-10 15:25:07,061 INFO org.apache.hadoop.mapred.ReduceTask: Merging 1 files, 50698646 bytes from disk
    2011-08-10 15:25:07,062 INFO org.apache.hadoop.mapred.ReduceTask: Merging 0 segments, 0 bytes from memory into reduce
    2011-08-10 15:25:07,065 INFO org.apache.hadoop.mapred.Merger: Merging 1 sorted segments
    2011-08-10 15:25:07,072 INFO org.apache.hadoop.mapred.Merger: Down to the last merge-pass, with 1 segments left of total size: 50698642 bytes
    2011-08-10 15:25:30,126 INFO org.apache.hadoop.mapred.TaskRunner: Task:attempt_201108051329_0060_r_000000_0 is done. And is in the process of commiting
    2011-08-10 15:25:30,129 INFO org.apache.hadoop.mapred.TaskRunner: Task 'attempt_201108051329_0060_r_000000_0' done.

My code is similar to the word count example:

    public int run(String[] args) throws Exception {
        ...
        getConf().set(CONF_COLUMN_NAME, columnName);

        Job job4 = new Job(getConf(), "SalesRankWriter");
        job4.setJarByClass(SalesRankLoader.class);
        job4.setMapperClass(RankUpdateMapper.class);
        job4.setReducerClass(RankUpdateReducer.class);
        job4.setMapOutputKeyClass(Text.class);
        job4.setMapOutputValueClass(IntWritable.class);
        job4.setOutputKeyClass(ByteBuffer.class);
        job4.setOutputValueClass(List.class);
        job4.setOutputFormatClass(ColumnFamilyOutputFormat.class);
        job4.setInputFormatClass(TextInputFormat.class);
        FileInputFormat.addInputPath(job4, new Path(prePath));

        ConfigHelper.setOutputColumnFamily(job4.getConfiguration(), KEYSPACE, columnFamily);
        ConfigHelper.setRpcPort(job4.getConfiguration(), "9260");
        ConfigHelper.setInitialAddress(job4.getConfiguration(), "dnjsrcha01");
        ConfigHelper.setPartitioner(job4.getConfiguration(), "org.apache.cassandra.dht.RandomPartitioner");

        job4.waitForCompletion(true);
        ...
    }

where the mapper and reducer are defined as:

    public static class RankUpdateMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();
            StringTokenizer tokenizer = new StringTokenizer(line);
            String ean = tokenizer.nextToken();
            int rank = Integer.parseInt(tokenizer.nextToken());
            context.write(new Text(ean), new IntWritable(rank));
        }
    }

    public static class RankUpdateReducer extends Reducer<Text, IntWritable, ByteBuffer, List<Mutation>> {
        private ByteBuffer outputKey;

        protected void setup(org.apache.hadoop.mapreduce.Reducer.Context context) throws IOException, InterruptedException {
            outputKey = ByteBufferUtil.bytes(context.getConfiguration().get(CONF_COLUMN_NAME));
        }

        public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            context.write(outputKey, Collections.singletonList(getMutation(key, values.iterator().next().get())));
        }

        private static Mutation getMutation(Text key, int value) {
            Column c = new Column();
            c.setName(ByteBuffer.wrap(Arrays.copyOf(key.getBytes(), key.getLength())));
            c.setValue(ByteBufferUtil.bytes(String.valueOf(value)));
            c.setTimestamp(System.currentTimeMillis() * 1000);

            Mutation m = new Mutation();
            m.setColumn_or_supercolumn(new ColumnOrSuperColumn());
            m.column_or_supercolumn.setColumn(c);
            return m;
        }
    }

Is anything wrong here?

Thanks,
John