There is data in Cassandra; each cluster node holds about 100 GB. From the application's point of view, if I run the job twice with the same input file (the sales rank update file), the output file of the second run should list far fewer products whose rank change exceeds the threshold, because the first run should already have set the sales ranks to the values in the input file. In fact, the output files of the two runs are identical. One explanation is that the ranks were not actually updated during the first run. Also, the whole Hadoop job, with 2.5 million Cassandra updates on three nodes, ran in 53 seconds. Do you think that is possible? Each node is a regular Linux box with 8 CPUs.
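As a rough sanity check on that last question: the reducer log quoted below shows the final merge pass starting at 15:25:07,072 and the task finishing at 15:25:30,126, and the job counters report 2,534,932 reduce output records. The sketch below only does that arithmetic. It assumes the whole interval was spent in reduce() and in handing mutations to ColumnFamilyOutputFormat, which the log does not confirm, and the class and method names are made up for illustration.

```java
import java.time.Duration;
import java.time.LocalTime;
import java.time.format.DateTimeFormatter;

final class ThroughputCheck {
    // Hadoop log timestamps put a comma before the milliseconds.
    private static final DateTimeFormatter LOG_TIME =
            DateTimeFormatter.ofPattern("HH:mm:ss,SSS");

    // Seconds elapsed between two same-day log timestamps.
    static double elapsedSeconds(String start, String end) {
        Duration d = Duration.between(
                LocalTime.parse(start, LOG_TIME),
                LocalTime.parse(end, LOG_TIME));
        return d.toMillis() / 1000.0;
    }

    // Average records handled per second over the interval.
    static double recordsPerSecond(long records, double seconds) {
        return records / seconds;
    }

    public static void main(String[] args) {
        long records = 2_534_932L; // "Reduce output records" from the job counters
        // Assumption: reduce() and the Cassandra writes happen between these two entries.
        double seconds = elapsedSeconds("15:25:07,072", "15:25:30,126");
        System.out.printf("%.3f s, %.0f records/s%n",
                seconds, recordsPerSecond(records, seconds));
    }
}
```

Under that assumption the figure comes out at roughly 110,000 mutations per second through a single reducer, which is the number to weigh against what three nodes can absorb.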
Thanks,

John

On Wed, Aug 10, 2011 at 5:40 PM, aaron morton <aa...@thelastpickle.com> wrote:

> Seems the data are not actually written to Cassandra.
>
> Before jumping into the Hadoop side of things, are you saying there is no
> data in Cassandra? Can you retrieve any using the CLI? Take a look at
> cfstats on each node to see the estimated record count.
>
> Cheers
>
> -----------------
> Aaron Morton
> Freelance Cassandra Developer
> @aaronmorton
> http://www.thelastpickle.com
>
> On 11 Aug 2011, at 08:20, Jian Fang wrote:
>
> Hi,
>
> I am using Cassandra 0.8.2 with Hadoop 0.20.2. My application reads a file
> and then writes about 2.5 million records to Cassandra. I used
> ColumnFamilyOutputFormat to write to Cassandra. My Cassandra cluster has
> three nodes with one Hadoop task tracker on each node. The weird problem is
> that I only saw one map task and one reduce task, and the job only took
> 53 seconds to finish. Seems the data are not actually written to Cassandra.
>
> Here is the status from the Hadoop web admin:
>
> User: hadoop
> Job Name: SalesRankWriter
> Job File: hdfs://xxxxx:54310/hadoop/tmp/mapred/system/job_201108051329_0060/job.xml
> Job Setup: Successful
> Status: Succeeded
> Started at: Wed Aug 10 15:24:43 EDT 2011
> Finished at: Wed Aug 10 15:25:36 EDT 2011
> Finished in: 52sec
> Job Cleanup: Successful
>
> Kind    % Complete  Num Tasks  Pending  Running  Complete  Killed  Failed/Killed Task Attempts
> map     100.00%     1          0        0        1         0       0 / 0
> reduce  100.00%     1          0        0        1         0       0 / 0
>
> Counter                     Map           Reduce        Total
> Job Counters
>   Launched reduce tasks     0             0             1
>   Launched map tasks        0             0             1
>   Data-local map tasks      0             0             1
> FileSystemCounters
>   FILE_BYTES_READ           50,698,700    50,698,646    101,397,346
>   HDFS_BYTES_READ           56,149,360    0             56,149,360
>   FILE_BYTES_WRITTEN        101,397,378   50,698,646    152,096,024
> Map-Reduce Framework
>   Reduce input groups       0             2,534,932     2,534,932
>   Combine output records    0             0             0
>   Map input records         2,534,932     0             2,534,932
>   Reduce shuffle bytes      0             0             0
>   Reduce output records     0             2,534,932     2,534,932
>   Spilled Records           5,069,864     2,534,932     7,604,796
>   Map output bytes          45,628,776    0             45,628,776
>   Map output records        2,534,932     0             2,534,932
>   Combine input records     0             0             0
>   Reduce input records      0             2,534,932     2,534,932
>
> and the log for the mapper
>
> 2011-08-10 15:24:48,717 INFO org.apache.hadoop.metrics.jvm.JvmMetrics: Initializing JVM Metrics with processName=MAP, sessionId=
> 2011-08-10 15:24:48,857 INFO org.apache.hadoop.mapred.MapTask: io.sort.mb = 100
> 2011-08-10 15:24:48,917 INFO org.apache.hadoop.mapred.MapTask: data buffer = 79691776/99614720
> 2011-08-10 15:24:48,917 INFO org.apache.hadoop.mapred.MapTask: record buffer = 262144/327680
> 2011-08-10 15:24:49,760 INFO org.apache.hadoop.mapred.MapTask: Spilling map output: record full = true
> 2011-08-10 15:24:49,760 INFO org.apache.hadoop.mapred.MapTask: bufstart = 0; bufend = 4718592; bufvoid = 99614720
> 2011-08-10 15:24:49,760 INFO org.apache.hadoop.mapred.MapTask: kvstart = 0; kvend = 262144; length = 327680
> 2011-08-10 15:24:50,364 INFO org.apache.hadoop.mapred.MapTask: Finished spill 0
> 2011-08-10 15:24:50,707 INFO org.apache.hadoop.mapred.MapTask: Spilling map output: record full = true
> 2011-08-10 15:24:50,707 INFO org.apache.hadoop.mapred.MapTask: bufstart = 4718592; bufend = 9437166; bufvoid = 99614720
> 2011-08-10 15:24:50,707 INFO org.apache.hadoop.mapred.MapTask: kvstart = 262144; kvend = 196607; length = 327680
> 2011-08-10 15:24:51,238 INFO org.apache.hadoop.mapred.MapTask: Finished spill 1
> 2011-08-10 15:24:51,583 INFO org.apache.hadoop.mapred.MapTask: Spilling map output: record full = true
> 2011-08-10 15:24:51,583 INFO org.apache.hadoop.mapred.MapTask: bufstart = 9437166; bufend = 14155740; bufvoid = 99614720
> 2011-08-10 15:24:51,583 INFO org.apache.hadoop.mapred.MapTask: kvstart = 196607; kvend = 131070; length = 327680
> 2011-08-10 15:24:52,084 INFO org.apache.hadoop.mapred.MapTask: Finished spill 2
> 2011-08-10 15:24:52,433 INFO org.apache.hadoop.mapred.MapTask: Spilling map output: record full = true
> 2011-08-10 15:24:52,433 INFO org.apache.hadoop.mapred.MapTask: bufstart = 14155740; bufend = 18874314; bufvoid = 99614720
> 2011-08-10 15:24:52,433 INFO org.apache.hadoop.mapred.MapTask: kvstart = 131070; kvend = 65533; length = 327680
> 2011-08-10 15:24:52,877 INFO org.apache.hadoop.mapred.MapTask: Finished spill 3
> 2011-08-10 15:24:53,216 INFO org.apache.hadoop.mapred.MapTask: Spilling map output: record full = true
> 2011-08-10 15:24:53,216 INFO org.apache.hadoop.mapred.MapTask: bufstart = 18874314; bufend = 23592906; bufvoid = 99614720
> 2011-08-10 15:24:53,216 INFO org.apache.hadoop.mapred.MapTask: kvstart = 65533; kvend = 327677; length = 327680
> 2011-08-10 15:24:53,660 INFO org.apache.hadoop.mapred.MapTask: Finished spill 4
> 2011-08-10 15:24:54,010 INFO org.apache.hadoop.mapred.MapTask: Spilling map output: record full = true
> 2011-08-10 15:24:54,010 INFO org.apache.hadoop.mapred.MapTask: bufstart = 23592906; bufend = 28311480; bufvoid = 99614720
> 2011-08-10 15:24:54,010 INFO org.apache.hadoop.mapred.MapTask: kvstart = 327677; kvend = 262140; length = 327680
> 2011-08-10 15:24:54,447 INFO org.apache.hadoop.mapred.MapTask: Finished spill 5
> 2011-08-10 15:24:54,793 INFO org.apache.hadoop.mapred.MapTask: Spilling map output: record full = true
> 2011-08-10 15:24:54,793 INFO org.apache.hadoop.mapred.MapTask: bufstart = 28311480; bufend = 33030054; bufvoid = 99614720
> 2011-08-10 15:24:54,793 INFO org.apache.hadoop.mapred.MapTask: kvstart = 262140; kvend = 196603; length = 327680
> 2011-08-10 15:24:55,237 INFO org.apache.hadoop.mapred.MapTask: Finished spill 6
> 2011-08-10 15:24:55,564 INFO org.apache.hadoop.mapred.MapTask: Spilling map output: record full = true
> 2011-08-10 15:24:55,564 INFO org.apache.hadoop.mapred.MapTask: bufstart = 33030054; bufend = 37748628; bufvoid = 99614720
> 2011-08-10 15:24:55,564 INFO org.apache.hadoop.mapred.MapTask: kvstart = 196603; kvend = 131066; length = 327680
> 2011-08-10 15:24:55,981 INFO org.apache.hadoop.mapred.MapTask: Finished spill 7
> 2011-08-10 15:24:56,434 INFO org.apache.hadoop.mapred.MapTask: Spilling map output: record full = true
> 2011-08-10 15:24:56,434 INFO org.apache.hadoop.mapred.MapTask: bufstart = 37748628; bufend = 42467202; bufvoid = 99614720
> 2011-08-10 15:24:56,434 INFO org.apache.hadoop.mapred.MapTask: kvstart = 131066; kvend = 65529; length = 327680
> 2011-08-10 15:24:56,848 INFO org.apache.hadoop.mapred.MapTask: Finished spill 8
> 2011-08-10 15:24:57,051 INFO org.apache.hadoop.mapred.MapTask: Starting flush of map output
> 2011-08-10 15:24:57,282 INFO org.apache.hadoop.mapred.MapTask: Finished spill 9
> 2011-08-10 15:24:57,291 INFO org.apache.hadoop.mapred.Merger: Merging 10 sorted segments
> 2011-08-10 15:24:57,297 INFO org.apache.hadoop.mapred.Merger: Down to the last merge-pass, with 10 segments left of total size: 50698660 bytes
> 2011-08-10 15:24:59,552 INFO org.apache.hadoop.mapred.TaskRunner: Task:attempt_201108051329_0060_m_000000_0 is done. And is in the process of commiting
> 2011-08-10 15:24:59,555 INFO org.apache.hadoop.mapred.TaskRunner: Task 'attempt_201108051329_0060_m_000000_0' done.
>
> and the log for the reducer
>
> 2011-08-10 15:25:00,835 INFO org.apache.hadoop.metrics.jvm.JvmMetrics: Initializing JVM Metrics with processName=SHUFFLE, sessionId=
> 2011-08-10 15:25:01,005 INFO org.apache.hadoop.mapred.ReduceTask: ShuffleRamManager: MemoryLimit=140699232, MaxSingleShuffleLimit=35174808
> 2011-08-10 15:25:01,022 INFO org.apache.hadoop.mapred.ReduceTask: attempt_201108051329_0060_r_000000_0 Thread started: Thread for merging on-disk files
> 2011-08-10 15:25:01,022 INFO org.apache.hadoop.mapred.ReduceTask: attempt_201108051329_0060_r_000000_0 Thread started: Thread for merging in memory files
> 2011-08-10 15:25:01,022 INFO org.apache.hadoop.mapred.ReduceTask: attempt_201108051329_0060_r_000000_0 Thread waiting: Thread for merging on-disk files
> 2011-08-10 15:25:01,024 INFO org.apache.hadoop.mapred.ReduceTask: attempt_201108051329_0060_r_000000_0 Need another 1 map output(s) where 0 is already in progress
> 2011-08-10 15:25:01,024 INFO org.apache.hadoop.mapred.ReduceTask: attempt_201108051329_0060_r_000000_0 Thread started: Thread for polling Map Completion Events
> 2011-08-10 15:25:01,037 INFO org.apache.hadoop.mapred.ReduceTask: attempt_201108051329_0060_r_000000_0 Scheduled 0 outputs (0 slow hosts and0 dup hosts)
> 2011-08-10 15:25:01,038 INFO org.apache.hadoop.mapred.ReduceTask: attempt_201108051329_0060_r_000000_0: Got 1 new map-outputs
> 2011-08-10 15:25:06,039 INFO org.apache.hadoop.mapred.ReduceTask: attempt_201108051329_0060_r_000000_0 Scheduled 1 outputs (0 slow hosts and0 dup hosts)
> 2011-08-10 15:25:06,121 INFO org.apache.hadoop.mapred.ReduceTask: header: attempt_201108051329_0060_m_000000_0, compressed len: 50698646, decompressed len: 50698642
> 2011-08-10 15:25:06,121 INFO org.apache.hadoop.mapred.ReduceTask: Shuffling 50698642 bytes (50698646 raw bytes) into Local-FS from attempt_201108051329_0060_m_000000_0
> 2011-08-10 15:25:06,314 INFO org.apache.hadoop.mapred.ReduceTask: Read 50698646 bytes from map-output for attempt_201108051329_0060_m_000000_0
> 2011-08-10 15:25:06,315 INFO org.apache.hadoop.mapred.ReduceTask: attempt_201108051329_0060_r_000000_0 Thread waiting: Thread for merging on-disk files
> 2011-08-10 15:25:07,055 INFO org.apache.hadoop.mapred.ReduceTask: GetMapEventsThread exiting
> 2011-08-10 15:25:07,055 INFO org.apache.hadoop.mapred.ReduceTask: getMapsEventsThread joined.
> 2011-08-10 15:25:07,056 INFO org.apache.hadoop.mapred.ReduceTask: Closed ram manager
> 2011-08-10 15:25:07,056 INFO org.apache.hadoop.mapred.ReduceTask: Interleaved on-disk merge complete: 1 files left.
> 2011-08-10 15:25:07,056 INFO org.apache.hadoop.mapred.ReduceTask: In-memory merge complete: 0 files left.
> 2011-08-10 15:25:07,061 INFO org.apache.hadoop.mapred.ReduceTask: Merging 1 files, 50698646 bytes from disk
> 2011-08-10 15:25:07,062 INFO org.apache.hadoop.mapred.ReduceTask: Merging 0 segments, 0 bytes from memory into reduce
> 2011-08-10 15:25:07,065 INFO org.apache.hadoop.mapred.Merger: Merging 1 sorted segments
> 2011-08-10 15:25:07,072 INFO org.apache.hadoop.mapred.Merger: Down to the last merge-pass, with 1 segments left of total size: 50698642 bytes
> 2011-08-10 15:25:30,126 INFO org.apache.hadoop.mapred.TaskRunner: Task:attempt_201108051329_0060_r_000000_0 is done. And is in the process of commiting
> 2011-08-10 15:25:30,129 INFO org.apache.hadoop.mapred.TaskRunner: Task 'attempt_201108051329_0060_r_000000_0' done.
>
> My code is similar to the word count example:
>
> public int run(String[] args) throws Exception {
>     ...
>     getConf().set(CONF_COLUMN_NAME, columnName);
>
>     Job job4 = new Job(getConf(), "SalesRankWriter");
>     job4.setJarByClass(SalesRankLoader.class);
>     job4.setMapperClass(RankUpdateMapper.class);
>     job4.setReducerClass(RankUpdateReducer.class);
>     job4.setMapOutputKeyClass(Text.class);
>     job4.setMapOutputValueClass(IntWritable.class);
>     job4.setOutputKeyClass(ByteBuffer.class);
>     job4.setOutputValueClass(List.class);
>     job4.setOutputFormatClass(ColumnFamilyOutputFormat.class);
>     job4.setInputFormatClass(TextInputFormat.class);
>     FileInputFormat.addInputPath(job4, new Path(prePath));
>
>     ConfigHelper.setOutputColumnFamily(job4.getConfiguration(), KEYSPACE, columnFamily);
>     ConfigHelper.setRpcPort(job4.getConfiguration(), "9260");
>     ConfigHelper.setInitialAddress(job4.getConfiguration(), "dnjsrcha01");
>     ConfigHelper.setPartitioner(job4.getConfiguration(), "org.apache.cassandra.dht.RandomPartitioner");
>
>     job4.waitForCompletion(true);
>     ...
> }
>
> where the mapper and reducer are defined as:
>
> public static class RankUpdateMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
>     public void map(LongWritable key, Text value, Context context)
>             throws IOException, InterruptedException {
>         String line = value.toString();
>         StringTokenizer tokenizer = new StringTokenizer(line);
>         String ean = tokenizer.nextToken();
>         int rank = Integer.parseInt(tokenizer.nextToken());
>         context.write(new Text(ean), new IntWritable(rank));
>     }
> }
>
> public static class RankUpdateReducer extends Reducer<Text, IntWritable, ByteBuffer, List<Mutation>> {
>     private ByteBuffer outputKey;
>
>     protected void setup(org.apache.hadoop.mapreduce.Reducer.Context context)
>             throws IOException, InterruptedException {
>         outputKey = ByteBufferUtil.bytes(context.getConfiguration().get(CONF_COLUMN_NAME));
>     }
>
>     public void reduce(Text key, Iterable<IntWritable> values, Context context)
>             throws IOException, InterruptedException {
>         context.write(outputKey,
>                 Collections.singletonList(getMutation(key, values.iterator().next().get())));
>     }
>
>     private static Mutation getMutation(Text key, int value) {
>         Column c = new Column();
>         c.setName(ByteBuffer.wrap(Arrays.copyOf(key.getBytes(), key.getLength())));
>         c.setValue(ByteBufferUtil.bytes(String.valueOf(value)));
>         c.setTimestamp(System.currentTimeMillis() * 1000);
>         Mutation m = new Mutation();
>         m.setColumn_or_supercolumn(new ColumnOrSuperColumn());
>         m.column_or_supercolumn.setColumn(c);
>         return m;
>     }
> }
>
> Anything wrong here?
>
> Thanks,
>
> John
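
One thing that may be worth checking in the reducer quoted above: outputKey is read once in setup() from CONF_COLUMN_NAME and then passed as the key of every context.write(), while the EAN is used as the column name. If ColumnFamilyOutputFormat treats the output key as the row key, which is my understanding but is not confirmed anywhere in this thread, every mutation would land in a single row. The sketch below models that with plain in-memory maps standing in for the column family. It uses no Cassandra API, and the class name, the EANs, the ranks and the key "rank" are all invented for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class RowKeySketch {
    // Mirrors the shape of reduce(): a fixed row key, with the EAN as column name.
    // The outer map stands in for the column family: row key -> (column name -> value).
    static Map<String, Map<String, String>> applyWrites(String outputKey, String[][] eanRanks) {
        Map<String, Map<String, String>> rows = new LinkedHashMap<>();
        for (String[] eanRank : eanRanks) {
            rows.computeIfAbsent(outputKey, k -> new LinkedHashMap<>())
                .put(eanRank[0], eanRank[1]);
        }
        return rows;
    }

    public static void main(String[] args) {
        // Hypothetical input records (EAN, rank); not taken from the real data.
        String[][] input = {
            {"9780000000001", "10"},
            {"9780000000002", "25"},
            {"9780000000003", "7"},
        };
        Map<String, Map<String, String>> rows = applyWrites("rank", input);
        System.out.println("rows = " + rows.size()
                + ", columns in row 'rank' = " + rows.get("rank").size());
    }
}
```

If that is what happens on the cluster, a per-product lookup keyed by EAN would find nothing new after the job, even though 2.5 million mutations were sent. Whether that matches the intended data model depends on how the column family is read elsewhere in the application.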