Hi Gwen,

We are using the MapR distribution (sorry, not Cloudera).
I suspect it is a code issue. I am in the process of reviewing the code around the EtlMultiOutputFormat class:

https://github.com/linkedin/camus/blob/master/camus-etl-kafka/src/main/java/com/linkedin/camus/etl/kafka/mapred/EtlMultiOutputFormat.java#L67
https://github.com/linkedin/camus/blob/master/camus-etl-kafka/src/main/java/com/linkedin/camus/etl/kafka/mapred/EtlMultiOutputRecordWriter.java#L35

I suspect that due to a concurrency issue, an older writer is being replaced with a new one, and the old writer is never closed. The files it creates are usually small, and the problematic ones (the EOF files) have very little content.

https://github.com/linkedin/camus/blob/master/camus-etl-kafka/src/main/java/com/linkedin/camus/etl/kafka/common/SequenceFileRecordWriterProvider.java#L91

Based on the above code, do you think it is likely that an output file could be left unclosed?

Also, my plan is to add an isClose() API to each writer; if you have time, could you quickly review it and give suggestions or feedback about the unclosed files? By the way, we are on the Hadoop 1.0.3 API, so I was also thinking about overriding MapReduceBase#close() (http://tool.oschina.net/uploads/apidocs/hadoop/org/apache/hadoop/mapred/MapReduceBase.html#close()) and making sure that within that close we close all the file writers. Let me know whether or not that is a good place to do the final clean-up.

{code}
public interface RecordWriterWithCloseStatus<K, V> extends RecordWriter<K, V> {
    /**
     * Gives the ability to check whether close() has been called on this
     * writer, i.e. whether the underlying file has been closed.
     * @return true if the writer has been closed
     */
    public boolean isClose();
}
{code}

And each writer will have the ability to check its close status at any time, e.g.:

{code}
return new RecordWriterWithCloseStatus<IEtlKey, CamusWrapper>() {

    private volatile boolean close;

    @Override
    public void write(IEtlKey key, CamusWrapper data) throws IOException, InterruptedException {
        // What if the file is already closed? Should we create a new one here?
        // Use the timestamp from the EtlKey as the key for this record.
        // TODO: Is there a better key to use here?
        writer.append(new LongWritable(key.getTime()), new Text(record));
    }

    @Override
    public void close(TaskAttemptContext context) throws IOException, InterruptedException {
        writer.close();
        close = true;
    }

    @Override
    protected void finalize() throws Throwable {
        if (!this.close) {
            log.error("This file was not closed, so try to close it during JVM finalize()...");
            try {
                writer.close();
            } catch (Throwable th) {
                log.error("File close error during finalize()");
            }
        }
        super.finalize();
    }

    @Override
    public boolean isClose() {
        return close;
    }
};
{code}

Thanks for your quick input and response.

Thanks,

Bhavesh

On Mon, Mar 2, 2015 at 4:05 PM, Gwen Shapira <gshap...@cloudera.com> wrote:
> Do you have the command you used to run Camus? and the config files?
>
> Also, I noticed your file is on maprfs - you may want to check with
> your vendor... I doubt Camus was extensively tested on that particular
> FS.
>
> On Mon, Mar 2, 2015 at 3:59 PM, Bhavesh Mistry
> <mistry.p.bhav...@gmail.com> wrote:
> > Hi Kafka User Team,
> >
> > I have been encountering two issues with Camus Kafka ETL Job:
> >
> > 1) End Of File (unclosed files)
> >
> > 2) Not SequenceFile Error
> > The details of issues can be found at
> > https://groups.google.com/forum/#!topic/camus_etl/RHS3ASy7Eqc.
> >
> > If you guys have faced similar issue, please let me know how to go about
> > solving them.
> >
> > Thanks,
> >
> > Bhavesh
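P.S. For what it's worth, the close-tracking idea from the snippet above can be prototyped and unit-tested outside of Hadoop. This is a minimal, Hadoop-free sketch under my own assumptions: CloseAwareWriter and TrackingWriter are made-up illustration names (not Camus or Hadoop APIs), and a StringBuilder stands in for the real SequenceFile writer. It shows the volatile close flag plus a fail-fast check in write(), which is one possible answer to the "what if the file is closed?" question in the comment.

```java
import java.io.Closeable;
import java.io.IOException;

// Hypothetical stand-in for a Hadoop RecordWriter; illustration only.
interface CloseAwareWriter<K, V> extends Closeable {
    void write(K key, V value) throws IOException;

    // Same idea as the proposed isClose() API above.
    boolean isClose();
}

final class TrackingWriter<K, V> implements CloseAwareWriter<K, V> {
    // Stands in for the underlying SequenceFile writer.
    private final StringBuilder sink = new StringBuilder();

    // volatile so that a status check from another thread sees the latest value.
    private volatile boolean close;

    @Override
    public void write(K key, V value) throws IOException {
        if (close) {
            // Fail fast instead of silently appending to a closed file.
            throw new IOException("writer already closed");
        }
        sink.append(key).append('\t').append(value).append('\n');
    }

    @Override
    public void close() {
        // In the real writer, set the flag only after writer.close() succeeds.
        close = true;
    }

    @Override
    public boolean isClose() {
        return close;
    }
}
```

The same pattern would wrap each provider's RecordWriter, so the format-level close() can iterate over the writers and close any for which isClose() still returns false.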