Hi Robert,

I haven't tried with 1.11 yet; it's on my list.
I'll be spending time on this tomorrow, so hopefully I'll have more results.

As for setting algorithm version 2, I do it in code like this:

    Job job = Job.getInstance();
    job.getConfiguration().set("io.serializations", "cascading.tuple.hadoop.TupleSerialization");
    job.setOutputKeyClass(Tuple.class);
    job.setOutputValueClass(Tuple.class);

    // So that the FileOutputCommitter used by HadoopOutputFormat will put the resulting file(s)
    // at the top level of the directory, so Flink's rename logic will work properly, without
    // needing to have FinalizeOnMaster support.
    job.getConfiguration().setInt(FileOutputCommitter.FILEOUTPUTCOMMITTER_ALGORITHM_VERSION, 2);

    SequenceFileOutputFormat.setOutputPath(job, new Path(_parentDir, bucket));

    HadoopOutputFormat<Tuple, Tuple> result = new HadoopOutputFormat<Tuple, Tuple>(
        new SequenceFileOutputFormat<Tuple, Tuple>(), job);

See also https://issues.apache.org/jira/browse/MAPREDUCE-4815

— Ken

> On Sep 2, 2020, at 11:43 PM, Robert Metzger <rmetz...@apache.org> wrote:
>
> Hi Ken,
>
> sorry for the late reply. This could be a bug in Flink. Does the issue also
> occur on Flink 1.11?
> Have you set a breakpoint in HadoopOutputFormat.finalizeGlobal() when
> running locally to validate that this method doesn't get called?
>
> What do you mean by "algorithm version 2"? Where can you set this?
> (Sorry for the question, I'm not an expert with Hadoop's FileOutputCommitter.)
>
> Note to others: there's a related discussion here:
> https://issues.apache.org/jira/browse/FLINK-19069
>
> Best,
> Robert
>
>
> On Wed, Aug 26, 2020 at 1:10 AM Ken Krugler <kkrugler_li...@transpac.com>
> wrote:
>
>> Hi devs,
>>
>> In HadoopOutputFormat.close(), I see code that is trying to rename
>> <outputPath>/tmp-r-00001 to be <outputPath>/1
>>
>> But when I run my Flink 1.9.2 code using a local MiniCluster, the actual
>> location of the tmp-r-00001 file is:
>>
>> <outputPath>/_temporary/0/task__0000_r_000001/tmp-r-00001
>>
>> I think this is because the default behavior of Hadoop's
>> FileOutputCommitter (with algorithm == 1) is to put files in task-specific
>> sub-dirs.
>>
>> It's depending on a post-completion "merge paths" action to be taken by
>> what is (for Hadoop) the Application Master.
>>
>> I assume that when running on a real cluster, the
>> HadoopOutputFormat.finalizeGlobal() method's call to commitJob() would do
>> this, but it doesn't seem to be happening when I run locally.
>>
>> If I set the algorithm version to 2, then "merge paths" is handled by
>> FileOutputCommitter immediately, and the HadoopOutputFormat code finds
>> files in the expected location.
>>
>> Wondering if Flink should always be using version 2 of the algorithm, as
>> that's more performant when there are a lot of results (which is why it
>> was added).
>>
>> Thanks,
>>
>> — Ken
>>
>> --------------------------
>> Ken Krugler
>> http://www.scaleunlimited.com
>> custom big data solutions & training
>> Hadoop, Cascading, Cassandra & Solr

--------------------------
Ken Krugler
http://www.scaleunlimited.com
custom big data solutions & training
Hadoop, Cascading, Cassandra & Solr
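P.S. For anyone following along, here's a minimal sketch of the layout difference described above. This is plain Java simulating the directory layout, not actual Hadoop code; the class name, helper method, and the task directory name are illustrative (the task directory is taken from the path quoted in the thread):

```java
import java.nio.file.Path;
import java.nio.file.Paths;

public class CommitLayoutSketch {

    // Simulate where a reduce task's output file sits after commitTask().
    // Algorithm 1: the file stays under <out>/_temporary/0/<task-dir>/ until
    //   commitJob() runs "merge paths" on the (Hadoop) Application Master.
    // Algorithm 2: commitTask() moves the file to <out>/ immediately, so no
    //   job-level merge step is needed.
    static Path taskOutput(Path outputDir, String file, int algorithmVersion) {
        if (algorithmVersion == 2) {
            return outputDir.resolve(file);
        }
        return outputDir.resolve("_temporary").resolve("0")
                .resolve("task__0000_r_000001").resolve(file);
    }

    public static void main(String[] args) {
        Path out = Paths.get("/outputPath");
        // Where HadoopOutputFormat.close() looks vs. where the file really is:
        System.out.println(taskOutput(out, "tmp-r-00001", 1));
        System.out.println(taskOutput(out, "tmp-r-00001", 2));
    }
}
```

With algorithm 1 the file only reaches the top-level directory after commitJob(), which is why Flink's rename in close() misses it when finalizeGlobal() isn't invoked locally; with algorithm 2 it's already there.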