Hi Robert,

I haven’t tried yet with 1.11, on my list.

I’ll be spending time on this tomorrow, so hopefully more results.

As for setting the algorithm version 2, I do it in code like this:

        Job job = Job.getInstance();
        job.getConfiguration().set("io.serializations", 
"cascading.tuple.hadoop.TupleSerialization");
        job.setOutputKeyClass(Tuple.class);
        job.setOutputValueClass(Tuple.class);

        // So that the FileOutputCommitter used by HadoopOutputFormat will put 
the resulting file(s)
        // at the top level of the directory, so Flink's rename logic will work 
properly, without
        // needing to have FinalizeOnMaster support.
        
job.getConfiguration().setInt(FileOutputCommitter.FILEOUTPUTCOMMITTER_ALGORITHM_VERSION,
 2);
        
        SequenceFileOutputFormat.setOutputPath(job, new Path(_parentDir, 
bucket));

        HadoopOutputFormat<Tuple, Tuple> result = new HadoopOutputFormat<Tuple, 
Tuple>(
                new SequenceFileOutputFormat<Tuple, Tuple>(), job);

See also https://issues.apache.org/jira/browse/MAPREDUCE-4815 
<https://issues.apache.org/jira/browse/MAPREDUCE-4815>

— Ken

> On Sep 2, 2020, at 11:43 PM, Robert Metzger <rmetz...@apache.org> wrote:
> 
> Hi Ken,
> 
> sorry for the late reply. This could be a bug in Flink. Does the issue also
> occur on Flink 1.11?
> Have you set a breakpoint in the HadoopOutputFormat.finalizeGlobal() when
> running locally to validate that this method doesn't get called?
> 
> What do you mean by "algorithm version 2"? Where can you set this? (Sorry
> for the question, I'm not an expert with Hadoop's FileOutputCommitter)
> 
> Note to others: There's a related discussion here:
> https://issues.apache.org/jira/browse/FLINK-19069
> 
> Best,
> Robert
> 
> 
> On Wed, Aug 26, 2020 at 1:10 AM Ken Krugler <kkrugler_li...@transpac.com>
> wrote:
> 
>> Hi devs,
>> 
>> In HadoopOutputFormat.close(), I see code that is trying to rename
>> <outputPath>/tmp-r-00001 to be <outputPath>/1
>> 
>> But when I run my Flink 1.9.2 code using a local MiniCluster, the actual
>> location of the tmp-r-00001 file is:
>> 
>> <outputPath>/_temporary/0/task__0000_r_000001/tmp-r-00001
>> 
>> I think this is because the default behavior of Hadoop’s
>> FileOutputCommitter (with algorithm == 1) is to put files in task-specific
>> sub-dirs.
>> 
>> It’s depending on a post-completion “merge paths” action to be taken by
>> what is (for Hadoop) the Application Master.
>> 
>> I assume that when running on a real cluster, the
>> HadoopOutputFormat.finalizeGlobal() method’s call to commitJob() would do
>> this, but it doesn’t seem to be happening when I run locally.
>> 
>> If I set the algorithm version to 2, then “merge paths” is handled by
>> FileOutputCommitter immediately, and the HadoopOutputFormat code finds
>> files in the expected location.
>> 
>> Wondering if Flink should always be using version 2 of the algorithm, as
>> that’s more performant when there are a lot of results (which is why it was
>> added).
>> 
>> Thanks,
>> 
>> — Ken
>> 
>> --------------------------
>> Ken Krugler
>> http://www.scaleunlimited.com
>> custom big data solutions & training
>> Hadoop, Cascading, Cassandra & Solr
>> 
>> 

--------------------------
Ken Krugler
http://www.scaleunlimited.com
custom big data solutions & training
Hadoop, Cascading, Cassandra & Solr

Reply via email to