Hi Folks,

I am reviving this thread again, as I am stuck on one step needed to reach my target.

The following code partitions both inputs, coGroups them, and writes the result to disk. I then try to re-read the data from disk, so I create a LocatableInputSplit[] array with the size of the DOP. Find the code below:

inPerson.partitionByHash("name")
        .map(new TrackHost())
        .coGroup(inStudent.partitionByHash("name"))
        .where("name").equalTo("name")
        .with(new ComputeStudiesProfile())
        .write(new TextOutputFormat(new Path()), "file:///home/mustafa/Documents/tst/",
               FileSystem.WriteMode.OVERWRITE);

LocatableInputSplit[] splits = new LocatableInputSplit[env.getParallelism()];
splits[0] = new LocatableInputSplit(env.getParallelism(), "localhost");
splits[1] = new LocatableInputSplit(env.getParallelism(), "localhost");
splits[2] = new LocatableInputSplit(env.getParallelism(), "localhost");

DataSet<Person> secondIn = env
        .createInput(new MutableInputFormatTest(new Path("file:///home/mustafa/Documents/tst/1"), splits))
        .map(new PersonMapper());
secondIn.print();

TrackHost is an Accumulator that tracks the host information, and MutableInputFormatTest is a custom input format that extends TextInputFormat and implements StrictlyLocalAssignment. I keep the LocatableInputSplit[] as an instance field, because implementing InputSplit directly conflicts with TextInputFormat on the createInputSplits method: both declare the same method and the compiler complains about it.

While debugging I can see the problem in ExecutionJobVertex, line 146: the execution ignores the locations I ship with my splits and re-creates the input splits, taking the host info (machine name) from the execution somehow, while the task managers prepared by the scheduler are waiting for a machine called "localhost".
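For reference, the input format I am describing has roughly the shape below. This is only a simplified sketch, not my actual class: the class name is a placeholder, the package of the StrictlyLocalAssignment marker interface from FLINK-1478 is guessed, and it simply replaces the host names on the splits that TextInputFormat computes.

// Sketch only (placeholder class name). Assumes the StrictlyLocalAssignment
// marker interface added by FLINK-1478; its package is guessed here.
import java.io.IOException;

import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.core.fs.FileInputSplit;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.io.StrictlyLocalAssignment;

public class HostPinnedTextInputFormat extends TextInputFormat implements StrictlyLocalAssignment {

    // host names recorded in the previous job (e.g. by the TrackHost accumulator)
    private final String[] hosts;

    public HostPinnedTextInputFormat(Path filePath, String[] hosts) {
        super(filePath);
        this.hosts = hosts;
    }

    @Override
    public FileInputSplit[] createInputSplits(int minNumSplits) throws IOException {
        // Keep the offsets/lengths computed by the file format, but pin every
        // split to the hosts the scheduler should honor.
        FileInputSplit[] splits = super.createInputSplits(minNumSplits);
        FileInputSplit[] pinned = new FileInputSplit[splits.length];
        for (int i = 0; i < splits.length; i++) {
            FileInputSplit s = splits[i];
            pinned[i] = new FileInputSplit(s.getSplitNumber(), s.getPath(),
                    s.getStart(), s.getLength(), hosts);
        }
        return pinned;
    }
}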
Any suggestions?

Regards.

On Tue, May 19, 2015 at 2:16 PM, Fabian Hueske <fhue...@gmail.com> wrote:

> Alright, so if both inputs of the CoGroup are read from the file system,
> there should be a way to do the co-group on co-located data without
> repartitioning.
> In fact, I have some code lying around to do co-located joins from the
> local FS [1]. I haven't tested it thoroughly and it also relies on a number
> of assumptions. If the data is also sorted, you can even get around sorting
> it if you inject a few lines into the optimizer (see the change for
> FLINK-1444) and ensure that each source reads exactly one (!) input split.
>
> Regarding your question about the PACT output contracts, there were three
> types, defined with respect to a key/value pair data model:
> - Same key: the UDF does not modify the key
> - Super key: the UDF extends the key (partitioning remains valid, sorting does not)
> - Unique key: keys from the UDF or source are unique
>
> Let me know if you have questions.
>
> Cheers, Fabian
>
> [1] https://github.com/fhueske/flink-localjoin-utils
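Regarding the output contracts listed above: if I understand correctly, the closest equivalent in the current DataSet API is a forwarded-field hint. Below is a rough sketch of what I mean, reusing the classes from my job above; whether the optimizer can actually reuse the partitioning across the sink/source boundary is exactly the open question for me.

// Sketch: declare that TrackHost leaves the "name" field untouched, so the
// hash partitioning on "name" remains valid after the map -- roughly the
// "same key" output contract.
DataSet<Person> tracked = inPerson
        .partitionByHash("name")
        .map(new TrackHost())
        .withForwardedFields("name");

tracked.coGroup(inStudent.partitionByHash("name"))
       .where("name").equalTo("name")
       .with(new ComputeStudiesProfile());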
> 2015-05-19 13:49 GMT+02:00 Alexander Alexandrov <alexander.s.alexand...@gmail.com>:
>
>> Thanks for the feedback, Fabian.
>>
>> This is related to the question I sent on the user mailing list yesterday.
>> Mustafa is working on a master's thesis in which we try to abstract an
>> operator for updating stateful datasets (decoupled from the current native
>> iterations logic) and use it in conjunction with lazy unrolling of
>> iterations.
>>
>> The assumptions are as follows:
>>
>> - Each iteration runs a job with the same structure and the same DOP;
>> - Updates are realized through a coGroup with a fixed DOP (let's say *N*),
>>   which consumes a *(state, updates)* pair of datasets and produces a new
>>   version of the state (let's call it *state'*);
>> - We keep track of where the *N* output partitions of *state'* are located
>>   and use this information for local placement of the corresponding *N*
>>   DataSource tasks in the next iteration (via FLINK-1478);
>> - The remaining piece of the puzzle is to figure out how to tell the
>>   coGroup that one of the inputs is already partitioned, so it avoids an
>>   unnecessary shuffle.
>>
>> If I remember correctly, back in the day we had a PACT output contract
>> that served a similar purpose (avoiding unnecessary shuffles), but I was
>> not able to find it yesterday.
>>
>> In either case, even if this does not work out of the box at the moment,
>> I think most of the logic is in place (e.g. co-location groups in the
>> scheduler), and we are willing to either hack the code or add the missing
>> functionality in order to realize the goal described above.
>>
>> Suggestions are welcome!
>>
>> Regards,
>> Alex
>>
>> 2015-05-18 17:42 GMT+02:00 Fabian Hueske <fhue...@gmail.com>:
>>
>>> Hi Mustafa,
>>>
>>> I'm afraid this is not possible.
>>> Although you can annotate DataSources with partitioning information,
>>> this is not enough to avoid repartitioning for a CoGroup. The reason is
>>> that CoGroup requires co-partitioning of both inputs, i.e., both inputs
>>> must be equally partitioned (same number of partitions, same partitioning
>>> function, same location of partitions). Since Flink assigns tasks to
>>> execution slots dynamically, it is not possible to co-locate data that was
>>> read from a data source with data coming from the result of another
>>> computation.
>>>
>>> If you just need the result of the first co-group on disk, you could
>>> also build a single program that does both co-groups and additionally
>>> writes the result of the first co-group to disk (Flink supports multiple
>>> data sinks).
>>>
>>> Best, Fabian
>>>
>>> 2015-05-18 15:43 GMT+02:00 Mustafa Elbehery <elbeherymust...@gmail.com>:
>>>
>>>> Hi,
>>>>
>>>> I am writing a Flink job in which I have three datasets. I have
>>>> hash-partitioned (partitionByHash) the first two before coGrouping them.
>>>>
>>>> My plan is to spill the result of the coGroup to disk and then re-read
>>>> it before coGrouping it with the third dataset.
>>>>
>>>> My question is: is there any way to inform Flink that the first coGroup
>>>> result is already partitioned? I know I can re-partition again before
>>>> coGrouping, but I would like to know if there is any way to avoid a step
>>>> that was already executed.
>>>>
>>>> Regards.
>>>>
>>>> --
>>>> Mustafa Elbehery
>>>> EIT ICT Labs Master School
>>>> <http://www.masterschool.eitictlabs.eu/home/>
>>>> +49(0)15750363097
>>>> skype: mustafaelbehery87
>>>>
>>>
>>
>

--
Mustafa Elbehery
EIT ICT Labs Master School
<http://www.masterschool.eitictlabs.eu/home/>
+49(0)15750363097
skype: mustafaelbehery87