Hi Ken,

Thanks for your reply. I may not have made myself clear: our problem is not about reading, but rather about writing.

We need to write to N files, partitioned by key. As far as I can tell, we have to use setParallelism() to control the number of output partitions/files, but when the partition count is large (~100K), the parallelism becomes far too high. Is there any other way to achieve this?
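For reference, here is roughly what our write path looks like today. This is only a minimal, simplified sketch: the input is replaced by a tiny in-memory stand-in for our real createInput(...) source, the output path is a placeholder, and N is small here (in production it would be ~100K):

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;

public class PartitionedWriteSketch {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Stand-in for our real env.createInput(...) source of keyed records.
        DataSet<Tuple2<String, String>> records = env.fromElements(
                Tuple2.of("key1", "value1"),
                Tuple2.of("key2", "value2"));

        // In our real job N is ~100K, which forces ~100K parallel instances.
        int N = 4;

        records.partitionByHash(0)                     // hash-partition by key (field 0)
               .setParallelism(N)                      // partition count is tied to parallelism
               .writeAsText("hdfs:///path/to/output")  // placeholder path; one file per parallel sink instance
               .setParallelism(N);                     // sink parallelism must also be N

        env.execute("partitioned write sketch");
    }
}

The pain point is that the number of output files is coupled to the parallelism, so writing ~100K files would need ~100K parallel sink instances, which is more TaskManager capacity than we can afford.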
Thanks,
Qi

> On Mar 11, 2019, at 11:22 PM, Ken Krugler <kkrugler_li...@transpac.com> wrote:
>
> Hi Qi,
>
> I’m guessing you’re calling createInput() for each input file.
>
> If so, then instead you want to do something like:
>
> Job job = Job.getInstance();
>
> for each file…
>     FileInputFormat.addInputPath(job, new org.apache.hadoop.fs.Path(file path));
>
> env.createInput(HadoopInputs.createHadoopInput(…, job))
>
> Flink/Hadoop will take care of parallelizing the reads from the files, given the parallelism that you’re specifying.
>
> — Ken
>
>
>> On Mar 11, 2019, at 5:42 AM, qi luo <luoqi...@gmail.com> wrote:
>>
>> Hi,
>>
>> We’re trying to distribute batch input data to (N) HDFS files, partitioning by hash, using the DataSet API. What I’m doing is like:
>>
>> env.createInput(…)
>>    .partitionByHash(0)
>>    .setParallelism(N)
>>    .output(…)
>>
>> This works well for a small number of files. But when we need to distribute to a large number of files (say 100K), the parallelism becomes too large and we cannot afford that many TMs.
>>
>> In Spark we can write something like ‘rdd.partitionBy(N)’ and control the parallelism separately (using dynamic allocation). Is there anything similar in Flink, or another way we can achieve a similar result? Thank you!
>>
>> Qi
>
> --------------------------
> Ken Krugler
> +1 530-210-6378
> http://www.scaleunlimited.com
> Custom big data solutions & training
> Flink, Solr, Hadoop, Cascading & Cassandra