Hi Robert,
I haven’t tried it with 1.11 yet; it’s on my list.
I’ll be spending time on this tomorrow, so hopefully I’ll have more results.
As for setting algorithm version 2, I do it in code like this:
Job job = Job.getInstance();
job.getConfiguration().set("io.serializations",
    "cascading.tuple.hadoop.TupleSerialization");
job.setOutputKeyClass(Tuple.class);
job.setOutputValueClass(Tuple.class);

// So that the FileOutputCommitter used by HadoopOutputFormat will put the
// resulting file(s) at the top level of the directory, so Flink's rename
// logic will work properly, without needing to have FinalizeOnMaster support.
job.getConfiguration().setInt(
    FileOutputCommitter.FILEOUTPUTCOMMITTER_ALGORITHM_VERSION, 2);

SequenceFileOutputFormat.setOutputPath(job, new Path(_parentDir, bucket));
HadoopOutputFormat<Tuple, Tuple> result = new HadoopOutputFormat<Tuple, Tuple>(
    new SequenceFileOutputFormat<Tuple, Tuple>(), job);
See also https://issues.apache.org/jira/browse/MAPREDUCE-4815
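For anyone following along, here’s a toy sketch (plain java.nio, not actual Hadoop code) of where a committed file ends up under each algorithm version; the directory and file names just mirror the paths from this thread, and the real committer does more than this:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

public class CommitLayoutDemo {

    // Simulate where a reducer's output file sits after task commit,
    // for FileOutputCommitter algorithm version 1 vs. version 2.
    static Path commitTask(Path outputDir, int algorithmVersion) throws IOException {
        // Task attempts always write into a task-specific temp sub-dir first.
        Path taskAttemptDir = outputDir.resolve("_temporary/0/task_attempt_r_000001");
        Files.createDirectories(taskAttemptDir);
        Path tmpFile = Files.createFile(taskAttemptDir.resolve("tmp-r-00001"));

        if (algorithmVersion == 2) {
            // v2: "merge paths" happens at task commit time, so the file is
            // moved to the top level of the output dir immediately.
            Path committed = outputDir.resolve(tmpFile.getFileName());
            Files.move(tmpFile, committed);
            return committed;
        }
        // v1: the file stays in the task-specific sub-dir until job commit
        // (commitJob() on the application master) merges it up.
        return tmpFile;
    }

    public static void main(String[] args) throws IOException {
        Path out = Files.createTempDirectory("commit-demo");
        Path v1 = commitTask(out.resolve("v1"), 1);
        Path v2 = commitTask(out.resolve("v2"), 2);
        System.out.println("v1: " + out.relativize(v1));
        System.out.println("v2: " + out.relativize(v2));
    }
}
```

Running it shows the v1 file still buried under _temporary/ (where Flink’s rename logic can’t find it), while the v2 file is already at the top level.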
— Ken
> On Sep 2, 2020, at 11:43 PM, Robert Metzger <[email protected]> wrote:
>
> Hi Ken,
>
> sorry for the late reply. This could be a bug in Flink. Does the issue also
> occur on Flink 1.11?
> Have you set a breakpoint in the HadoopOutputFormat.finalizeGlobal() when
> running locally to validate that this method doesn't get called?
>
> What do you mean by "algorithm version 2"? Where can you set this? (Sorry
> for the question, I'm not an expert with Hadoop's FileOutputCommitter)
>
> Note to others: There's a related discussion here:
> https://issues.apache.org/jira/browse/FLINK-19069
>
> Best,
> Robert
>
>
> On Wed, Aug 26, 2020 at 1:10 AM Ken Krugler <[email protected]>
> wrote:
>
>> Hi devs,
>>
>> In HadoopOutputFormat.close(), I see code that is trying to rename
>> <outputPath>/tmp-r-00001 to <outputPath>/1
>>
>> But when I run my Flink 1.9.2 code using a local MiniCluster, the actual
>> location of the tmp-r-00001 file is:
>>
>> <outputPath>/_temporary/0/task__0000_r_000001/tmp-r-00001
>>
>> I think this is because the default behavior of Hadoop’s
>> FileOutputCommitter (with algorithm == 1) is to put files in task-specific
>> sub-dirs.
>>
>> It depends on a post-completion “merge paths” action being taken by what
>> is (for Hadoop) the Application Master.
>>
>> I assume that when running on a real cluster, the
>> HadoopOutputFormat.finalizeGlobal() method’s call to commitJob() would do
>> this, but it doesn’t seem to be happening when I run locally.
>>
>> If I set the algorithm version to 2, then “merge paths” is handled by
>> FileOutputCommitter immediately, and the HadoopOutputFormat code finds
>> files in the expected location.
>>
>> I’m wondering whether Flink should always use version 2 of the algorithm,
>> since it’s more performant when there are a lot of results (which is why
>> it was added).
>>
>> Thanks,
>>
>> — Ken
>>
--------------------------
Ken Krugler
http://www.scaleunlimited.com
custom big data solutions & training
Hadoop, Cascading, Cassandra & Solr