I suspect the Camus job is at fault, because another process (a separate Map/Reduce job) also writes to the same "time" bucket (folders) and has not had this issue at all (so far) when the dependent Hive job reads its output. The dependent Hive job only has problems with files created via the Camus job, and not always but intermittently: the Hive job fails with a read error about EOF, and the workaround for now is to remove these unclosed files from the folder, after which the Hive job succeeds.
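As a stopgap, that manual cleanup could be scripted. Below is a minimal sketch (illustrative only, not from Camus) that scans a folder and flags files that fail with an EOF or "not a SequenceFile" error, so they can be removed before the Hive job runs. It assumes the Hadoop 1.x SequenceFile.Reader API mentioned later in this thread; the class name and the folder argument are made up for the example.

{code}
// Minimal sketch (illustrative): walk a folder and flag SequenceFiles that
// cannot be read to the end, i.e. the unclosed files that currently have
// to be removed by hand before the Hive job runs.
import java.io.EOFException;
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.util.ReflectionUtils;

public class UnclosedSequenceFileScanner {

    public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        for (FileStatus status : fs.listStatus(new Path(args[0]))) {
            checkFile(fs, conf, status.getPath());
        }
    }

    private static void checkFile(FileSystem fs, Configuration conf, Path file) throws IOException {
        SequenceFile.Reader reader = null;
        try {
            reader = new SequenceFile.Reader(fs, file, conf);
            Writable key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
            Writable value = (Writable) ReflectionUtils.newInstance(reader.getValueClass(), conf);
            while (reader.next(key, value)) {
                // Just read to the end; a cleanly closed file finishes without error.
            }
        } catch (EOFException e) {
            System.out.println("Truncated/unclosed file: " + file);
        } catch (IOException e) {
            // e.g. "... not a SequenceFile" -- the other error seen in this thread.
            System.out.println("Unreadable as SequenceFile: " + file + " (" + e.getMessage() + ")");
        } finally {
            if (reader != null) {
                reader.close();
            }
        }
    }
}
{code}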
Thanks,

Bhavesh

On Mon, Mar 2, 2015 at 5:27 PM, Gwen Shapira <gshap...@cloudera.com> wrote:
> Actually, the error you sent shows that it's trying to read a TEXT file
> as if it was Seq. That's why I suspected a misconfiguration of some
> sort.
>
> Why do you suspect a race condition?
>
> On Mon, Mar 2, 2015 at 5:19 PM, Bhavesh Mistry
> <mistry.p.bhav...@gmail.com> wrote:
> > Hi Gwen,
> >
> > We are using the MapR distribution (sorry, no Cloudera).
> >
> > I suspect it is a code issue. I am in the process of reviewing the code
> > around the MultiOutputFormat classes:
> >
> > https://github.com/linkedin/camus/blob/master/camus-etl-kafka/src/main/java/com/linkedin/camus/etl/kafka/mapred/EtlMultiOutputFormat.java#L67
> > https://github.com/linkedin/camus/blob/master/camus-etl-kafka/src/main/java/com/linkedin/camus/etl/kafka/mapred/EtlMultiOutputRecordWriter.java#L35
> >
> > I suspect that, due to some concurrency issue, it is replacing an older
> > writer with a new one (and the old writer is never closed). The file it
> > creates is usually small, and the problematic (EOF) files have very
> > little content.
> >
> > https://github.com/linkedin/camus/blob/master/camus-etl-kafka/src/main/java/com/linkedin/camus/etl/kafka/common/SequenceFileRecordWriterProvider.java#L91
> >
> > Based on the above code, do you think it is likely that an output file
> > may be left unclosed? My plan is to add an isClose() API to each writer;
> > if you have time, could you quickly review it and give feedback on the
> > unclosed-files theory? By the way, we are on the Hadoop 1.0.3 API, so I
> > was also thinking about
> > http://tool.oschina.net/uploads/apidocs/hadoop/org/apache/hadoop/mapred/MapReduceBase.html#close()
> > and making sure that within close() we close all the file writers. Let
> > me know whether or not this is a good place to do the final clean-up.
> >
> > public interface RecordWriterWithCloseStatus<K, V> extends RecordWriter<K, V> {
> >
> >     /**
> >      * Gives the ability to check whether close() has been called on the
> >      * writer, i.e. whether the file has been closed or not.
> >      */
> >     public boolean isClose();
> > }
> >
> > And each writer will have the ability to check for a clean close at any
> > time, e.g.:
> >
> > {code}
> > return new RecordWriterWithCloseStatus<IEtlKey, CamusWrapper>() {
> >
> >     private volatile boolean close;
> >
> >     @Override
> >     public void write(IEtlKey key, CamusWrapper data) throws IOException, InterruptedException {
> >
> >         /**
> >          * What if the file is closed? Should we create a new one here..?
> >          */
> >
> >         // Use the timestamp from the EtlKey as the key for this record.
> >         // TODO: Is there a better key to use here?
> >         writer.append(new LongWritable(key.getTime()), new Text(record));
> >     }
> >
> >     @Override
> >     public void close(TaskAttemptContext context) throws IOException, InterruptedException {
> >         writer.close();
> >         close = true;
> >     }
> >
> >     @Override
> >     protected void finalize() throws Throwable {
> >         if (!this.close) {
> >             log.error("This file was not closed, so try to close it during JVM finalize..");
> >             try {
> >                 writer.close();
> >             } catch (Throwable th) {
> >                 log.error("File close error during finalize()");
> >             }
> >         }
> >         super.finalize();
> >     }
> >
> >     @Override
> >     public boolean isClose() {
> >         return close;
> >     }
> > };
> > {code}
> >
> > Thanks for your quick input and response.
> >
> > Thanks,
> >
> > Bhavesh
> >
> > On Mon, Mar 2, 2015 at 4:05 PM, Gwen Shapira <gshap...@cloudera.com> wrote:
> >
> >> Do you have the command you used to run Camus? And the config files?
> >>
> >> Also, I noticed your file is on maprfs - you may want to check with
> >> your vendor... I doubt Camus was extensively tested on that particular
> >> FS.
> >>
> >> On Mon, Mar 2, 2015 at 3:59 PM, Bhavesh Mistry
> >> <mistry.p.bhav...@gmail.com> wrote:
> >> > Hi Kafka User Team,
> >> >
> >> > I have been encountering two issues with the Camus Kafka ETL job:
> >> >
> >> > 1) End Of File (unclosed files)
> >> >
> >> > 2) Not SequenceFile Error
> >> >
> >> > The details of the issues can be found at
> >> > https://groups.google.com/forum/#!topic/camus_etl/RHS3ASy7Eqc.
> >> >
> >> > If you have faced similar issues, please let me know how to go about
> >> > solving them.
> >> >
> >> > Thanks,
> >> >
> >> > Bhavesh
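For what it's worth, the close()-time sweep Bhavesh proposes could look roughly like the sketch below. It is a minimal illustration, not the actual Camus code: it assumes the new-API org.apache.hadoop.mapreduce.RecordWriter (an abstract class, so the close-status contract is expressed as a class here rather than an interface), and the writer map merely mirrors the per-file writers that EtlMultiOutputRecordWriter keeps; all names are made up for the example.

{code}
// Minimal sketch (illustrative): track per-file writers behind a
// close-status wrapper, and sweep them at task close() so a writer that
// was replaced mid-job can never leave behind an unclosed file.
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

// The new-API RecordWriter is an abstract class, so an interface cannot
// extend it; the isClosed() contract lives in an abstract subclass instead.
abstract class CloseStatusRecordWriter<K, V> extends RecordWriter<K, V> {
    protected volatile boolean closed;

    public boolean isClosed() {
        return closed;
    }
}

class MultiWriterSweeper<K, V> {
    // One writer per output file, keyed by file name (mirrors the map kept
    // in EtlMultiOutputRecordWriter).
    private final Map<String, CloseStatusRecordWriter<K, V>> writers =
            new HashMap<String, CloseStatusRecordWriter<K, V>>();

    // Called at task close(): close every writer that is still open, so no
    // file is left without its SequenceFile trailer written.
    public void closeAll(TaskAttemptContext context) throws IOException, InterruptedException {
        for (Map.Entry<String, CloseStatusRecordWriter<K, V>> entry : writers.entrySet()) {
            CloseStatusRecordWriter<K, V> writer = entry.getValue();
            if (!writer.isClosed()) {
                writer.close(context);
            }
        }
        writers.clear();
    }
}
{code}

A sweep like this would also make the finalize() fallback quoted above unnecessary: finalize() is best-effort at most, since the JVM gives no guarantee it runs before the task's output is committed.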