Hi,
public static class ComputeStudiesProfile implements CoGroupFunction<Person, StudentInfo, Person> {

    Person person;

    @Override
    public void coGroup(Iterable<Person> iterable, Iterable<StudentInfo> iterable1,
                        Collector<Person> collector) throws Exception {

        Iterator<Person> iterator = iterable.iterator();
        person = iterator.next();

        ArrayList<StudentInfo> infos = new ArrayList<StudentInfo>();
        Iterator<StudentInfo> infosIterator = iterable1.iterator();

        while (infosIterator.hasNext())
            infos.add(infosIterator.next());

        if (infos.size() > 0) {
            update(person, infos, collector);
        }
    }

    public void update(Person person, Collection<StudentInfo> infos, Collector<Person> collector) {
        person.setMajor(infos.iterator().next().getMajor());
        for (StudentInfo info : infos) {
            person.getBestCourse().addAll(info.getCourses());
        }
        collector.collect(person);
    }
}

*******************************************************************************************************

public static class ComputeJobsProfile implements CoGroupFunction<Person, StudentJobs, Person> {

    @Override
    public void coGroup(Iterable<Person> iterable, Iterable<StudentJobs> iterable1,
                        Collector<Person> collector) throws Exception {

        Person person = iterable.iterator().next();

        ArrayList<StudentJobs> jobs = new ArrayList<StudentJobs>();
        for (StudentJobs job : iterable1) {
            jobs.add(job);
        }

        if (jobs.size() > 0) {
            update(person, jobs, collector);
        }
    }

    public void update(Person person, Collection<StudentJobs> jobs, Collector<Person> collector) {
        for (StudentJobs job : jobs) {
            person.getJobs().addAll(job.getJobs());
        }
        collector.collect(person);
    }
}

On Wed, Jun 3, 2015 at 11:49 PM, Stephan Ewen <se...@apache.org> wrote:

> Hi!
>
> The code snippet is not very revealing. Can you also show the
> implementations of the CoGroupFunctions?
>
> Thanks!
>
> On Wed, Jun 3, 2015 at 3:50 PM, Mustafa Elbehery <
> elbeherymust...@gmail.com> wrote:
>
>> Code Snippet :)
>>
>> DataSet<Person> updatedPersonOne = inPerson.coGroup(inStudent)
>>     .where("name").equalTo("name")
>>     .with(new ComputeStudiesProfile());
>>
>> DataSet<Person> updatedPersonTwo = updatedPersonOne.coGroup(inJobs)
>>     .where("name").equalTo("name")
>>     .with(new ComputeJobsProfile());
>>
>> updatedPersonTwo.print();
>>
>>
>> On Wed, Jun 3, 2015 at 3:45 PM, Mustafa Elbehery <
>> elbeherymust...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> I am trying to write two coGroups in sequence in the same ETL job. I use
>>> a common dataset in both of them: in the first coGroup I update the
>>> initial dataset and retrieve the result in a new dataset object. Then I
>>> use that result in the second coGroup with another new dataset.
>>>
>>> While debugging, I could see that coGroup.next is *false*; however, in
>>> the next iteration it has elements. I tried to force enabling
>>> ObjectReuse, and I got *half* of the expected result. I have attached a
>>> screenshot of the debugger.
>>>
>>> My question is: is this related to the concurrent execution of different
>>> tasks in Flink? And how can I solve this problem?
>>>
>>> Regards.
>>>
>>
>

-- 
Mustafa Elbehery
EIT ICT Labs Master School <http://www.masterschool.eitictlabs.eu/home/>
+49(0)15750363097
skype: mustafaelbehery87
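
A note on the two CoGroupFunctions in this thread: Flink calls coGroup for every key that occurs in at least one input, so either Iterable can be empty for a given key. Both classes call iterator.next() on the Person side without checking hasNext(), and both emit a Person only when the other side is non-empty, so persons without a match never reach the next operator. That may or may not be the cause of the missing results discussed here. The following is only a minimal plain-Java sketch of a guarded version of that logic. It does not use Flink: String values stand in for Person and StudentInfo, a java.util.function.Consumer stands in for Flink's Collector, and the names GuardedCoGroupSketch and run are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;

// Plain-Java sketch (no Flink dependency). Consumer<String> is an assumed
// stand-in for Flink's Collector; Strings stand in for Person/StudentInfo.
final class GuardedCoGroupSketch {

    // Same shape as CoGroupFunction.coGroup for a single key.
    // Emits a record only if both sides have at least one element.
    static void coGroup(Iterable<String> persons, Iterable<String> infos,
                        Consumer<String> out) {
        Iterator<String> personIt = persons.iterator();
        if (!personIt.hasNext()) {
            return; // key exists only on the second input: nothing to update
        }
        String person = personIt.next();

        // Copy the second side into a list so it can be traversed safely.
        List<String> collected = new ArrayList<>();
        for (String info : infos) {
            collected.add(info);
        }

        if (!collected.isEmpty()) {
            out.accept(person + ":" + String.join(",", collected));
        }
    }

    // Hypothetical helper: runs coGroup for one key and returns what was emitted.
    static List<String> run(List<String> persons, List<String> infos) {
        List<String> out = new ArrayList<>();
        coGroup(persons, infos, out::add);
        return out;
    }

    public static void main(String[] args) {
        // Both sides present: one record is emitted.
        System.out.println(run(Arrays.asList("alice"), Arrays.asList("math", "cs")));
        // Person side empty: no exception, nothing emitted.
        System.out.println(run(Collections.<String>emptyList(), Arrays.asList("math")));
        // Info side empty: the person is dropped, as in the classes above.
        System.out.println(run(Arrays.asList("bob"), Collections.<String>emptyList()));
    }
}
```

If persons without a match should survive into the second coGroup, the last branch would have to emit the unchanged person instead of dropping it. Whether that is the desired behaviour depends on the job.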