Hi Jason,

Yes, I write the files inside the mapPartition function. Note that you can get multiple key groups inside one partition, so you have to manage your own map from the key group to the writer.
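In rough sketch form, that per-partition bookkeeping looks something like the code below. This isn't our actual production code; the Tuple2<source dir, line> input shape and the outputPathFor() helper are just placeholders, and for real S3 output you'd write through Flink's FileSystem (or an S3 client) rather than java.nio.

import org.apache.flink.api.common.functions.RichMapPartitionFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

import java.io.BufferedWriter;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.HashMap;
import java.util.Map;

// Sketch only: one partition can hold records from several key groups,
// so we lazily open (and track) one writer per key group we see.
public class PerKeyGroupFileWriter extends RichMapPartitionFunction<Tuple2<String, String>, Void> {

    @Override
    public void mapPartition(Iterable<Tuple2<String, String>> records, Collector<Void> out) throws Exception {
        Map<String, BufferedWriter> writers = new HashMap<>();
        try {
            for (Tuple2<String, String> record : records) {
                String sourceDir = record.f0;
                BufferedWriter writer = writers.get(sourceDir);
                if (writer == null) {
                    // First record for this key group in this partition: open its output file.
                    writer = Files.newBufferedWriter(Paths.get(outputPathFor(sourceDir)));
                    writers.put(sourceDir, writer);
                }
                writer.write(record.f1);
                writer.newLine();
            }
        } finally {
            // Close every writer this partition opened.
            for (BufferedWriter writer : writers.values()) {
                writer.close();
            }
        }
        // Nothing is emitted downstream; the job just ends in a discarding sink.
    }

    // Placeholder: derive an output file name from the source directory and subtask index.
    private String outputPathFor(String sourceDir) {
        return sourceDir + "/deduped-part-" + getRuntimeContext().getIndexOfThisSubtask();
    }
}

In the DataSet API you'd hook it in after the partition step, roughly: partitioned.mapPartition(new PerKeyGroupFileWriter()).output(new DiscardingOutputFormat<>());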
The Flink DAG ends with a DiscardingSink, after the mapPartition. And no, we didn't notice any specific performance hit with this approach, though our workflow was much more complex, so performance was bounded by upstream joins.

— Ken

> On Jul 13, 2021, at 10:53 AM, Jason Liu <jasonli...@ucla.edu> wrote:
>
> Hey Ken,
>
> Thanks! This is really helpful. Just to clarify, when you said write a custom mapPartition that writes to files, did you actually write the file inside the mapPartition function itself? So the Flink DAG ends at mapPartition? Did you notice any performance issues as a result of this?
>
> Thanks again,
> Jason
>
> On Fri, Jul 9, 2021 at 1:39 PM Ken Krugler <kkrugler_li...@transpac.com> wrote:
> FWIW I had to do something similar in the past. My solution was to…
>
> 1. Create a custom reader that added the source directory to the input data (so I had a Tuple2<source dir name, actual data>).
> 2. Create a job that reads from all source directories, using HadoopInputFormat for text.
> 3. Constrain the parallelism of this initial part of the job, to avoid overwhelming downloads from S3.
> 4. Do a partition on the source directory.
> 5. Write a custom mapPartition function that opens/writes to output files that are created with names based on the source directory.
>
> — Ken
>
>> On Jul 8, 2021, at 3:19 PM, Jason Liu <jasonli...@ucla.edu> wrote:
>>
>> Hi all,
>>
>> We currently have a use case of running a given DataSet API job for a given S3 directory to dedup data and output to a new directory. We need to run this job for roughly ~1000 S3 folders. I attempted to set up the Flink executions so it runs sequentially like this:
>>
>> ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>>
>> Configuration parameters = new Configuration();
>> parameters.setBoolean("recursive.file.enumeration", true);
>>
>> for (final String inputDirectory : directoryList) {
>>     String inputPath = inputDirectory;
>>     String outputPath = getOutputPath(inputPath);
>>
>>     log.warn("using input path [{}] and output path [{}]", inputPath, outputPath);
>>
>>     DataSet<String> lines = env.readTextFile(inputPath).withParameters(parameters);
>>     DataSet<String> deduped = lines.distinct(new GetKey());
>>     deduped.writeAsText(outputPath, FileSystem.WriteMode.OVERWRITE);
>> }
>> env.execute();
>>
>> However, when I submit this job to the cluster, it generates a graph like this:
>> <image.png>
>> And it seems Flink is running them in parallel. Is there a way to tell Flink to run it sequentially? I tried moving the execution environment inside the loop, but it seems like it only runs the job on the first directory. I'm running this on AWS Kinesis Data Analytics, so it's a bit hard for me to submit new jobs.
>>
>> Wondering if there's any way I can accomplish this?
>>
>> Thanks,
>> Jason
>
> --------------------------
> Ken Krugler
> http://www.scaleunlimited.com
> Custom big data solutions
> Flink, Pinot, Solr, Elasticsearch

--------------------------
Ken Krugler
http://www.scaleunlimited.com
Custom big data solutions
Flink, Pinot, Solr, Elasticsearch