I suggested turning up the logging to see if the server processed a batch_mutate call. This is done from the CassandraServer class (https://github.com/apache/cassandra/blob/cassandra-0.8.4/src/java/org/apache/cassandra/thrift/CassandraServer.java#L531), not the CFOF.
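As a minimal sketch (assuming the stock log4j setup shipped with 0.8.x; edit conf/log4j-server.properties on each node and restart), something like this will make those calls visible in system.log:

    # conf/log4j-server.properties
    # DEBUG on the Thrift entry point logs each batch_mutate / insert call as it arrives
    log4j.logger.org.apache.cassandra.thrift.CassandraServer=DEBUG

If nothing shows up there for the duration of the job, the mutations are not reaching that node.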
The first step will be to determine if a call is being made to the server. If not, work out why not. If so, then where is the data going?

Cheers

-----------------
Aaron Morton
Freelance Cassandra Developer
@aaronmorton
http://www.thelastpickle.com

On 17/08/2011, at 1:36 AM, Jian Fang wrote:

> If you look at the source code you will find there is no log message in
> the ColumnFamilyOutputFormat class and the related classes.
> How to trace the problem then? No one actually got this working?
>
> On Thu, Aug 11, 2011 at 6:10 PM, aaron morton <aa...@thelastpickle.com> wrote:
> Turn the logging up in cassandra or your MR job and make sure the
> batch_mutate is sent. Sounds like it's not.
>
> Cheers
>
> -----------------
> Aaron Morton
> Freelance Cassandra Developer
> @aaronmorton
> http://www.thelastpickle.com
>
> On 12 Aug 2011, at 07:22, Jian Fang wrote:
>
>> 53 seconds included the map phase to read and process the input file. The
>> records were updated at the end of the reduce phase.
>>
>> I checked the sales ranks in the update file against the sales ranks in
>> Cassandra; they are different, so the records were not actually updated.
>>
>> I remember I ran the word count example for Cassandra 0.8.0 some time ago
>> and saw similar behavior, i.e., the results were not written to the
>> Cassandra column family. But if I changed Hadoop to write to the file
>> system, I could see the results in the output file. I didn't try the word
>> count example for Cassandra 0.8.2 though.
>>
>> Any way to solve this problem?
>>
>> Thanks,
>>
>> John
>>
>> On Thu, Aug 11, 2011 at 12:17 AM, aaron morton <aa...@thelastpickle.com> wrote:
>> I'm a simple guy. My first step would be to see if the expected data is in
>> the database, and if not, what's missing.
>>
>> 2.5M updates / 3 nodes = 833,333 per node
>> 833,333 / 53 seconds = 15,723 per second
>> 1 / 15,723 = 0.00006 seconds (0.06 milliseconds) per mutation
>>
>> Sounds reasonable to me.
>>
>> Check the Write Latency in nodetool cfstats and the row count estimates.
>>
>> Cheers
>>
>> -----------------
>> Aaron Morton
>> Freelance Cassandra Developer
>> @aaronmorton
>> http://www.thelastpickle.com
>>
>> On 11 Aug 2011, at 14:50, Jian Fang wrote:
>>
>>> There is data, and each Cassandra cluster node holds about 100G. From the
>>> application point of view, if I run the job twice with the same input file,
>>> i.e., the sales rank update file, then I should see a much smaller number
>>> of products whose rank change exceeds the threshold in the output file
>>> for the second run, because the sales ranks would have been updated to match
>>> the ranks in the input file during the first run. But the output files stay
>>> the same for the two runs. One explanation is that the ranks were not
>>> actually updated in the first run. Also, 53 seconds to run the whole Hadoop
>>> job with 2.5 million Cassandra updates on three nodes, do you think that is
>>> possible? Each node is a regular Linux box with 8 CPUs.
>>>
>>> Thanks,
>>>
>>> John
>>>
>>> On Wed, Aug 10, 2011 at 5:40 PM, aaron morton <aa...@thelastpickle.com> wrote:
>>>> Seems the data are not actually written to Cassandra.
>>>
>>> Before jumping into the Hadoop side of things, are you saying there is no
>>> data in Cassandra? Can you retrieve any using the CLI? Take a look at
>>> cfstats on each node to see the estimated record count.
>>>
>>> Cheers
>>>
>>> -----------------
>>> Aaron Morton
>>> Freelance Cassandra Developer
>>> @aaronmorton
>>> http://www.thelastpickle.com
>>>
>>> On 11 Aug 2011, at 08:20, Jian Fang wrote:
>>>
>>>> Hi,
>>>>
>>>> I am using Cassandra 0.8.2 with Hadoop 0.20.2. My application reads a file
>>>> and then writes about 2.5 million records to Cassandra. I used
>>>> ColumnFamilyOutputFormat to write to Cassandra. My Cassandra cluster has
>>>> three nodes with one Hadoop task tracker on each node. The weird problem is
>>>> that I only saw one map task and one reduce task, and the job only took
>>>> 53 seconds to finish. Seems the data are not actually written to Cassandra.
>>>>
>>>> Here is the status from the Hadoop web admin:
>>>>
>>>> User: hadoop
>>>> Job Name: SalesRankWriter
>>>> Job File: hdfs://xxxxx:54310/hadoop/tmp/mapred/system/job_201108051329_0060/job.xml
>>>> Job Setup: Successful
>>>> Status: Succeeded
>>>> Started at: Wed Aug 10 15:24:43 EDT 2011
>>>> Finished at: Wed Aug 10 15:25:36 EDT 2011
>>>> Finished in: 52sec
>>>> Job Cleanup: Successful
>>>>
>>>> Kind     % Complete   Num Tasks   Pending   Running   Complete   Killed   Failed/Killed Task Attempts
>>>> map      100.00%      1           0         0         1          0        0 / 0
>>>> reduce   100.00%      1           0         0         1          0        0 / 0
>>>>
>>>> Counter                                            Map          Reduce       Total
>>>> Job Counters          Launched reduce tasks        0            0            1
>>>>                       Launched map tasks           0            0            1
>>>>                       Data-local map tasks         0            0            1
>>>> FileSystemCounters    FILE_BYTES_READ              50,698,700   50,698,646   101,397,346
>>>>                       HDFS_BYTES_READ              56,149,360   0            56,149,360
>>>>                       FILE_BYTES_WRITTEN           101,397,378  50,698,646   152,096,024
>>>> Map-Reduce Framework  Reduce input groups          0            2,534,932    2,534,932
>>>>                       Combine output records       0            0            0
>>>>                       Map input records            2,534,932    0            2,534,932
>>>>                       Reduce shuffle bytes         0            0            0
>>>>                       Reduce output records        0            2,534,932    2,534,932
>>>>                       Spilled Records              5,069,864    2,534,932    7,604,796
>>>>                       Map output bytes             45,628,776   0            45,628,776
>>>>                       Map output records           2,534,932    0            2,534,932
>>>>                       Combine input records        0            0            0
>>>>                       Reduce input records         0            2,534,932    2,534,932
>>>>
>>>> and the log for the mapper:
>>>>
>>>> 2011-08-10 15:24:48,717 INFO org.apache.hadoop.metrics.jvm.JvmMetrics: Initializing JVM Metrics with processName=MAP, sessionId=
>>>> 2011-08-10 15:24:48,857 INFO org.apache.hadoop.mapred.MapTask: io.sort.mb = 100
>>>> 2011-08-10 15:24:48,917 INFO org.apache.hadoop.mapred.MapTask: data buffer = 79691776/99614720
>>>> 2011-08-10 15:24:48,917 INFO org.apache.hadoop.mapred.MapTask: record buffer = 262144/327680
>>>> 2011-08-10 15:24:49,760 INFO org.apache.hadoop.mapred.MapTask: Spilling map output: record full = true
>>>> 2011-08-10 15:24:49,760 INFO org.apache.hadoop.mapred.MapTask: bufstart = 0; bufend = 4718592; bufvoid = 99614720
>>>> 2011-08-10 15:24:49,760 INFO org.apache.hadoop.mapred.MapTask: kvstart = 0; kvend = 262144; length = 327680
>>>> 2011-08-10 15:24:50,364 INFO org.apache.hadoop.mapred.MapTask: Finished spill 0
>>>> 2011-08-10 15:24:50,707 INFO org.apache.hadoop.mapred.MapTask: Spilling map output: record full = true
>>>> 2011-08-10 15:24:50,707 INFO org.apache.hadoop.mapred.MapTask: bufstart = 4718592; bufend = 9437166; bufvoid = 99614720
>>>> 2011-08-10 15:24:50,707 INFO org.apache.hadoop.mapred.MapTask: kvstart = 262144; kvend = 196607; length = 327680
>>>> 2011-08-10 15:24:51,238 INFO org.apache.hadoop.mapred.MapTask: Finished spill 1
>>>> 2011-08-10 15:24:51,583 INFO org.apache.hadoop.mapred.MapTask: Spilling map output: record full = true
>>>> 2011-08-10 15:24:51,583 INFO org.apache.hadoop.mapred.MapTask: bufstart = 9437166; bufend = 14155740; bufvoid = 99614720
>>>> 2011-08-10 15:24:51,583 INFO org.apache.hadoop.mapred.MapTask: kvstart = 196607; kvend = 131070; length = 327680
>>>> 2011-08-10 15:24:52,084 INFO org.apache.hadoop.mapred.MapTask: Finished spill 2
>>>> 2011-08-10 15:24:52,433 INFO org.apache.hadoop.mapred.MapTask: Spilling map output: record full = true
>>>> 2011-08-10 15:24:52,433 INFO org.apache.hadoop.mapred.MapTask: bufstart = 14155740; bufend = 18874314; bufvoid = 99614720
>>>> 2011-08-10 15:24:52,433 INFO org.apache.hadoop.mapred.MapTask: kvstart = 131070; kvend = 65533; length = 327680
>>>> 2011-08-10 15:24:52,877 INFO org.apache.hadoop.mapred.MapTask: Finished spill 3
>>>> 2011-08-10 15:24:53,216 INFO org.apache.hadoop.mapred.MapTask: Spilling map output: record full = true
>>>> 2011-08-10 15:24:53,216 INFO org.apache.hadoop.mapred.MapTask: bufstart = 18874314; bufend = 23592906; bufvoid = 99614720
>>>> 2011-08-10 15:24:53,216 INFO org.apache.hadoop.mapred.MapTask: kvstart = 65533; kvend = 327677; length = 327680
>>>> 2011-08-10 15:24:53,660 INFO org.apache.hadoop.mapred.MapTask: Finished spill 4
>>>> 2011-08-10 15:24:54,010 INFO org.apache.hadoop.mapred.MapTask: Spilling map output: record full = true
>>>> 2011-08-10 15:24:54,010 INFO org.apache.hadoop.mapred.MapTask: bufstart = 23592906; bufend = 28311480; bufvoid = 99614720
>>>> 2011-08-10 15:24:54,010 INFO org.apache.hadoop.mapred.MapTask: kvstart = 327677; kvend = 262140; length = 327680
>>>> 2011-08-10 15:24:54,447 INFO org.apache.hadoop.mapred.MapTask: Finished spill 5
>>>> 2011-08-10 15:24:54,793 INFO org.apache.hadoop.mapred.MapTask: Spilling map output: record full = true
>>>> 2011-08-10 15:24:54,793 INFO org.apache.hadoop.mapred.MapTask: bufstart = 28311480; bufend = 33030054; bufvoid = 99614720
>>>> 2011-08-10 15:24:54,793 INFO org.apache.hadoop.mapred.MapTask: kvstart = 262140; kvend = 196603; length = 327680
>>>> 2011-08-10 15:24:55,237 INFO org.apache.hadoop.mapred.MapTask: Finished spill 6
>>>> 2011-08-10 15:24:55,564 INFO org.apache.hadoop.mapred.MapTask: Spilling map output: record full = true
>>>> 2011-08-10 15:24:55,564 INFO org.apache.hadoop.mapred.MapTask: bufstart = 33030054; bufend = 37748628; bufvoid = 99614720
>>>> 2011-08-10 15:24:55,564 INFO org.apache.hadoop.mapred.MapTask: kvstart = 196603; kvend = 131066; length = 327680
>>>> 2011-08-10 15:24:55,981 INFO org.apache.hadoop.mapred.MapTask: Finished spill 7
>>>> 2011-08-10 15:24:56,434 INFO org.apache.hadoop.mapred.MapTask: Spilling map output: record full = true
>>>> 2011-08-10 15:24:56,434 INFO org.apache.hadoop.mapred.MapTask: bufstart = 37748628; bufend = 42467202; bufvoid = 99614720
>>>> 2011-08-10 15:24:56,434 INFO org.apache.hadoop.mapred.MapTask: kvstart = 131066; kvend = 65529; length = 327680
>>>> 2011-08-10 15:24:56,848 INFO org.apache.hadoop.mapred.MapTask: Finished spill 8
>>>> 2011-08-10 15:24:57,051 INFO org.apache.hadoop.mapred.MapTask: Starting flush of map output
>>>> 2011-08-10 15:24:57,282 INFO org.apache.hadoop.mapred.MapTask: Finished spill 9
>>>> 2011-08-10 15:24:57,291 INFO org.apache.hadoop.mapred.Merger: Merging 10 sorted segments
>>>> 2011-08-10 15:24:57,297 INFO org.apache.hadoop.mapred.Merger: Down to the last merge-pass, with 10 segments left of total size: 50698660 bytes
>>>> 2011-08-10 15:24:59,552 INFO org.apache.hadoop.mapred.TaskRunner: Task:attempt_201108051329_0060_m_000000_0 is done. And is in the process of commiting
>>>> 2011-08-10 15:24:59,555 INFO org.apache.hadoop.mapred.TaskRunner: Task 'attempt_201108051329_0060_m_000000_0' done.
>>>>
>>>> and the log for the reducer:
>>>>
>>>> 2011-08-10 15:25:00,835 INFO org.apache.hadoop.metrics.jvm.JvmMetrics: Initializing JVM Metrics with processName=SHUFFLE, sessionId=
>>>> 2011-08-10 15:25:01,005 INFO org.apache.hadoop.mapred.ReduceTask: ShuffleRamManager: MemoryLimit=140699232, MaxSingleShuffleLimit=35174808
>>>> 2011-08-10 15:25:01,022 INFO org.apache.hadoop.mapred.ReduceTask: attempt_201108051329_0060_r_000000_0 Thread started: Thread for merging on-disk files
>>>> 2011-08-10 15:25:01,022 INFO org.apache.hadoop.mapred.ReduceTask: attempt_201108051329_0060_r_000000_0 Thread started: Thread for merging in memory files
>>>> 2011-08-10 15:25:01,022 INFO org.apache.hadoop.mapred.ReduceTask: attempt_201108051329_0060_r_000000_0 Thread waiting: Thread for merging on-disk files
>>>> 2011-08-10 15:25:01,024 INFO org.apache.hadoop.mapred.ReduceTask: attempt_201108051329_0060_r_000000_0 Need another 1 map output(s) where 0 is already in progress
>>>> 2011-08-10 15:25:01,024 INFO org.apache.hadoop.mapred.ReduceTask: attempt_201108051329_0060_r_000000_0 Thread started: Thread for polling Map Completion Events
>>>> 2011-08-10 15:25:01,037 INFO org.apache.hadoop.mapred.ReduceTask: attempt_201108051329_0060_r_000000_0 Scheduled 0 outputs (0 slow hosts and0 dup hosts)
>>>> 2011-08-10 15:25:01,038 INFO org.apache.hadoop.mapred.ReduceTask: attempt_201108051329_0060_r_000000_0: Got 1 new map-outputs
>>>> 2011-08-10 15:25:06,039 INFO org.apache.hadoop.mapred.ReduceTask: attempt_201108051329_0060_r_000000_0 Scheduled 1 outputs (0 slow hosts and0 dup hosts)
>>>> 2011-08-10 15:25:06,121 INFO org.apache.hadoop.mapred.ReduceTask: header: attempt_201108051329_0060_m_000000_0, compressed len: 50698646, decompressed len: 50698642
>>>> 2011-08-10 15:25:06,121 INFO org.apache.hadoop.mapred.ReduceTask: Shuffling 50698642 bytes (50698646 raw bytes) into Local-FS from attempt_201108051329_0060_m_000000_0
>>>> 2011-08-10 15:25:06,314 INFO org.apache.hadoop.mapred.ReduceTask: Read 50698646 bytes from map-output for attempt_201108051329_0060_m_000000_0
>>>> 2011-08-10 15:25:06,315 INFO org.apache.hadoop.mapred.ReduceTask: attempt_201108051329_0060_r_000000_0 Thread waiting: Thread for merging on-disk files
>>>> 2011-08-10 15:25:07,055 INFO org.apache.hadoop.mapred.ReduceTask: GetMapEventsThread exiting
>>>> 2011-08-10 15:25:07,055 INFO org.apache.hadoop.mapred.ReduceTask: getMapsEventsThread joined.
>>>> 2011-08-10 15:25:07,056 INFO org.apache.hadoop.mapred.ReduceTask: Closed ram manager
>>>> 2011-08-10 15:25:07,056 INFO org.apache.hadoop.mapred.ReduceTask: Interleaved on-disk merge complete: 1 files left.
>>>> 2011-08-10 15:25:07,056 INFO org.apache.hadoop.mapred.ReduceTask: In-memory merge complete: 0 files left.
>>>> 2011-08-10 15:25:07,061 INFO org.apache.hadoop.mapred.ReduceTask: Merging 1 files, 50698646 bytes from disk
>>>> 2011-08-10 15:25:07,062 INFO org.apache.hadoop.mapred.ReduceTask: Merging 0 segments, 0 bytes from memory into reduce
>>>> 2011-08-10 15:25:07,065 INFO org.apache.hadoop.mapred.Merger: Merging 1 sorted segments
>>>> 2011-08-10 15:25:07,072 INFO org.apache.hadoop.mapred.Merger: Down to the last merge-pass, with 1 segments left of total size: 50698642 bytes
>>>> 2011-08-10 15:25:30,126 INFO org.apache.hadoop.mapred.TaskRunner: Task:attempt_201108051329_0060_r_000000_0 is done. And is in the process of commiting
>>>> 2011-08-10 15:25:30,129 INFO org.apache.hadoop.mapred.TaskRunner: Task 'attempt_201108051329_0060_r_000000_0' done.
>>>>
>>>> My code is similar to the word count example:
>>>>
>>>>     public int run(String[] args) throws Exception {
>>>>         ...
>>>>
>>>>         getConf().set(CONF_COLUMN_NAME, columnName);
>>>>
>>>>         Job job4 = new Job(getConf(), "SalesRankWriter");
>>>>         job4.setJarByClass(SalesRankLoader.class);
>>>>         job4.setMapperClass(RankUpdateMapper.class);
>>>>         job4.setReducerClass(RankUpdateReducer.class);
>>>>         job4.setMapOutputKeyClass(Text.class);
>>>>         job4.setMapOutputValueClass(IntWritable.class);
>>>>         job4.setOutputKeyClass(ByteBuffer.class);
>>>>         job4.setOutputValueClass(List.class);
>>>>         job4.setOutputFormatClass(ColumnFamilyOutputFormat.class);
>>>>         job4.setInputFormatClass(TextInputFormat.class);
>>>>         FileInputFormat.addInputPath(job4, new Path(prePath));
>>>>
>>>>         ConfigHelper.setOutputColumnFamily(job4.getConfiguration(), KEYSPACE, columnFamily);
>>>>         ConfigHelper.setRpcPort(job4.getConfiguration(), "9260");
>>>>         ConfigHelper.setInitialAddress(job4.getConfiguration(), "dnjsrcha01");
>>>>         ConfigHelper.setPartitioner(job4.getConfiguration(), "org.apache.cassandra.dht.RandomPartitioner");
>>>>
>>>>         job4.waitForCompletion(true);
>>>>         ...
>>>>     }
>>>>
>>>> where the mapper and reducer are defined as:
>>>>
>>>>     public static class RankUpdateMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
>>>>         public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
>>>>             String line = value.toString();
>>>>             StringTokenizer tokenizer = new StringTokenizer(line);
>>>>             String ean = tokenizer.nextToken();
>>>>             int rank = Integer.parseInt(tokenizer.nextToken());
>>>>
>>>>             context.write(new Text(ean), new IntWritable(rank));
>>>>         }
>>>>     }
>>>>
>>>>     public static class RankUpdateReducer extends Reducer<Text, IntWritable, ByteBuffer, List<Mutation>>
>>>>     {
>>>>         private ByteBuffer outputKey;
>>>>
>>>>         protected void setup(org.apache.hadoop.mapreduce.Reducer.Context context) throws IOException, InterruptedException
>>>>         {
>>>>             outputKey = ByteBufferUtil.bytes(context.getConfiguration().get(CONF_COLUMN_NAME));
>>>>         }
>>>>
>>>>         public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException
>>>>         {
>>>>             context.write(outputKey, Collections.singletonList(getMutation(key, values.iterator().next().get())));
>>>>         }
>>>>
>>>>         private static Mutation getMutation(Text key, int value)
>>>>         {
>>>>             Column c = new Column();
>>>>             c.setName(ByteBuffer.wrap(Arrays.copyOf(key.getBytes(), key.getLength())));
>>>>             c.setValue(ByteBufferUtil.bytes(String.valueOf(value)));
>>>>             c.setTimestamp(System.currentTimeMillis() * 1000);
>>>>
>>>>             Mutation m = new Mutation();
>>>>             m.setColumn_or_supercolumn(new ColumnOrSuperColumn());
>>>>             m.column_or_supercolumn.setColumn(c);
>>>>             return m;
>>>>         }
>>>>     }
>>>>
>>>> Anything wrong here?
>>>>
>>>> Thanks,
>>>>
>>>> John
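One way to close the loop on the "is the data actually in Cassandra" question is to read a key back over Thrift right after the job and compare it with the input file. Below is a minimal sketch, not from the original post: it assumes the same connection settings the job uses (host dnjsrcha01, RPC port 9260) and the layout the posted reducer writes (a single row keyed by the configured column name, with one column per EAN). The keyspace, column family, row key and EAN values are placeholders to substitute for your own.

    import java.nio.ByteBuffer;

    import org.apache.cassandra.thrift.Cassandra;
    import org.apache.cassandra.thrift.ColumnOrSuperColumn;
    import org.apache.cassandra.thrift.ColumnPath;
    import org.apache.cassandra.thrift.ConsistencyLevel;
    import org.apache.cassandra.utils.ByteBufferUtil;
    import org.apache.thrift.protocol.TBinaryProtocol;
    import org.apache.thrift.transport.TFramedTransport;
    import org.apache.thrift.transport.TSocket;

    public class ReadBackCheck
    {
        public static void main(String[] args) throws Exception
        {
            // Connect the same way the job does: host dnjsrcha01, RPC port 9260 (framed transport, the 0.8 default).
            TFramedTransport transport = new TFramedTransport(new TSocket("dnjsrcha01", 9260));
            Cassandra.Client client = new Cassandra.Client(new TBinaryProtocol(transport));
            transport.open();
            client.set_keyspace("MyKeyspace");                     // placeholder: the job's KEYSPACE

            // The posted reducer writes every record into the row keyed by CONF_COLUMN_NAME,
            // with one column per EAN, so read that row and one EAN column back.
            ByteBuffer rowKey = ByteBufferUtil.bytes("salesrank");  // placeholder: whatever CONF_COLUMN_NAME was set to
            ColumnPath path = new ColumnPath("MyColumnFamily");     // placeholder: the job's output column family
            path.setColumn(ByteBufferUtil.bytes("0123456789012"));  // placeholder: an EAN taken from the input file

            // Throws NotFoundException if the column was never written.
            ColumnOrSuperColumn cosc = client.get(rowKey, path, ConsistencyLevel.ONE);
            System.out.println("stored rank = " + ByteBufferUtil.string(cosc.getColumn().value));

            transport.close();
        }
    }

If this throws NotFoundException, or prints a rank that still disagrees with the input file, the mutations never landed, which is the same conclusion the CLI or cfstats check above would give you.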