FWIW I had to do something similar in the past. My solution was to…

1. Create a custom reader that added the source directory to the input data (so I had a Tuple2<source dir name, actual data>).
2. Create a job that reads from all source directories, using HadoopInputFormat for text.
3. Constrain the parallelism of this initial part of the job, to avoid overwhelming S3 with concurrent downloads.
4. Do a partition on the source directory.
5. Write a custom mapPartition function that opens/writes to output files whose names are based on the source directory.
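In code, that looks roughly like the sketch below (untested, just to show the shape). The class names, bucket paths, the parallelism cap of 4, and deduping on whole lines in memory are all placeholders, and readTextFile plus a tagging map stands in for the custom HadoopInputFormat-based reader from step 1.

import org.apache.flink.api.common.functions.RichMapPartitionFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.*;

public class DedupPerDirectoryJob {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Placeholders: your ~1000 source directories and an output base path.
        List<String> directoryList = Arrays.asList("s3://bucket/dir1/", "s3://bucket/dir2/");
        String outputBase = "s3://bucket/deduped/";

        // Steps 1+2: read every directory, tagging each line with its source dir.
        DataSet<Tuple2<String, String>> tagged = null;
        for (String dir : directoryList) {
            DataSet<Tuple2<String, String>> lines = env.readTextFile(dir)
                    // Step 3: cap the read parallelism so S3 isn't hammered.
                    .setParallelism(4)
                    .map(line -> Tuple2.of(dir, line))
                    .returns(Types.TUPLE(Types.STRING, Types.STRING));
            tagged = (tagged == null) ? lines : tagged.union(lines);
        }

        // Step 4: bring all records from the same source directory together.
        // Step 5: dedup per directory and write one file per directory.
        tagged.partitionByHash(0)
              .mapPartition(new DedupAndWrite(outputBase))
              .output(new DiscardingOutputFormat<>());

        env.execute("dedup per source directory");
    }

    /** Step 5: in-memory dedup per source directory, then one output file per directory. */
    static class DedupAndWrite extends RichMapPartitionFunction<Tuple2<String, String>, String> {

        private final String outputBase;

        DedupAndWrite(String outputBase) {
            this.outputBase = outputBase;
        }

        @Override
        public void mapPartition(Iterable<Tuple2<String, String>> records, Collector<String> out) throws Exception {
            // Assumes one partition's records fit in memory. Dedup here is on the
            // whole line; a GetKey()-style extractor would plug in instead.
            Map<String, Set<String>> seenPerDir = new HashMap<>();
            for (Tuple2<String, String> rec : records) {
                seenPerDir.computeIfAbsent(rec.f0, d -> new LinkedHashSet<>()).add(rec.f1);
            }

            FileSystem fs = FileSystem.get(new URI(outputBase), new org.apache.hadoop.conf.Configuration());
            int subtask = getRuntimeContext().getIndexOfThisSubtask();
            for (Map.Entry<String, Set<String>> entry : seenPerDir.entrySet()) {
                // Name the output file after the source directory (plus the subtask index).
                String dirName = new Path(entry.getKey()).getName();
                Path target = new Path(outputBase, dirName + "/part-" + subtask);
                try (PrintWriter writer = new PrintWriter(
                        new OutputStreamWriter(fs.create(target, true), StandardCharsets.UTF_8))) {
                    for (String line : entry.getValue()) {
                        writer.println(line);
                    }
                }
            }
        }
    }
}

Because everything runs as one job and the S3 reads are parallelism-constrained up front, there's no need to force ~1000 separate sequential executions.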
— Ken

> On Jul 8, 2021, at 3:19 PM, Jason Liu <jasonli...@ucla.edu> wrote:
>
> Hi all,
>
> We currently have a use case of running a given DataSet API job for a
> given S3 directory to dedup data and output to a new directory. We need to
> run this job for roughly ~1000 S3 folders. I attempted to set up the Flink
> executions so it runs sequentially, like this:
>
> ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>
> Configuration parameters = new Configuration();
> parameters.setBoolean("recursive.file.enumeration", true);
>
> for (final String inputDirectory : directoryList) {
>     String inputPath = inputDirectory;
>     String outputPath = getOutputPath(inputPath);
>
>     log.warn("using input path [{}] and output path [{}]", inputPath, outputPath);
>
>     DataSet<String> lines = env.readTextFile(inputPath).withParameters(parameters);
>     DataSet<String> deduped = lines.distinct(new GetKey());
>     deduped.writeAsText(outputPath, FileSystem.WriteMode.OVERWRITE);
> }
> env.execute();
>
> However, when I submit this job to the cluster, it generates a graph like
> this:
> <image.png>
> And it seems Flink is running them in parallel. Is there a way to tell Flink
> to run it sequentially? I tried moving the execution environment inside the
> loop, but it seems like it only runs the job on the first directory. I'm
> running this on AWS Kinesis Data Analytics, so it's a bit hard for me to
> submit new jobs.
>
> Wondering if there's any way I can accomplish this?
>
> Thanks,
> Jason

--------------------------
Ken Krugler
http://www.scaleunlimited.com
Custom big data solutions
Flink, Pinot, Solr, Elasticsearch