Hi Qi,

If I understand what you’re trying to do, then this sounds like a variation of a bucketing sink.
That typically uses a field value to create a directory path or a file name (though the filename case is only viable when the field is also what’s used to partition the data). But I don’t believe Flink has built-in support for that in batch mode (see BucketingSink <https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.html> for streaming).

Maybe Blink has added that? Hoping someone who knows that codebase can chime in here.

Otherwise you’ll need to create a custom sink to implement the desired behavior - though abusing a MapPartitionFunction <https://ci.apache.org/projects/flink/flink-docs-release-1.7/api/java/org/apache/flink/api/common/functions/MapPartitionFunction.html> would be easiest, I think (a rough sketch of that approach is appended below my signature).

— Ken

> On Mar 12, 2019, at 2:28 AM, qi luo <luoqi...@gmail.com> wrote:
>
> Hi Ken,
>
> Thanks for your reply. I may not have made myself clear: our problem is not about reading but rather writing.
>
> We need to write to N files based on key partitioning. We have to use setParallelism() to set the output partition/file number, but when the partition number is too large (~100K), the parallelism would be too high. Is there any other way to achieve this?
>
> Thanks,
> Qi
>
>> On Mar 11, 2019, at 11:22 PM, Ken Krugler <kkrugler_li...@transpac.com <mailto:kkrugler_li...@transpac.com>> wrote:
>>
>> Hi Qi,
>>
>> I’m guessing you’re calling createInput() for each input file.
>>
>> If so, then instead you want to do something like:
>>
>> Job job = Job.getInstance();
>>
>> for each file…
>>     FileInputFormat.addInputPath(job, new org.apache.hadoop.fs.Path(file path));
>>
>> env.createInput(HadoopInputs.createHadoopInput(…, job));
>>
>> Flink/Hadoop will take care of parallelizing the reads from the files, given the parallelism that you’re specifying.
>>
>> — Ken
>>
>>
>>> On Mar 11, 2019, at 5:42 AM, qi luo <luoqi...@gmail.com <mailto:luoqi...@gmail.com>> wrote:
>>>
>>> Hi,
>>>
>>> We’re trying to distribute batch input data to N HDFS files, partitioning by hash, using the DataSet API. What I’m doing is like:
>>>
>>> env.createInput(…)
>>>    .partitionByHash(0)
>>>    .setParallelism(N)
>>>    .output(…)
>>>
>>> This works well for a small number of files. But when we need to distribute to a large number of files (say 100K), the parallelism becomes too large and we could not afford that many TMs.
>>>
>>> In Spark we can write something like ‘rdd.partitionBy(N)’ and control the parallelism separately (using dynamic allocation). Is there anything similar in Flink, or another way we can achieve a similar result? Thank you!
>>>
>>> Qi
>>
>> --------------------------
>> Ken Krugler
>> +1 530-210-6378
>> http://www.scaleunlimited.com <http://www.scaleunlimited.com/>
>> Custom big data solutions & training
>> Flink, Solr, Hadoop, Cascading & Cassandra
>>
>

--------------------------
Ken Krugler
+1 530-210-6378
http://www.scaleunlimited.com
Custom big data solutions & training
Flink, Solr, Hadoop, Cascading & Cassandra
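
P.S. A rough sketch of the MapPartitionFunction approach, in case it helps. Everything concrete in it is a placeholder made up for illustration (the record type, the key field, the bucket count, the HDFS base path, and the class names), so treat it as a starting point under those assumptions rather than tested code. The idea: tag each record with a bucket id, route on that id with a custom partitioner so every bucket lands on exactly one subtask, and have each subtask manage its own HDFS output streams. The operator parallelism then stays independent of the number of output files.

// Sketch only: record type, key field, bucket count, and paths are
// placeholders for illustration, not anything from this thread.

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.api.common.functions.RichMapPartitionFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

public class BucketedWriteJob {

    private static final int NUM_BUCKETS = 100_000;            // N output files
    private static final String BASE_PATH = "hdfs:///tmp/out"; // placeholder path

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Placeholder input; in practice this would come from env.createInput(...).
        DataSet<Tuple2<String, String>> records = env.fromElements(
                Tuple2.of("k1", "v1"), Tuple2.of("k2", "v2"));

        records
            // 1. Tag each record with its bucket id (0 .. NUM_BUCKETS - 1).
            .map(new MapFunction<Tuple2<String, String>, Tuple2<Integer, String>>() {
                @Override
                public Tuple2<Integer, String> map(Tuple2<String, String> r) {
                    return Tuple2.of(Math.floorMod(r.f0.hashCode(), NUM_BUCKETS), r.f1);
                }
            })
            // 2. Route by bucket id, so each bucket lands on exactly one subtask.
            .partitionCustom(new Partitioner<Integer>() {
                @Override
                public int partition(Integer bucket, int numPartitions) {
                    return bucket % numPartitions;
                }
            }, 0)
            // 3. Each subtask writes its buckets to its own set of HDFS files.
            .mapPartition(new BucketWriter())
            .setParallelism(100) // independent of NUM_BUCKETS
            .output(new DiscardingOutputFormat<>());

        env.execute("bucketed batch write");
    }

    /** Writes each record to BASE_PATH/bucket-id, one file per bucket. */
    public static class BucketWriter
            extends RichMapPartitionFunction<Tuple2<Integer, String>, Integer> {

        @Override
        public void mapPartition(Iterable<Tuple2<Integer, String>> values,
                                 Collector<Integer> out) throws Exception {
            FileSystem fs = FileSystem.get(new Configuration());
            Map<Integer, FSDataOutputStream> open = new HashMap<>();
            try {
                for (Tuple2<Integer, String> rec : values) {
                    // Lazily open one stream per bucket seen by this subtask.
                    FSDataOutputStream stream = open.computeIfAbsent(rec.f0, bucket -> {
                        try {
                            return fs.create(new Path(BASE_PATH, "bucket-" + bucket), true);
                        } catch (Exception e) {
                            throw new RuntimeException(e);
                        }
                    });
                    stream.write((rec.f1 + "\n").getBytes(StandardCharsets.UTF_8));
                }
            } finally {
                for (FSDataOutputStream stream : open.values()) {
                    stream.close();
                }
            }
            // Emit the number of buckets this subtask wrote, just so there's output.
            out.collect(open.size());
        }
    }
}

Routing on the bucket id (bucket % numPartitions) rather than on the raw key is what guarantees each of the N files is written by exactly one subtask, so the writers never conflict even though N is far larger than the parallelism.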