Actually, the error you sent shows that it's trying to read a text file
as if it were a SequenceFile. That's why I suspected a misconfiguration
of some sort.

Why do you suspect a race condition?
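
For what it's worth, the replace-without-close pattern you describe would
look roughly like the sketch below. This is a minimal illustration only;
the class and field names are made up and are not the actual Camus code.

```java
import java.util.HashMap;
import java.util.Map;

// Minimal sketch of the suspected leak: if two code paths race on a
// per-topic writer cache, put() can silently replace an existing writer,
// and the replaced writer is never closed (all names are illustrative).
public class WriterCacheSketch {
    static class FakeWriter {
        boolean closed = false;
        void close() { closed = true; }
    }

    public static void main(String[] args) {
        Map<String, FakeWriter> writers = new HashMap<>();

        FakeWriter first = new FakeWriter();
        writers.put("topic-0", first);

        // A second path creates a writer for the same key without
        // checking the cache first: the old writer is dropped unclosed.
        FakeWriter second = new FakeWriter();
        writers.put("topic-0", second);

        System.out.println("first closed? " + first.closed);
    }
}
```

If that is what is happening, the small near-empty files you see would be
the partially written output of the writers that were replaced.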

On Mon, Mar 2, 2015 at 5:19 PM, Bhavesh Mistry
<mistry.p.bhav...@gmail.com> wrote:
> Hi Gwen,
>
> We are using the MapR distribution (sorry, no Cloudera).
>
>
> I suspect it is a code issue.  I am in the process of reviewing the code
> around the MultiOutputFormat class:
>
> https://github.com/linkedin/camus/blob/master/camus-etl-kafka/src/main/java/com/linkedin/camus/etl/kafka/mapred/EtlMultiOutputFormat.java#L67
> https://github.com/linkedin/camus/blob/master/camus-etl-kafka/src/main/java/com/linkedin/camus/etl/kafka/mapred/EtlMultiOutputRecordWriter.java#L35
>
> I suspect that, due to some concurrency issue, an older writer is being
> replaced with a new one (and the old writer is never closed).  The file it
> creates is usually small, and the problematic (EOF) files have very little
> content.
>
> https://github.com/linkedin/camus/blob/master/camus-etl-kafka/src/main/java/com/linkedin/camus/etl/kafka/common/SequenceFileRecordWriterProvider.java#L91
>
>
> Based on the above code, do you think it is likely that an output file may
> be left unclosed?  My plan is to add an isClose() API to each writer; if
> you have time, could you quickly review it and give me your feedback on the
> unclosed-files issue?  By the way, we are on the Hadoop 1.0.3 API, so I
> was thinking of using
> http://tool.oschina.net/uploads/apidocs/hadoop/org/apache/hadoop/mapred/MapReduceBase.html#close()
> and making sure that within close() we close all the file writers.  Let me
> know whether this is a good place to do the final clean-up.
>
>
> {code}
> public interface RecordWriterWithCloseStatus<K, V> extends RecordWriter<K, V> {
>
>     /**
>      * Gives the ability to check whether close() has been called on the
>      * writer, i.e. whether the file has been closed or not.
>      */
>     public boolean isClose();
> }
> {code}
>
> Each writer will then be able to report at any time whether it has been
> closed cleanly, e.g.:
>
> {code}
>         return new RecordWriterWithCloseStatus<IEtlKey, CamusWrapper>() {
>
>             private volatile boolean close;
>
>             @Override
>             public void write(IEtlKey key, CamusWrapper data)
>                     throws IOException, InterruptedException {
>                 /**
>                  * What if the file is already closed?  Should we create a
>                  * new one here?
>                  */
>
>                 // Use the timestamp from the EtlKey as the key for this record.
>                 // TODO: Is there a better key to use here?
>                 writer.append(new LongWritable(key.getTime()), new Text(record));
>             }
>
>             @Override
>             public void close(TaskAttemptContext context)
>                     throws IOException, InterruptedException {
>                 writer.close();
>                 close = true;
>             }
>
>             @Override
>             protected void finalize() throws Throwable {
>                 if (!this.close) {
>                     log.error("This file was not closed, so try to close it during JVM finalize()...");
>                     try {
>                         writer.close();
>                     } catch (Throwable th) {
>                         log.error("File close error during finalize()");
>                     }
>                 }
>                 super.finalize();
>             }
>
>             @Override
>             public boolean isClose() {
>                 return close;
>             }
>         };
> {code}
>
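The "close everything in close()" idea mentioned above can be sketched as
keeping every writer that is handed out in a map and closing them all at
task end. This is a hedged sketch only; SimpleWriter, register, and
closeAll are illustrative names, not the actual Camus or Hadoop API.

```java
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

// Sketch: track every writer we create, then close them all once at the
// end of the task, logging (not rethrowing) individual close failures so
// that one bad writer does not prevent the others from being closed.
public class CloseAllWritersSketch {
    interface SimpleWriter { void close() throws IOException; }

    private final Map<String, SimpleWriter> writers = new HashMap<>();

    void register(String key, SimpleWriter w) { writers.put(key, w); }

    // Called once at the end of the task (e.g. from RecordWriter.close()):
    void closeAll() {
        for (Map.Entry<String, SimpleWriter> e : writers.entrySet()) {
            try {
                e.getValue().close();
            } catch (IOException ex) {
                System.err.println("Failed to close writer for " + e.getKey());
            }
        }
        writers.clear();
    }

    public static void main(String[] args) {
        CloseAllWritersSketch s = new CloseAllWritersSketch();
        final boolean[] closed = {false};
        s.register("topic-0", () -> closed[0] = true);
        s.closeAll();
        System.out.println("all closed? " + closed[0]);
    }
}
```

Relying on finalize() alone is fragile because the JVM gives no guarantee
it runs before task exit, so an explicit close-all pass like this is the
safer backstop.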
>
> Thanks for your quick input and response.
>
>
> Thanks,
>
> Bhavesh
>
> On Mon, Mar 2, 2015 at 4:05 PM, Gwen Shapira <gshap...@cloudera.com> wrote:
>
>> Do you have the command you used to run Camus, and the config files?
>>
>> Also, I noticed your file is on maprfs - you may want to check with
>> your vendor... I doubt Camus was extensively tested on that particular
>> FS.
>>
>> On Mon, Mar 2, 2015 at 3:59 PM, Bhavesh Mistry
>> <mistry.p.bhav...@gmail.com> wrote:
>> > Hi Kafka User Team,
>> >
>> > I have been encountering two issues with Camus Kafka ETL Job:
>> >
>> > 1) End Of File (unclosed files)
>> >
>> > 2) Not SequenceFile Error
>> > The details of these issues can be found at
>> > https://groups.google.com/forum/#!topic/camus_etl/RHS3ASy7Eqc.
>> >
>> > If you have faced similar issues, please let me know how to go about
>> > solving them.
>> >
>> > Thanks,
>> >
>> > Bhavesh
>>