> Seems the data are not actually written to Cassandra.

Before jumping into the Hadoop side of things: are you saying there is no data in Cassandra? Can you retrieve any using the CLI?

Take a look at cfstats on each node to see the estimated record count.
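
For example, "nodetool -h <node-host> cfstats" on each node prints a "Number of Keys (estimate)" line under each column family, and from the CLI ("cassandra-cli -h <node-host> -p 9260", since your job config uses RPC port 9260 rather than the default 9160) a "use YourKeyspace;" followed by "list YourColumnFamily limit 5;" should return a few rows if anything was written. The host, keyspace, and column family names here are placeholders for your own.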

Cheers

-----------------
Aaron Morton
Freelance Cassandra Developer
@aaronmorton
http://www.thelastpickle.com

On 11 Aug 2011, at 08:20, Jian Fang wrote:

> Hi,
>
> I am using Cassandra 0.8.2 with Hadoop 0.20.2. My application reads a file
> and then writes about 2.5 million records to Cassandra. I used
> ColumnFamilyOutputFormat to write to Cassandra. My Cassandra cluster has
> three nodes, with one Hadoop task tracker on each node. The weird problem
> is that I only saw one map task and one reduce task, and the job took only
> 53 seconds to finish. Seems the data are not actually written to Cassandra.
>
> Here is the status from the Hadoop web admin:
>
> User:         hadoop
> Job Name:     SalesRankWriter
> Job File:     hdfs://xxxxx:54310/hadoop/tmp/mapred/system/job_201108051329_0060/job.xml
> Job Setup:    Successful
> Status:       Succeeded
> Started at:   Wed Aug 10 15:24:43 EDT 2011
> Finished at:  Wed Aug 10 15:25:36 EDT 2011
> Finished in:  52sec
> Job Cleanup:  Successful
>
> Kind     % Complete   Num Tasks   Pending   Running   Complete   Killed   Failed/Killed Task Attempts
> map      100.00%      1           0         0         1          0        0 / 0
> reduce   100.00%      1           0         0         1          0        0 / 0
>
> Counter                                        Map          Reduce       Total
> Job Counters / Launched reduce tasks           0            0            1
> Job Counters / Launched map tasks              0            0            1
> Job Counters / Data-local map tasks            0            0            1
> FileSystemCounters / FILE_BYTES_READ           50,698,700   50,698,646   101,397,346
> FileSystemCounters / HDFS_BYTES_READ           56,149,360   0            56,149,360
> FileSystemCounters / FILE_BYTES_WRITTEN        101,397,378  50,698,646   152,096,024
> Map-Reduce Framework / Reduce input groups     0            2,534,932    2,534,932
> Map-Reduce Framework / Combine output records  0            0            0
> Map-Reduce Framework / Map input records       2,534,932    0            2,534,932
> Map-Reduce Framework / Reduce shuffle bytes    0            0            0
> Map-Reduce Framework / Reduce output records   0            2,534,932    2,534,932
> Map-Reduce Framework / Spilled Records         5,069,864    2,534,932    7,604,796
> Map-Reduce Framework / Map output bytes        45,628,776   0            45,628,776
> Map-Reduce Framework / Map output records      2,534,932    0            2,534,932
> Map-Reduce Framework / Combine input records   0            0            0
> Map-Reduce Framework / Reduce input records    0            2,534,932    2,534,932
>
> and the log for the mapper:
>
> 2011-08-10 15:24:48,717 INFO org.apache.hadoop.metrics.jvm.JvmMetrics: Initializing JVM Metrics with processName=MAP, sessionId=
> 2011-08-10 15:24:48,857 INFO org.apache.hadoop.mapred.MapTask: io.sort.mb = 100
> 2011-08-10 15:24:48,917 INFO org.apache.hadoop.mapred.MapTask: data buffer = 79691776/99614720
> 2011-08-10 15:24:48,917 INFO org.apache.hadoop.mapred.MapTask: record buffer = 262144/327680
> 2011-08-10 15:24:49,760 INFO org.apache.hadoop.mapred.MapTask: Spilling map output: record full = true
> 2011-08-10 15:24:49,760 INFO org.apache.hadoop.mapred.MapTask: bufstart = 0; bufend = 4718592; bufvoid = 99614720
> 2011-08-10 15:24:49,760 INFO org.apache.hadoop.mapred.MapTask: kvstart = 0; kvend = 262144; length = 327680
> 2011-08-10 15:24:50,364 INFO org.apache.hadoop.mapred.MapTask: Finished spill 0
> 2011-08-10 15:24:50,707 INFO org.apache.hadoop.mapred.MapTask: Spilling map output: record full = true
> 2011-08-10 15:24:50,707 INFO org.apache.hadoop.mapred.MapTask: bufstart = 4718592; bufend = 9437166; bufvoid = 99614720
> 2011-08-10 15:24:50,707 INFO org.apache.hadoop.mapred.MapTask: kvstart = 262144; kvend = 196607; length = 327680
> 2011-08-10 15:24:51,238 INFO org.apache.hadoop.mapred.MapTask: Finished spill 1
> 2011-08-10 15:24:51,583 INFO org.apache.hadoop.mapred.MapTask: Spilling map output: record full = true
> 2011-08-10 15:24:51,583 INFO org.apache.hadoop.mapred.MapTask: bufstart = 9437166; bufend = 14155740; bufvoid = 99614720
> 2011-08-10 15:24:51,583 INFO org.apache.hadoop.mapred.MapTask: kvstart = 196607; kvend = 131070; length = 327680
> 2011-08-10 15:24:52,084 INFO org.apache.hadoop.mapred.MapTask: Finished spill 2
> 2011-08-10 15:24:52,433 INFO org.apache.hadoop.mapred.MapTask: Spilling map output: record full = true
> 2011-08-10 15:24:52,433 INFO org.apache.hadoop.mapred.MapTask: bufstart = 14155740; bufend = 18874314; bufvoid = 99614720
> 2011-08-10 15:24:52,433 INFO org.apache.hadoop.mapred.MapTask: kvstart = 131070; kvend = 65533; length = 327680
> 2011-08-10 15:24:52,877 INFO org.apache.hadoop.mapred.MapTask: Finished spill 3
> 2011-08-10 15:24:53,216 INFO org.apache.hadoop.mapred.MapTask: Spilling map output: record full = true
> 2011-08-10 15:24:53,216 INFO org.apache.hadoop.mapred.MapTask: bufstart = 18874314; bufend = 23592906; bufvoid = 99614720
> 2011-08-10 15:24:53,216 INFO org.apache.hadoop.mapred.MapTask: kvstart = 65533; kvend = 327677; length = 327680
> 2011-08-10 15:24:53,660 INFO org.apache.hadoop.mapred.MapTask: Finished spill 4
> 2011-08-10 15:24:54,010 INFO org.apache.hadoop.mapred.MapTask: Spilling map output: record full = true
> 2011-08-10 15:24:54,010 INFO org.apache.hadoop.mapred.MapTask: bufstart = 23592906; bufend = 28311480; bufvoid = 99614720
> 2011-08-10 15:24:54,010 INFO org.apache.hadoop.mapred.MapTask: kvstart = 327677; kvend = 262140; length = 327680
> 2011-08-10 15:24:54,447 INFO org.apache.hadoop.mapred.MapTask: Finished spill 5
> 2011-08-10 15:24:54,793 INFO org.apache.hadoop.mapred.MapTask: Spilling map output: record full = true
> 2011-08-10 15:24:54,793 INFO org.apache.hadoop.mapred.MapTask: bufstart = 28311480; bufend = 33030054; bufvoid = 99614720
> 2011-08-10 15:24:54,793 INFO org.apache.hadoop.mapred.MapTask: kvstart = 262140; kvend = 196603; length = 327680
> 2011-08-10 15:24:55,237 INFO org.apache.hadoop.mapred.MapTask: Finished spill 6
> 2011-08-10 15:24:55,564 INFO org.apache.hadoop.mapred.MapTask: Spilling map output: record full = true
> 2011-08-10 15:24:55,564 INFO org.apache.hadoop.mapred.MapTask: bufstart = 33030054; bufend = 37748628; bufvoid = 99614720
> 2011-08-10 15:24:55,564 INFO org.apache.hadoop.mapred.MapTask: kvstart = 196603; kvend = 131066; length = 327680
> 2011-08-10 15:24:55,981 INFO org.apache.hadoop.mapred.MapTask: Finished spill 7
> 2011-08-10 15:24:56,434 INFO org.apache.hadoop.mapred.MapTask: Spilling map output: record full = true
> 2011-08-10 15:24:56,434 INFO org.apache.hadoop.mapred.MapTask: bufstart = 37748628; bufend = 42467202; bufvoid = 99614720
> 2011-08-10 15:24:56,434 INFO org.apache.hadoop.mapred.MapTask: kvstart = 131066; kvend = 65529; length = 327680
> 2011-08-10 15:24:56,848 INFO org.apache.hadoop.mapred.MapTask: Finished spill 8
> 2011-08-10 15:24:57,051 INFO org.apache.hadoop.mapred.MapTask: Starting flush of map output
> 2011-08-10 15:24:57,282 INFO org.apache.hadoop.mapred.MapTask: Finished spill 9
> 2011-08-10 15:24:57,291 INFO org.apache.hadoop.mapred.Merger: Merging 10 sorted segments
> 2011-08-10 15:24:57,297 INFO org.apache.hadoop.mapred.Merger: Down to the last merge-pass, with 10 segments left of total size: 50698660 bytes
> 2011-08-10 15:24:59,552 INFO org.apache.hadoop.mapred.TaskRunner: Task:attempt_201108051329_0060_m_000000_0 is done. And is in the process of commiting
> 2011-08-10 15:24:59,555 INFO org.apache.hadoop.mapred.TaskRunner: Task 'attempt_201108051329_0060_m_000000_0' done.
>
> and the log for the reducer:
>
> 2011-08-10 15:25:00,835 INFO org.apache.hadoop.metrics.jvm.JvmMetrics: Initializing JVM Metrics with processName=SHUFFLE, sessionId=
> 2011-08-10 15:25:01,005 INFO org.apache.hadoop.mapred.ReduceTask: ShuffleRamManager: MemoryLimit=140699232, MaxSingleShuffleLimit=35174808
> 2011-08-10 15:25:01,022 INFO org.apache.hadoop.mapred.ReduceTask: attempt_201108051329_0060_r_000000_0 Thread started: Thread for merging on-disk files
> 2011-08-10 15:25:01,022 INFO org.apache.hadoop.mapred.ReduceTask: attempt_201108051329_0060_r_000000_0 Thread started: Thread for merging in memory files
> 2011-08-10 15:25:01,022 INFO org.apache.hadoop.mapred.ReduceTask: attempt_201108051329_0060_r_000000_0 Thread waiting: Thread for merging on-disk files
> 2011-08-10 15:25:01,024 INFO org.apache.hadoop.mapred.ReduceTask: attempt_201108051329_0060_r_000000_0 Need another 1 map output(s) where 0 is already in progress
> 2011-08-10 15:25:01,024 INFO org.apache.hadoop.mapred.ReduceTask: attempt_201108051329_0060_r_000000_0 Thread started: Thread for polling Map Completion Events
> 2011-08-10 15:25:01,037 INFO org.apache.hadoop.mapred.ReduceTask: attempt_201108051329_0060_r_000000_0 Scheduled 0 outputs (0 slow hosts and0 dup hosts)
> 2011-08-10 15:25:01,038 INFO org.apache.hadoop.mapred.ReduceTask: attempt_201108051329_0060_r_000000_0: Got 1 new map-outputs
> 2011-08-10 15:25:06,039 INFO org.apache.hadoop.mapred.ReduceTask: attempt_201108051329_0060_r_000000_0 Scheduled 1 outputs (0 slow hosts and0 dup hosts)
> 2011-08-10 15:25:06,121 INFO org.apache.hadoop.mapred.ReduceTask: header: attempt_201108051329_0060_m_000000_0, compressed len: 50698646, decompressed len: 50698642
> 2011-08-10 15:25:06,121 INFO org.apache.hadoop.mapred.ReduceTask: Shuffling 50698642 bytes (50698646 raw bytes) into Local-FS from attempt_201108051329_0060_m_000000_0
> 2011-08-10 15:25:06,314 INFO org.apache.hadoop.mapred.ReduceTask: Read 50698646 bytes from map-output for attempt_201108051329_0060_m_000000_0
> 2011-08-10 15:25:06,315 INFO org.apache.hadoop.mapred.ReduceTask: attempt_201108051329_0060_r_000000_0 Thread waiting: Thread for merging on-disk files
> 2011-08-10 15:25:07,055 INFO org.apache.hadoop.mapred.ReduceTask: GetMapEventsThread exiting
> 2011-08-10 15:25:07,055 INFO org.apache.hadoop.mapred.ReduceTask: getMapsEventsThread joined.
> 2011-08-10 15:25:07,056 INFO org.apache.hadoop.mapred.ReduceTask: Closed ram manager
> 2011-08-10 15:25:07,056 INFO org.apache.hadoop.mapred.ReduceTask: Interleaved on-disk merge complete: 1 files left.
> 2011-08-10 15:25:07,056 INFO org.apache.hadoop.mapred.ReduceTask: In-memory merge complete: 0 files left.
> 2011-08-10 15:25:07,061 INFO org.apache.hadoop.mapred.ReduceTask: Merging 1 files, 50698646 bytes from disk
> 2011-08-10 15:25:07,062 INFO org.apache.hadoop.mapred.ReduceTask: Merging 0 segments, 0 bytes from memory into reduce
> 2011-08-10 15:25:07,065 INFO org.apache.hadoop.mapred.Merger: Merging 1 sorted segments
> 2011-08-10 15:25:07,072 INFO org.apache.hadoop.mapred.Merger: Down to the last merge-pass, with 1 segments left of total size: 50698642 bytes
> 2011-08-10 15:25:30,126 INFO org.apache.hadoop.mapred.TaskRunner: Task:attempt_201108051329_0060_r_000000_0 is done. And is in the process of commiting
> 2011-08-10 15:25:30,129 INFO org.apache.hadoop.mapred.TaskRunner: Task 'attempt_201108051329_0060_r_000000_0' done.
>
> My code is similar to the word count example:
>
> public int run(String[] args) throws Exception {
>     ...
>
>     getConf().set(CONF_COLUMN_NAME, columnName);
>
>     Job job4 = new Job(getConf(), "SalesRankWriter");
>     job4.setJarByClass(SalesRankLoader.class);
>     job4.setMapperClass(RankUpdateMapper.class);
>     job4.setReducerClass(RankUpdateReducer.class);
>     job4.setMapOutputKeyClass(Text.class);
>     job4.setMapOutputValueClass(IntWritable.class);
>     job4.setOutputKeyClass(ByteBuffer.class);
>     job4.setOutputValueClass(List.class);
>     job4.setOutputFormatClass(ColumnFamilyOutputFormat.class);
>     job4.setInputFormatClass(TextInputFormat.class);
>     FileInputFormat.addInputPath(job4, new Path(prePath));
>
>     ConfigHelper.setOutputColumnFamily(job4.getConfiguration(), KEYSPACE, columnFamily);
>     ConfigHelper.setRpcPort(job4.getConfiguration(), "9260");
>     ConfigHelper.setInitialAddress(job4.getConfiguration(), "dnjsrcha01");
>     ConfigHelper.setPartitioner(job4.getConfiguration(), "org.apache.cassandra.dht.RandomPartitioner");
>
>     job4.waitForCompletion(true);
>     ...
> }
>
> where the mapper and reducer are defined as:
>
> public static class RankUpdateMapper extends Mapper<LongWritable, Text, Text, IntWritable>
> {
>     public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException
>     {
>         String line = value.toString();
>         StringTokenizer tokenizer = new StringTokenizer(line);
>         String ean = tokenizer.nextToken();
>         int rank = Integer.parseInt(tokenizer.nextToken());
>
>         context.write(new Text(ean), new IntWritable(rank));
>     }
> }
>
> public static class RankUpdateReducer extends Reducer<Text, IntWritable, ByteBuffer, List<Mutation>>
> {
>     private ByteBuffer outputKey;
>
>     protected void setup(org.apache.hadoop.mapreduce.Reducer.Context context) throws IOException, InterruptedException
>     {
>         outputKey = ByteBufferUtil.bytes(context.getConfiguration().get(CONF_COLUMN_NAME));
>     }
>
>     public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException
>     {
>         context.write(outputKey, Collections.singletonList(getMutation(key, values.iterator().next().get())));
>     }
>
>     private static Mutation getMutation(Text key, int value)
>     {
>         Column c = new Column();
>         c.setName(ByteBuffer.wrap(Arrays.copyOf(key.getBytes(), key.getLength())));
>         c.setValue(ByteBufferUtil.bytes(String.valueOf(value)));
>         c.setTimestamp(System.currentTimeMillis() * 1000);
>
>         Mutation m = new Mutation();
>         m.setColumn_or_supercolumn(new ColumnOrSuperColumn());
>         m.column_or_supercolumn.setColumn(c);
>         return m;
>     }
> }
>
> Anything wrong here?
>
> Thanks,
>
> John
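
Nothing in the job setup jumps out at me. If the CLI turns up nothing, you can also check from code by fetching the row your reducer should have written directly over Thrift. A rough sketch (untested; the keyspace, column family, and row key below are placeholders, and note that the row key your reducer writes is whatever CONF_COLUMN_NAME is set to):

import java.nio.ByteBuffer;
import java.util.List;

import org.apache.cassandra.thrift.Cassandra;
import org.apache.cassandra.thrift.ColumnOrSuperColumn;
import org.apache.cassandra.thrift.ColumnParent;
import org.apache.cassandra.thrift.ConsistencyLevel;
import org.apache.cassandra.thrift.SlicePredicate;
import org.apache.cassandra.thrift.SliceRange;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.transport.TFramedTransport;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;

public class ReadCheck
{
    public static void main(String[] args) throws Exception
    {
        // Cassandra 0.8 uses the framed Thrift transport by default
        TTransport transport = new TFramedTransport(new TSocket("dnjsrcha01", 9260));
        Cassandra.Client client = new Cassandra.Client(new TBinaryProtocol(transport));
        transport.open();
        client.set_keyspace("YourKeyspace"); // placeholder for your KEYSPACE constant

        // ask for the first 10 columns of the row the reducer writes
        SlicePredicate predicate = new SlicePredicate();
        predicate.setSlice_range(new SliceRange(
                ByteBufferUtil.bytes(""), ByteBufferUtil.bytes(""), false, 10));

        // placeholder: the reducer's row key is the CONF_COLUMN_NAME config value
        ByteBuffer rowKey = ByteBufferUtil.bytes("your-row-key");
        List<ColumnOrSuperColumn> columns = client.get_slice(
                rowKey, new ColumnParent("YourColumnFamily"), predicate, ConsistencyLevel.ONE);

        System.out.println("columns in row: " + columns.size());
        for (ColumnOrSuperColumn cosc : columns)
            System.out.println(ByteBufferUtil.string(cosc.column.name)
                    + " = " + ByteBufferUtil.string(cosc.column.value));

        transport.close();
    }
}

If that comes back empty, the mutations never reached Cassandra; if it returns your columns, the data is there and the problem is on the read side.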