Hi Gwen,

The root cause of all the IO-related problems seems to be the file rename that Camus performs and the underlying Hadoop MapR FS. We are copying files from a user volume to a day volume (the rename does a copy) when the mapper commits a file to the FS. Please refer to http://answers.mapr.com/questions/162562/volume-issues-end-of-file-javaioioexception-file-i.html for more info. As a workaround for now, we will have to patch Camus to write to a tmp directory and then move the files to their final destination, to make the file rename more reliable.
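For reference, here is a minimal sketch of that tmp-directory approach, assuming the Hadoop 1.x FileSystem API (the helper name commitViaTmpDir and the paths are made up for illustration, not the actual patch):

{code}
import java.io.IOException;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class TmpDirCommit {

    // Write into a tmp file on the SAME volume as the final destination and
    // rename at the end. A same-volume rename is a cheap metadata operation,
    // unlike the cross-volume rename (which does a full copy) we hit today.
    public static void commitViaTmpDir(FileSystem fs, Path finalDir, String fileName)
            throws IOException {
        Path tmpFile = new Path(finalDir, "_tmp." + fileName);
        Path finalFile = new Path(finalDir, fileName);

        FSDataOutputStream out = fs.create(tmpFile);
        try {
            // ... write and flush all records here ...
        } finally {
            // Always close before the move, or we commit an unclosed (EOF) file.
            out.close();
        }

        if (!fs.rename(tmpFile, finalFile)) {
            throw new IOException("Failed to move " + tmpFile + " to " + finalFile);
        }
    }
}
{code}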
Thanks,

Bhavesh

On Monday, March 2, 2015, Bhavesh Mistry <mistry.p.bhav...@gmail.com> wrote:

> I suspect the Camus job has an issue, because another process (a separate
> Map/Reduce job) also writes to the same "time" bucket (folders), and so
> far its files have not had this issue at all when read by the dependent
> Hive job. The Hive job only has issues with files created via the Camus
> job (not always, but intermittently: it fails with a read error about
> EOF, and the workaround for now is to remove the unclosed files from the
> folder, after which the Hive job succeeds).
>
> Thanks,
>
> Bhavesh
>
> On Mon, Mar 2, 2015 at 5:27 PM, Gwen Shapira <gshap...@cloudera.com> wrote:
>
>> Actually, the error you sent shows that it is trying to read a TEXT file
>> as if it were a Seq file. That's why I suspected a misconfiguration of
>> some sort.
>>
>> Why do you suspect a race condition?
>>
>> On Mon, Mar 2, 2015 at 5:19 PM, Bhavesh Mistry
>> <mistry.p.bhav...@gmail.com> wrote:
>> > Hi Gwen,
>> >
>> > We are using the MapR distribution (sorry, no Cloudera).
>> >
>> > I suspect it is a code issue. I am in the process of reviewing the
>> > code around the MultiOutputFormat classes:
>> >
>> > https://github.com/linkedin/camus/blob/master/camus-etl-kafka/src/main/java/com/linkedin/camus/etl/kafka/mapred/EtlMultiOutputFormat.java#L67
>> > https://github.com/linkedin/camus/blob/master/camus-etl-kafka/src/main/java/com/linkedin/camus/etl/kafka/mapred/EtlMultiOutputRecordWriter.java#L35
>> >
>> > I suspect that, due to some concurrency issue, an older writer gets
>> > replaced with a new one (and the old writer is never closed). The file
>> > it creates is usually small, and the problematic (EOF) files have very
>> > little content.
>> >
>> > https://github.com/linkedin/camus/blob/master/camus-etl-kafka/src/main/java/com/linkedin/camus/etl/kafka/common/SequenceFileRecordWriterProvider.java#L91
>> >
>> > Based on the above code, do you think it is likely that an output file
>> > may be left unclosed? My plan is to add an isClose() API to each
>> > writer; if you have time, could you quickly review it (any suggestions
>> > or feedback welcome) with the unclosed files in mind? By the way, we
>> > are on the Hadoop 1.0.3 API, so I was thinking of using
>> > http://tool.oschina.net/uploads/apidocs/hadoop/org/apache/hadoop/mapred/MapReduceBase.html#close()
>> > and making sure that within close() we close all the file writers. Let
>> > me know whether or not this is a good place to do the final clean-up.
>> >
>> > {code}
>> > public interface RecordWriterWithCloseStatus<K, V> extends RecordWriter<K, V> {
>> >
>> >     /**
>> >      * Gives the ability to check whether close() has been called on
>> >      * the writer, i.e. whether the underlying file has been closed.
>> >      */
>> >     public boolean isClose();
>> > }
>> > {code}
>> >
>> > Each writer will then be able to report its close status at any time,
>> > e.g.:
>> >
>> > {code}
>> > return new RecordWriterWithCloseStatus<IEtlKey, CamusWrapper>() {
>> >
>> >     private volatile boolean close;
>> >
>> >     @Override
>> >     public void write(IEtlKey key, CamusWrapper data) throws IOException,
>> >             InterruptedException {
>> >         // What if the file is already closed? Should we create a new
>> >         // one here?
>> >
>> >         // Use the timestamp from the EtlKey as the key for this record.
>> >         // TODO: Is there a better key to use here?
>> >         writer.append(new LongWritable(key.getTime()), new Text(record));
>> >     }
>> >
>> >     @Override
>> >     public void close(TaskAttemptContext context) throws IOException,
>> >             InterruptedException {
>> >         writer.close();
>> >         close = true;
>> >     }
>> >
>> >     @Override
>> >     protected void finalize() throws Throwable {
>> >         if (!this.close) {
>> >             log.error("This file was not closed, so trying to close it during JVM finalize()...");
>> >             try {
>> >                 writer.close();
>> >             } catch (Throwable th) {
>> >                 log.error("File close error during finalize()", th);
>> >             }
>> >         }
>> >         super.finalize();
>> >     }
>> >
>> >     @Override
>> >     public boolean isClose() {
>> >         return close;
>> >     }
>> > };
>> > {code}
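>> >
>> > For illustration, here is a minimal sketch of that final clean-up in
>> > the old-API close() hook, assuming the provider keeps its writers in a
>> > map keyed by file name (the "writers" field is made up for
>> > illustration, not the actual Camus code):
>> >
>> > {code}
>> > @Override
>> > public void close() throws IOException {
>> >     // Safety net: close any writer that was replaced or leaked without
>> >     // being closed, so no partially written (EOF) file gets committed.
>> >     for (Map.Entry<String, RecordWriterWithCloseStatus<IEtlKey, CamusWrapper>> entry
>> >             : writers.entrySet()) {
>> >         RecordWriterWithCloseStatus<IEtlKey, CamusWrapper> w = entry.getValue();
>> >         if (!w.isClose()) {
>> >             log.warn("Writer for " + entry.getKey() + " was never closed; closing it now.");
>> >             try {
>> >                 w.close(null); // no TaskAttemptContext is available in the old-API hook
>> >             } catch (Exception e) {
>> >                 log.error("Failed to close writer for " + entry.getKey(), e);
>> >             }
>> >         }
>> >     }
>> > }
>> > {code}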
>> >
>> > Thanks for your quick input and response.
>> >
>> > Thanks,
>> >
>> > Bhavesh
>> >
>> > On Mon, Mar 2, 2015 at 4:05 PM, Gwen Shapira <gshap...@cloudera.com> wrote:
>> >
>> >> Do you have the command you used to run Camus? And the config files?
>> >>
>> >> Also, I noticed your file is on maprfs - you may want to check with
>> >> your vendor... I doubt Camus was extensively tested on that particular
>> >> FS.
>> >>
>> >> On Mon, Mar 2, 2015 at 3:59 PM, Bhavesh Mistry
>> >> <mistry.p.bhav...@gmail.com> wrote:
>> >> > Hi Kafka User Team,
>> >> >
>> >> > I have been encountering two issues with the Camus Kafka ETL job:
>> >> >
>> >> > 1) End-of-file errors (unclosed files)
>> >> >
>> >> > 2) "Not a SequenceFile" errors
>> >> >
>> >> > The details of both issues can be found at
>> >> > https://groups.google.com/forum/#!topic/camus_etl/RHS3ASy7Eqc.
>> >> >
>> >> > If you have faced similar issues, please let me know how you went
>> >> > about solving them.
>> >> >
>> >> > Thanks,
>> >> >
>> >> > Bhavesh
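>> >> >
>> >> > P.S. For triage: a SequenceFile always begins with the 3-byte magic
>> >> > header "SEQ", so a quick check along these lines can confirm whether
>> >> > a problematic file is really a sequence file or plain text (a
>> >> > sketch; the path is made up for illustration):
>> >> >
>> >> > {code}
>> >> > FileSystem fs = FileSystem.get(new Configuration());
>> >> > // Hypothetical path to one of the problematic output files.
>> >> > FSDataInputStream in = fs.open(new Path("/days/2015-03-02/part-m-00000"));
>> >> > byte[] magic = new byte[3];
>> >> > try {
>> >> >     // A file shorter than 3 bytes throws EOFException here; that is
>> >> >     // not a valid SequenceFile either.
>> >> >     in.readFully(magic);
>> >> > } finally {
>> >> >     in.close();
>> >> > }
>> >> > boolean isSeq = magic[0] == 'S' && magic[1] == 'E' && magic[2] == 'Q';
>> >> > System.out.println(isSeq ? "SequenceFile" : "not a SequenceFile (likely text)");
>> >> > {code}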