Re: Small-files source - partitioning based on prefix of file

2018-08-28 Thread Averell
Thank you Fabian. Regards, Averell -- Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: Small-files source - partitioning based on prefix of file

2018-08-28 Thread Fabian Hueske
Hi, CMCF is not a source; only the file monitoring function is. Barriers are injected by the FMF when the JM sends a checkpoint message. The barriers then travel to the CMCF and trigger the checkpointing. Fabian Averell wrote on Tue, 28 Aug 2018, 12:02: > Hello Fabian, > > Thanks for t

Re: Small-files source - partitioning based on prefix of file

2018-08-28 Thread Averell
Hello Fabian, Thanks for the answer. However, my question is a little bit different. Let me rephrase my example and my question: * I have 10,000 unsplittable small files to read, which, in total, have about 10M output lines. * From Flink's reporting web GUI, I can see that CFMF and Contin

Re: Small-files source - partitioning based on prefix of file

2018-08-28 Thread Fabian Hueske
Hi Averell, Barriers are injected into the regular data flow by source functions. In case of a file monitoring source, the barriers are injected into the stream of file splits that are passed to the ContinuousFileMonitoringFunction. The CFMF puts the splits into a queue and processes them with a d

Re: Small-files source - partitioning based on prefix of file

2018-08-27 Thread Averell
Hello Fabian, and all, Please excuse me for digging this old thread up. I have a question regarding sending of the "barrier" messages in Flink's checkpointing mechanism: I want to know when those barrier messages are sent when I am using a file source. Where can I find it in the source code? I'm

Re: Small-files source - partitioning based on prefix of file

2018-08-12 Thread Averell
Thank you Fabian. It is clear to me now. Thanks a lot for your help. Regards, Averell

Re: Small-files source - partitioning based on prefix of file

2018-08-10 Thread Fabian Hueske
Hi Averell, Conceptually, you are right. Checkpoints are taken at every operator at the same "logical" time. It is not important that each operator checkpoints at the same wallclock time. Instead, they need to take a checkpoint when they have processed the same input. This is implemented with so-c

Re: Small-files source - partitioning based on prefix of file

2018-08-10 Thread Averell
Thank you Vino, Jorn, and Fabian. Please forgive me for my ignorance, as I am still not able to fully understand state/checkpointing and the statement that Fabian gave earlier: "/In either case, some record will be read twice but if reading position can be reset, you can still have exactly-once stat

Re: Small-files source - partitioning based on prefix of file

2018-08-10 Thread Fabian Hueske
Hi Averell, One comment regarding what you said: > As my files are small, I think there would not be much benefit in checkpointing file offset state. Checkpointing is not about efficiency but about consistency. If the position in a split is not checkpointed, your application won't operate with e

Re: Small-files source - partitioning based on prefix of file

2018-08-10 Thread Jörn Franke
Or you write a custom file system for Flink... (for the tar part). Unfortunately, gz files can only be processed single-threaded (there are some multi-threaded implementations, but they don’t bring a big gain). > On 10. Aug 2018, at 07:07, vino yang wrote: > > Hi Averell, > > In this case, I

Re: Small-files source - partitioning based on prefix of file

2018-08-09 Thread vino yang
Hi Averell, In this case, I think you may need to extend Flink's existing source. First, read your large tar.gz file; once it has been decompressed, use multiple threads to read the records in the source, and then parse the data format (map / flatmap might be a suitable operator, you can cha

Re: Small-files source - partitioning based on prefix of file

2018-08-09 Thread Averell
Hi Fabian, Vino, I have one more question, for which I initially planned to create a new thread, but now I think it is better to ask here: I need to process one big tar.gz file which contains multiple small gz files. What is the best way to do this? I am thinking of having one single thread process th

Re: Small-files source - partitioning based on prefix of file

2018-08-09 Thread Averell
Thank you Vino and Fabian for your help in answering my questions. As my files are small, I think there would not be much benefit in checkpointing file offset state. Thanks and regards, Averell

Re: Small-files source - partitioning based on prefix of file

2018-08-08 Thread Fabian Hueske
Hi Averell, As Vino said, checkpoints store the state of all operators of an application. The state of a monitoring source function is the position in the currently read split and all splits that have been received and are currently pending. In case of a recovery, the splits are recovered and the
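
The point about recovering the reading position together with the operator state can be illustrated with a small simulation. This is only a sketch under stated assumptions, not Flink code: the function `simulate`, its parameters, and the single "count" state are all hypothetical, and a real job checkpoints through barriers rather than a local tuple. It shows that some records are read twice after a failure, yet the final state is the same as in a failure-free run, because position and state are reset together.

```python
def simulate(num_lines, checkpoint_every, fail_at=None):
    """Toy model of a file reader plus a counting operator (hypothetical names).

    Returns (final_count, total_reads), where total_reads includes re-reads
    caused by recovery.
    """
    position = 0          # source state: reading position in the split
    count = 0             # operator state: number of records counted
    checkpoint = (0, 0)   # last completed checkpoint: (position, count)
    total_reads = 0       # every read, including records read twice
    failed = False

    while position < num_lines:
        if fail_at is not None and not failed and position == fail_at:
            # Simulated failure: reset BOTH the position and the state
            # to the last completed checkpoint.
            failed = True
            position, count = checkpoint
            continue

        # "Read" one record and update the operator state.
        position += 1
        count += 1
        total_reads += 1

        # Take a checkpoint of source and operator state at the same
        # logical point in the input.
        if position % checkpoint_every == 0:
            checkpoint = (position, count)

    return count, total_reads


no_failure = simulate(50, 20)
with_failure = simulate(50, 20, fail_at=30)
print(no_failure, with_failure)
```

In the run with a failure, the records between the last checkpoint and the failure point are read a second time, but the count is not inflated, since the count was rolled back to the value it had at that checkpoint.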

Re: Small-files source - partitioning based on prefix of file

2018-08-08 Thread vino yang
Hi Averell, You need to understand that Flink reflects the recovery of the state, not the recovery of the record. Of course, sometimes your record is state, but sometimes the intermediate result of your record is the state. It depends on your business logic and your operators. Thanks, vino. Aver

Re: Small-files source - partitioning based on prefix of file

2018-08-07 Thread Averell
Thank you Fabian. "/In either case, some record will be read twice but if reading position can be reset, you can still have exactly-once state consistency because the state is reset as well./" I do not quite understand this statement. If I have read 30 lines from the checkpoint and sent those 30 r

Re: Small-files source - partitioning based on prefix of file

2018-07-31 Thread Fabian Hueske
Hi Averell, please find my answers inline. Best, Fabian 2018-07-31 13:52 GMT+02:00 Averell : > Hi Fabian, > > Thanks for the information. I will try to look at the change to that > complex > logic that you mentioned when I have time. That would save one more shuffle > (from 1 to 0), wouldn't t

Re: Small-files source - partitioning based on prefix of file

2018-07-31 Thread Averell
Hi Fabian, Thanks for the information. I will try to look at the change to that complex logic that you mentioned when I have time. That would save one more shuffle (from 1 to 0), wouldn't it? BTW, regarding fault tolerance in the file reader task, could you help explain what would happen if the

Re: Small-files source - partitioning based on prefix of file

2018-07-31 Thread Fabian Hueske
Hi Averell, The records emitted by the monitoring tasks are "just" file splits, i.e., meta information that defines which data to read from where. The reader tasks receive these splits and process them by reading the corresponding files. You could of course partition the splits based on the file

Re: Small-files source - partitioning based on prefix of file

2018-07-30 Thread vino yang
Hi Averell, Actually, performing a key partition inside the Source Function is the same as DataStream[Source].keyBy(custom partitioner), because keyBy is not a real operator, but a virtual node in a DAG, which does not correspond to a physical operator. Thanks, vino. 2018-07-31 10:52 GMT+08:00

Re: Small-files source - partitioning based on prefix of file

2018-07-30 Thread Averell
Hi Vino, I'm a little bit confused. If I can do the partitioning from within the source function, using the same hash function on the key to identify the partition, would that be sufficient to avoid shuffling in the next keyBy call? Thanks. Averell

Re: Small-files source - partitioning based on prefix of file

2018-07-30 Thread vino yang
Hi Averell, The keyBy transformation will trigger the key partition, which is one of the various partition types supported by Flink, which causes the data to be shuffled. It routes the keys of the same hash value to the same node based on the hash of the key you passed (or generated by the custom
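
The idea of routing every record with the same key to the same partition can be sketched as follows. This is a minimal illustration, not Flink's actual key-to-subtask assignment, which differs in detail; the helper names `file_prefix` and `partition_for`, the `_` separator, and the example file names are all assumptions made for this sketch.

```python
import zlib


def file_prefix(file_name, sep="_"):
    """Hypothetical key extractor: the part of the file name before the first separator."""
    return file_name.split(sep, 1)[0]


def partition_for(key, num_partitions):
    """Map a key to a partition index with a stable hash (CRC32, chosen for the sketch)."""
    return zlib.crc32(key.encode("utf-8")) % num_partitions


# Hypothetical file names whose prefix identifies a node.
files = ["nodeA_001.csv", "nodeB_001.csv", "nodeA_002.csv", "nodeC_001.csv"]
assignment = {name: partition_for(file_prefix(name), 4) for name in files}

for name, partition in assignment.items():
    print(name, "->", partition)
```

Files that share a prefix always land on the same partition index, which is the property a key-based partitioning relies on. Whether a later keyBy still shuffles the data is a separate question that this sketch does not answer.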

Re: Small-files source - partitioning based on prefix of file

2018-07-30 Thread Averell
Oh, thank you Vino. I was not aware of that reshuffling after every custom partitioning. Why would that be needed, though? Thanks and regards, Averell

Re: Small-files source - partitioning based on prefix of file

2018-07-30 Thread vino yang
Hi Averell, As far as I know, the custom partitioner will inevitably lead to shuffle of data. Even if it is bundled in the logic of the source function, isn't the behavior different? Thanks, vino. 2018-07-30 20:32 GMT+08:00 Averell : > Thanks Vino. > > Yes, I can do that after the source functi

Re: Small-files source - partitioning based on prefix of file

2018-07-30 Thread Averell
Thanks Vino. Yes, I can do that after the source function. But that means data would be shuffled - sending from every source to the right partition. I think that doing the partitioning from within the file source would help to save that shuffling. Thanks. Averell.

Re: Small-files source - partitioning based on prefix of file

2018-07-30 Thread vino yang
Hi Averell, Yes, you cannot do it in the source function. I think you can call keyBy with a partitioner (based on NodeID) after source. Why do you have to use the customized partitioner in the source function? Thanks, vino. 2018-07-30 19:56 GMT+08:00 Averell : > Thank you Vino. > > Yes, I went

Re: Small-files source - partitioning based on prefix of file

2018-07-30 Thread Averell
Thank you Vino. Yes, I went through that official guide before posting this question. The problem was that I could not see any call to one of those mentioned partitioning methods (partitionCustom, shuffle, rebalance, rescale, or broadcast) in the original readFile function. I'm still trying to look i

Re: Small-files source - partitioning based on prefix of file

2018-07-30 Thread vino yang
Hi Averell, Did you know Flink allows you to customize a partitioner? Some resources: official documentation: https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/stream/operators/#physical-partitioning discussion on the mailing list: http://apache-flink-user-mailing-list-archive.2336050