Hi Ken,

Do you mean that I can create a batch sink which writes to N files? That sounds viable, but since our data size is huge (billions of records & thousands of files), the performance may be unacceptable. I will check Blink and give it a try anyway.
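To make sure I understand the MapPartitionFunction idea, here is a rough, untested sketch of what I have in mind. The Tuple2<Integer, String> record type (bucket id, value), the BucketFileWriter class name, the base path, and the use of a local FileWriter (instead of the HDFS FileSystem API we would really use) are all just placeholders:

import org.apache.flink.api.common.functions.RichMapPartitionFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

import java.io.BufferedWriter;
import java.io.FileWriter;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.Map;

// Writes each record to a file chosen by its bucket id (f0). Because the
// DataSet is hash-partitioned on the bucket id first, every bucket lands on
// exactly one subtask, so N files can be produced with parallelism P << N.
public class BucketFileWriter
        extends RichMapPartitionFunction<Tuple2<Integer, String>, Integer> {

    private final String basePath;   // placeholder output directory

    public BucketFileWriter(String basePath) {
        this.basePath = basePath;
    }

    @Override
    public void mapPartition(Iterable<Tuple2<Integer, String>> records,
                             Collector<Integer> out) throws Exception {
        Map<Integer, BufferedWriter> writers = new HashMap<>();
        try {
            for (Tuple2<Integer, String> record : records) {
                // Lazily open one writer per bucket seen by this subtask.
                BufferedWriter writer = writers.computeIfAbsent(record.f0, bucket -> {
                    try {
                        return new BufferedWriter(new FileWriter(basePath + "/part-" + bucket));
                    } catch (IOException e) {
                        throw new UncheckedIOException(e);
                    }
                });
                writer.write(record.f1);
                writer.newLine();
            }
        } finally {
            for (BufferedWriter writer : writers.values()) {
                writer.close();
            }
        }
        // Emit how many files this subtask wrote, just so the operator has output.
        out.collect(writers.size());
    }
}

The job would then wire it up roughly like this, where the bucket id in f0 is hash(key) % N and P is much smaller than N:

data                              // DataSet<Tuple2<Integer, String>>
    .partitionByHash(0)           // all records of one bucket go to one subtask
    .setParallelism(P)
    .mapPartition(new BucketFileWriter("/tmp/output"))
    .setParallelism(P)
    .output(new DiscardingOutputFormat<Integer>());

Since each bucket is owned by exactly one subtask after the hash partitioning, the output files should not conflict even though the parallelism is far below the file count. Does that match what you were suggesting?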
Thank you,
Qi

> On Mar 12, 2019, at 11:58 PM, Ken Krugler <kkrugler_li...@transpac.com> wrote:
>
> Hi Qi,
>
> If I understand what you’re trying to do, then this sounds like a variation of a bucketing sink.
>
> That typically uses a field value to create a directory path or a file name (though the filename case is only viable when the field is also what’s used to partition the data).
>
> But I don’t believe Flink has built-in support for that in batch mode (see BucketingSink <https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.html> for streaming).
>
> Maybe Blink has added that? Hoping someone who knows that codebase can chime in here.
>
> Otherwise you’ll need to create a custom sink to implement the desired behavior, though abusing a MapPartitionFunction <https://ci.apache.org/projects/flink/flink-docs-release-1.7/api/java/org/apache/flink/api/common/functions/MapPartitionFunction.html> would be easiest, I think.
>
> — Ken
>
>
>> On Mar 12, 2019, at 2:28 AM, qi luo <luoqi...@gmail.com> wrote:
>>
>> Hi Ken,
>>
>> Thanks for your reply. I may not have made myself clear: our problem is not about reading but rather writing.
>>
>> We need to write to N files based on key partitioning. We have to use setParallelism() to set the output partition/file number, but when the partition number is too large (~100K), the parallelism would be too high. Is there any other way to achieve this?
>>
>> Thanks,
>> Qi
>>
>>> On Mar 11, 2019, at 11:22 PM, Ken Krugler <kkrugler_li...@transpac.com> wrote:
>>>
>>> Hi Qi,
>>>
>>> I’m guessing you’re calling createInput() for each input file.
>>>
>>> If so, then instead you want to do something like:
>>>
>>>     Job job = Job.getInstance();
>>>
>>>     for each file…
>>>         FileInputFormat.addInputPath(job, new org.apache.hadoop.fs.Path(file path));
>>>
>>>     env.createInput(HadoopInputs.createHadoopInput(…, job))
>>>
>>> Flink/Hadoop will take care of parallelizing the reads from the files, given the parallelism that you’re specifying.
>>>
>>> — Ken
>>>
>>>
>>>> On Mar 11, 2019, at 5:42 AM, qi luo <luoqi...@gmail.com> wrote:
>>>>
>>>> Hi,
>>>>
>>>> We’re trying to distribute batch input data to (N) HDFS files, partitioning by hash using the DataSet API. What I’m doing is something like:
>>>>
>>>>     env.createInput(…)
>>>>        .partitionByHash(0)
>>>>        .setParallelism(N)
>>>>        .output(…)
>>>>
>>>> This works well for a small number of files. But when we need to distribute to a large number of files (say 100K), the parallelism becomes too large and we cannot afford that many TMs.
>>>>
>>>> In Spark we can write something like ‘rdd.partitionBy(N)’ and control the parallelism separately (using dynamic allocation). Is there anything similar in Flink, or another way we can achieve a similar result? Thank you!
>>>>
>>>> Qi
>>>
>>> --------------------------
>>> Ken Krugler
>>> +1 530-210-6378
>>> http://www.scaleunlimited.com
>>> Custom big data solutions & training
>>> Flink, Solr, Hadoop, Cascading & Cassandra
>>>
>>
>
> --------------------------
> Ken Krugler
> +1 530-210-6378
> http://www.scaleunlimited.com
> Custom big data solutions & training
> Flink, Solr, Hadoop, Cascading & Cassandra
>