Actually, the error you sent shows that it's trying to read a TEXT file as if it were a SequenceFile. That's why I suspected a misconfiguration of some sort.
Why do you suspect a race condition?

On Mon, Mar 2, 2015 at 5:19 PM, Bhavesh Mistry <mistry.p.bhav...@gmail.com> wrote:
> Hi Gwen,
>
> We are using the MapR (sorry, no Cloudera) distribution.
>
> I suspect it is a code issue. I am reviewing the code in the MultiOutputFormat class:
>
> https://github.com/linkedin/camus/blob/master/camus-etl-kafka/src/main/java/com/linkedin/camus/etl/kafka/mapred/EtlMultiOutputFormat.java#L67
> https://github.com/linkedin/camus/blob/master/camus-etl-kafka/src/main/java/com/linkedin/camus/etl/kafka/mapred/EtlMultiOutputRecordWriter.java#L35
>
> I suspect that due to some concurrency issue, an older writer is being replaced with a new one (and the old writer is never closed). The file it creates is usually small, and the problematic (EOF) files have very little content.
>
> https://github.com/linkedin/camus/blob/master/camus-etl-kafka/src/main/java/com/linkedin/camus/etl/kafka/common/SequenceFileRecordWriterProvider.java#L91
>
> Based on the above code, do you think it is likely that an output file may be left unclosed? My plan is to add an isClose() API to each writer; if you have time, could you quickly review it and give feedback on the unclosed-files approach? By the way, we are on the Hadoop 1.0.3 API, so I was thinking of using
> http://tool.oschina.net/uploads/apidocs/hadoop/org/apache/hadoop/mapred/MapReduceBase.html#close()
> and making sure that within close() we close all the file writers. Let me know whether this is a good place to do the final clean-up.
>
> public interface RecordWriterWithCloseStatus<K, V> extends RecordWriter<K, V> {
>
>     /**
>      * Gives the ability to check whether close has been called on the writer / whether the file has been closed or not.
>      * @return
>      */
>     public boolean isClose();
> }
>
> And each writer will have the ability to check for clean-up at all times, e.g.:
>
> {code}
> return new RecordWriterWithStatus<IEtlKey, CamusWrapper>() {
>
>     private volatile boolean close;
>
>     @Override
>     public void write(IEtlKey key, CamusWrapper data) throws IOException, InterruptedException {
>
>         /**
>          * What if the file is closed? Should we create a new one here?
>          */
>
>         // Use the timestamp from the EtlKey as the key for this record.
>         // TODO: Is there a better key to use here?
>         writer.append(new LongWritable(key.getTime()), new Text(record));
>     }
>
>     @Override
>     public void close(TaskAttemptContext context) throws IOException, InterruptedException {
>         writer.close();
>         close = true;
>     }
>
>     @Override
>     protected void finalize() throws Throwable {
>         if (!this.close) {
>             log.error("This file was not closed, so try to close it during JVM finalize()...");
>             try {
>                 writer.close();
>             } catch (Throwable th) {
>                 log.error("File close error during finalize()");
>             }
>         }
>         super.finalize();
>     }
>
>     @Override
>     public boolean isClose() {
>         return close;
>     }
> };
> {code}
>
> Thanks for your quick input and response.
>
> Thanks,
>
> Bhavesh
>
> On Mon, Mar 2, 2015 at 4:05 PM, Gwen Shapira <gshap...@cloudera.com> wrote:
>> Do you have the command you used to run Camus? And the config files?
>>
>> Also, I noticed your file is on maprfs - you may want to check with
>> your vendor... I doubt Camus was extensively tested on that particular
>> FS.
>>
>> On Mon, Mar 2, 2015 at 3:59 PM, Bhavesh Mistry
>> <mistry.p.bhav...@gmail.com> wrote:
>> > Hi Kafka User Team,
>> >
>> > I have been encountering two issues with the Camus Kafka ETL job:
>> >
>> > 1) End of file (unclosed files)
>> >
>> > 2) "Not a SequenceFile" error
>> >
>> > The details of the issues can be found at
>> > https://groups.google.com/forum/#!topic/camus_etl/RHS3ASy7Eqc.
>> >
>> > If you have faced a similar issue, please let me know how to go about
>> > solving it.
>> >
>> > Thanks,
>> >
>> > Bhavesh
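[Editor's note] The core concern in the thread is a writer being replaced without the old one being closed, and a close-status flag that may be flipped concurrently. Below is a minimal, self-contained Java sketch of that idea; it is NOT Camus or Hadoop code — `TrackedWriter`, `ClosableWriter`, and `WriterCache` are hypothetical names, and an in-memory StringBuilder stands in for the underlying file stream. It shows two things: an idempotent, thread-safe close using an AtomicBoolean (rather than a plain volatile flag plus finalize()), and a cache that reuses an existing writer per key instead of silently replacing it.

```java
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical minimal writer interface, standing in for Hadoop's RecordWriter.
interface TrackedWriter {
    void write(String record) throws IOException;
    void close() throws IOException;
    boolean isClosed();
}

class ClosableWriter implements TrackedWriter {
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private final StringBuilder sink = new StringBuilder(); // stand-in for the real output stream

    @Override
    public void write(String record) throws IOException {
        if (closed.get()) {
            throw new IOException("write() called after close()");
        }
        sink.append(record).append('\n');
    }

    @Override
    public void close() throws IOException {
        // compareAndSet makes close() idempotent and safe under concurrent calls:
        // the underlying stream is closed exactly once.
        if (closed.compareAndSet(false, true)) {
            // flush and close the underlying stream here
        }
    }

    @Override
    public boolean isClosed() {
        return closed.get();
    }
}

// A per-key writer cache that never drops an open writer: computeIfAbsent
// returns the existing writer instead of replacing it with a new one.
class WriterCache {
    private final Map<String, TrackedWriter> writers = new ConcurrentHashMap<>();

    TrackedWriter getOrCreate(String key) {
        return writers.computeIfAbsent(key, k -> new ClosableWriter());
    }

    void closeAll() throws IOException {
        for (TrackedWriter w : writers.values()) {
            w.close();
        }
        writers.clear();
    }
}
```

With this shape, `closeAll()` in the task's own close path replaces the finalize()-based safety net discussed above, which is unreliable because finalization may run late or not at all.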