Re: Set partition number of Flink DataSet

2019-03-21 Thread qi luo
Thank you Fabian! I will check these issues.

> On Mar 20, 2019, at 4:25 PM, Fabian Hueske wrote:
>
> Hi,
>
> I'm sorry, but I'm only familiar with the high-level design, not with the
> implementation details and concrete roadmap for the involved components.
> I think that FLINK-10288 [1] and FLINK-10429 [2] are related to partition
> handling. …

Re: Set partition number of Flink DataSet

2019-03-20 Thread Fabian Hueske
Hi, I'm sorry, but I'm only familiar with the high-level design, not with the implementation details and concrete roadmap for the involved components. I think that FLINK-10288 [1] and FLINK-10429 [2] are related to partition handling.

Best, Fabian

[1] https://issues.apache.org/jira/browse/FLINK-10288
[2] https://issues.apache.org/jira/browse/FLINK-10429

Re: Set partition number of Flink DataSet

2019-03-15 Thread qi luo
Hi Fabian, I understand this is by-design behavior, since Flink was first built for streaming. Supporting batch shuffles and custom partition numbers in Flink would be compelling for batch processing. Could you explain a bit more about what work needs to be done, so Flink can support custom partition numbers? …

Re: Set partition number of Flink DataSet

2019-03-15 Thread Fabian Hueske
Hi, Flink works a bit differently than Spark. By default, Flink uses pipelined shuffles, which push the sender's results immediately to the receivers (btw, this is one of the building blocks for stream processing). However, pipelined shuffles require that all receivers are online. Hence, the num…
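For reference, the DataSet API lets you switch from the pipelined shuffles Fabian describes to blocking (batch-style) data exchanges. A minimal sketch in Java, assuming Flink's DataSet API; ExecutionMode is from Flink's public API, the rest is illustrative:

    import org.apache.flink.api.common.ExecutionMode;
    import org.apache.flink.api.java.ExecutionEnvironment;

    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    // Force blocking data exchanges, so receivers need not be online
    // while senders produce their results.
    env.getConfig().setExecutionMode(ExecutionMode.BATCH);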

Re: Set partition number of Flink DataSet

2019-03-14 Thread qi luo
Hi Ken, that looks awesome! I've implemented something similar to your bucketing sink, but using multiple internal writers rather than multiple internal outputs. Besides this, I'm also curious whether Flink can do what Spark does: allow users to specify the partition number in the partitionBy() method. …
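For context, the Spark behavior Qi is describing, as a hedged Java sketch (pairs and N are placeholders; partitionBy() and HashPartitioner are from Spark's public API):

    import org.apache.spark.HashPartitioner;
    import org.apache.spark.api.java.JavaPairRDD;

    // Spark: the partition count is an argument of the repartitioning call.
    JavaPairRDD<Integer, String> repartitioned =
        pairs.partitionBy(new HashPartitioner(N));
    // Flink's DataSet.partitionByHash() has no such count argument; the
    // partition count is implied by the downstream operator's parallelism.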

Re: Set partition number of Flink DataSet

2019-03-14 Thread Ken Krugler
Hi Qi, see https://github.com/ScaleUnlimited/flink-utils/ for a rough but working version of a bucketing sink.

— Ken

> On Mar 13, 2019, at 7:46 PM, qi luo wrote:
>
> Hi Ken,
>
> Agree. I will try partitionBy() to reduce the number of parallel sinks, …

Re: Set partition number of Flink DataSet

2019-03-13 Thread qi luo
Hi Ken, agreed. I will try partitionBy() to reduce the number of parallel sinks, and may also try sortPartition() so each sink can write files one by one. Looking forward to your solution. :)

Thanks, Qi

> On Mar 14, 2019, at 2:54 AM, Ken Krugler wrote:
>
> Hi Qi,
>
>> On Mar 13, 2019, at 1…
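A minimal sketch of that combination in the DataSet API, assuming Tuple2 records keyed on field 0 (partitionByHash, sortPartition, and Order are from Flink's public API; the output path and N are placeholders):

    import org.apache.flink.api.common.operators.Order;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.tuple.Tuple2;

    // Hash-partition by key, then sort each partition so a sink task sees
    // its keys in order and can write its files one at a time.
    data.partitionByHash(0)
        .sortPartition(0, Order.ASCENDING)
        .writeAsText("hdfs:///tmp/out")   // hypothetical output path
        .setParallelism(N);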

Re: Set partition number of Flink DataSet

2019-03-13 Thread Ken Krugler
Hi Qi,

> On Mar 13, 2019, at 1:26 AM, qi luo wrote:
>
> Hi Ken,
>
> Do you mean that I can create a batch sink which writes to N files?

Correct.

> That sounds viable, but since our data size is huge (billions of records &
> thousands of files), the performance may be unacceptable.

The main…

Re: Set partition number of Flink DataSet

2019-03-13 Thread qi luo
Hi Ken, do you mean that I can create a batch sink which writes to N files? That sounds viable, but since our data size is huge (billions of records & thousands of files), the performance may be unacceptable. I will check Blink and give it a try anyway.

Thank you, Qi

> On Mar 12, 2019, at 11:…

Re: Set partition number of Flink DataSet

2019-03-12 Thread Ken Krugler
Hi Qi, if I understand what you're trying to do, this sounds like a variation of a bucketing sink. That typically uses a field value to create a directory path or a file name (though the filename case is only viable when the field is also what's used to partition the data). But I don't believe …
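A hypothetical sketch of such a bucketing sink, written against Flink's RichOutputFormat. The class name, file layout, and hashing scheme are all assumptions of mine, not Ken's library:

    import org.apache.flink.api.common.io.RichOutputFormat;
    import org.apache.flink.configuration.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    import java.io.IOException;
    import java.io.PrintWriter;
    import java.io.UncheckedIOException;
    import java.util.HashMap;
    import java.util.Map;

    // Each sink task fans records out to numBuckets files, choosing the
    // file by key hash. taskNumber is baked into the file name so that
    // parallel tasks do not collide.
    public class HashBucketOutputFormat extends RichOutputFormat<String> {
        private final String baseDir;
        private final int numBuckets;
        private transient FileSystem fs;
        private transient Map<Integer, PrintWriter> writers;
        private transient int taskNumber;

        public HashBucketOutputFormat(String baseDir, int numBuckets) {
            this.baseDir = baseDir;
            this.numBuckets = numBuckets;
        }

        @Override
        public void configure(Configuration parameters) {}

        @Override
        public void open(int taskNumber, int numTasks) throws IOException {
            this.taskNumber = taskNumber;
            this.fs = FileSystem.get(new org.apache.hadoop.conf.Configuration());
            this.writers = new HashMap<>();
        }

        @Override
        public void writeRecord(String record) throws IOException {
            int bucket = Math.floorMod(record.hashCode(), numBuckets);
            // Lazily open one writer per bucket. Note: with ~100K buckets
            // this keeps ~100K open streams per task, which is exactly the
            // resource pressure discussed in this thread.
            PrintWriter writer = writers.computeIfAbsent(bucket, b -> {
                try {
                    Path path = new Path(baseDir, "task-" + taskNumber + "-bucket-" + b);
                    return new PrintWriter(fs.create(path));
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            });
            writer.println(record);
        }

        @Override
        public void close() {
            if (writers != null) {
                writers.values().forEach(PrintWriter::close);
            }
        }
    }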

Re: Set partition number of Flink DataSet

2019-03-12 Thread qi luo
Hi Ken, thanks for your reply. I may not have made myself clear: our problem is not about reading but about writing. We need to write to N files based on key partitioning. We have to use setParallelism() to set the output partition/file count, but when the partition number is too large (~100K), …

Re: Set partition number of Flink DataSet

2019-03-11 Thread Ken Krugler
Hi Qi, I'm guessing you're calling createInput() for each input file. If so, then instead you want to do something like:

    Job job = Job.getInstance();
    // for each file…
    FileInputFormat.addInputPath(job, new org.apache.hadoop.fs.Path(file path));
    env.createInput(…)
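Filling in Ken's outline, a minimal self-contained sketch, assuming plain text files and the HadoopInputFormat wrapper from Flink's Hadoop compatibility layer (inputPaths and the TextInputFormat choice are my assumptions):

    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    // One Hadoop Job collects all input paths, so Flink creates a single
    // source whose splits cover every file, instead of one source per file.
    Job job = Job.getInstance();
    for (String path : inputPaths) {  // inputPaths: your list of HDFS files
        FileInputFormat.addInputPath(job, new org.apache.hadoop.fs.Path(path));
    }

    DataSet<Tuple2<LongWritable, Text>> input = env.createInput(
        new HadoopInputFormat<>(new TextInputFormat(), LongWritable.class, Text.class, job));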

Set partition number of Flink DataSet

2019-03-11 Thread qi luo
Hi, we're trying to distribute batch input data to N HDFS files, partitioned by hash, using the DataSet API. What I'm doing is like:

    env.createInput(…)
       .partitionByHash(0)
       .setParallelism(N)
       .output(…)

This works well for a small number of files, but when we need to distribute to …
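For completeness, a runnable version of that pipeline under assumed types; the input source, key position, and output path are placeholders of mine:

    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.tuple.Tuple2;

    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    // Hypothetical input: (key, payload) pairs from somewhere upstream.
    DataSet<Tuple2<Integer, String>> data =
        env.fromElements(Tuple2.of(1, "a"), Tuple2.of(2, "b"));

    int n = 4;  // stand-in for N
    data.partitionByHash(0)              // route records by hash of field 0
        .writeAsText("hdfs:///tmp/out")  // hypothetical path
        .setParallelism(n);              // n sink tasks => n output files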