I am not sure I understood your question correctly. You can easily prevent the NoSuchElementException by calling next() only if hasNext() returns true.
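Here is a minimal sketch of that guard under simplified assumptions. It is plain Java so it runs without Flink. The `Person` and `StudentInfo` classes below are hypothetical reductions of the ones in the thread, and a `List<Person>` stands in for Flink's `Collector`. In a real `CoGroupFunction`, the same `hasNext()` check would go at the top of `coGroup(...)`.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Plain-Java stand-in for the body of ComputeStudiesProfile.coGroup(...).
// Person and StudentInfo are hypothetical, simplified versions of the classes
// in the thread; the List<Person> "out" plays the role of Flink's Collector.
class GuardedCoGroup {

    static class Person {
        final String name;
        String major;
        final List<String> bestCourses = new ArrayList<>();
        Person(String name) { this.name = name; }
    }

    static class StudentInfo {
        final String major;
        final List<String> courses;
        StudentInfo(String major, List<String> courses) {
            this.major = major;
            this.courses = courses;
        }
    }

    static void coGroup(Iterable<Person> persons, Iterable<StudentInfo> infos,
                        List<Person> out) {
        Iterator<Person> personIt = persons.iterator();
        // The Person group is empty when a name occurs only in the StudentInfo
        // input; calling next() unconditionally would throw NoSuchElementException.
        if (!personIt.hasNext()) {
            return;
        }
        Person person = personIt.next();

        // Drain the other side; it may legitimately be empty as well.
        List<StudentInfo> collected = new ArrayList<>();
        for (StudentInfo info : infos) {
            collected.add(info);
        }

        // As in the original function, update and emit only if some info exists.
        if (!collected.isEmpty()) {
            person.major = collected.get(0).major;
            for (StudentInfo info : collected) {
                person.bestCourses.addAll(info.courses);
            }
            out.add(person);
        }
    }
}
```

Like the original `ComputeStudiesProfile`, this sketch emits nothing for a person without any `StudentInfo`. If such persons should still reach the second coGroup, one option would be to collect `person` unchanged in that case.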
2015-06-04 11:18 GMT+02:00 Mustafa Elbehery <elbeherymust...@gmail.com>:

> Yes, it's working now. But my assumption is that I want to join different
> datasets on the common key, so it will be normal to have many tuples on
> one side which do not exist on the other side.
>
> How can I fix that?
>
> On Thu, Jun 4, 2015 at 11:00 AM, Fabian Hueske <fhue...@gmail.com> wrote:
>
>> Hi,
>>
>> one of the iterables of a CoGroup function can be empty. Calling
>> iterator.next() on an empty iterator raises the NoSuchElementException.
>> This is the expected behavior of the function.
>>
>> Are you sure your assumptions about your data are correct, i.e., that the
>> iterator should always have (at least) one element?
>>
>> Fabian
>>
>>
>> 2015-06-04 10:47 GMT+02:00 Mustafa Elbehery <elbeherymust...@gmail.com>:
>>
>>> Hi,
>>>
>>>
>>> public static class ComputeStudiesProfile implements
>>>         CoGroupFunction<Person, StudentInfo, Person> {
>>>
>>>     Person person;
>>>
>>>     @Override
>>>     public void coGroup(Iterable<Person> iterable, Iterable<StudentInfo>
>>>             iterable1, Collector<Person> collector) throws Exception {
>>>
>>>         Iterator<Person> iterator = iterable.iterator();
>>>         person = iterator.next();
>>>
>>>         ArrayList<StudentInfo> infos = new ArrayList<StudentInfo>();
>>>         Iterator<StudentInfo> infosIterator = iterable1.iterator();
>>>
>>>         while(infosIterator.hasNext())
>>>             infos.add(infosIterator.next());
>>>
>>>         if (infos.size() > 0) {
>>>             update(person, infos, collector);
>>>         }
>>>     }
>>>
>>>     public void update(Person person, Collection<StudentInfo> infos,
>>>             Collector<Person> collector) {
>>>         person.setMajor(infos.iterator().next().getMajor());
>>>         for(StudentInfo info : infos){
>>>             person.getBestCourse().addAll(info.getCourses());
>>>         }
>>>         collector.collect(person);
>>>     }
>>> }
>>>
>>>
>>> *******************************************************************************************************
>>>
>>>
>>> public static class ComputeJobsProfile implements CoGroupFunction<Person,
>>>         StudentJobs, Person> {
>>>
>>>     @Override
>>>     public void coGroup(Iterable<Person> iterable, Iterable<StudentJobs>
>>>             iterable1, Collector<Person> collector) throws Exception {
>>>
>>>         Person person = iterable.iterator().next();
>>>
>>>         ArrayList<StudentJobs> jobs = new ArrayList<StudentJobs>();
>>>         for (StudentJobs job : iterable1) {
>>>             jobs.add(job);
>>>         }
>>>         if (jobs.size() > 0) {
>>>             update(person, jobs, collector);
>>>         }
>>>     }
>>>
>>>     public void update(Person person, Collection<StudentJobs> jobs,
>>>             Collector<Person> collector) {
>>>
>>>         for(StudentJobs job : jobs){
>>>             person.getJobs().addAll(job.getJobs());
>>>         }
>>>         collector.collect(person);
>>>     }
>>> }
>>>
>>>
>>> On Wed, Jun 3, 2015 at 11:49 PM, Stephan Ewen <se...@apache.org> wrote:
>>>
>>>> Hi!
>>>>
>>>> The code snippet is not very revealing. Can you also show the
>>>> implementations of the CoGroupFunctions?
>>>>
>>>> Thanks!
>>>>
>>>> On Wed, Jun 3, 2015 at 3:50 PM, Mustafa Elbehery <
>>>> elbeherymust...@gmail.com> wrote:
>>>>
>>>>> Code Snippet :)
>>>>>
>>>>> DataSet<Person> updatedPersonOne = inPerson.coGroup(inStudent)
>>>>>         .where("name").equalTo("name")
>>>>>         .with(new ComputeStudiesProfile());
>>>>>
>>>>> DataSet<Person> updatedPersonTwo = updatedPersonOne.coGroup(inJobs)
>>>>>         .where("name").equalTo("name")
>>>>>         .with(new ComputeJobsProfile());
>>>>>
>>>>> updatedPersonTwo.print();
>>>>>
>>>>>
>>>>> On Wed, Jun 3, 2015 at 3:45 PM, Mustafa Elbehery <
>>>>> elbeherymust...@gmail.com> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> I am trying to write two coGroups in sequence in the same ETL job. I
>>>>>> use a common dataset in both of them: in the first coGroup I update the
>>>>>> initial dataset and retrieve the result in a new dataset object. Then I
>>>>>> use the result in the second coGroup with another new dataset.
>>>>>>
>>>>>> While debugging, I could see that coGroup.next is *false*; however,
>>>>>> in the next iteration it has elements. I tried forcing ObjectReuse to be
>>>>>> enabled, and I got *half* of the expected result. I have attached a
>>>>>> screenshot of the debugger.
>>>>>>
>>>>>> My question is: does this have anything to do with the concurrent
>>>>>> execution of different tasks in Flink? And how can I solve this problem?
>>>>>>
>>>>>> Regards.
>>>>>>
>>>>>>
>>>>>> --
>>>>>> Mustafa Elbehery
>>>>>> EIT ICT Labs Master School
>>>>>> <http://www.masterschool.eitictlabs.eu/home/>
>>>>>> +49(0)15750363097
>>>>>> skype: mustafaelbehery87
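To illustrate the point that either iterable of a CoGroup function can be empty, here is a single-machine simulation of the CoGroup contract in plain Java. It is not how Flink executes a coGroup; it only mirrors the behavior described above, namely that the function is called once for every key occurring in either input and receives both groups for that key. The class and method names are made up for illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;
import java.util.function.BiConsumer;
import java.util.function.Function;

// Single-machine simulation of the CoGroup contract, NOT Flink's implementation:
// the user function runs once per key found in EITHER input, and the group for
// a key that is missing on one side is an empty Iterable.
class CoGroupSimulation {

    static <L, R, K extends Comparable<K>> void coGroup(
            List<L> left, Function<L, K> leftKey,
            List<R> right, Function<R, K> rightKey,
            BiConsumer<Iterable<L>, Iterable<R>> fn) {
        Map<K, List<L>> leftGroups = groupBy(left, leftKey);
        Map<K, List<R>> rightGroups = groupBy(right, rightKey);

        // Union of the keys of both inputs, sorted only to make call order deterministic.
        TreeSet<K> keys = new TreeSet<>(leftGroups.keySet());
        keys.addAll(rightGroups.keySet());

        for (K key : keys) {
            // A key absent on one side yields an empty group for that side.
            fn.accept(leftGroups.getOrDefault(key, Collections.emptyList()),
                      rightGroups.getOrDefault(key, Collections.emptyList()));
        }
    }

    // Groups items by the extracted key, preserving insertion order within a group.
    private static <T, K> Map<K, List<T>> groupBy(List<T> items, Function<T, K> key) {
        Map<K, List<T>> groups = new LinkedHashMap<>();
        for (T item : items) {
            groups.computeIfAbsent(key.apply(item), k -> new ArrayList<>()).add(item);
        }
        return groups;
    }
}
```

With persons `ann` and `bob` on the left and student records for `bob` and `carl` on the right, the function is called three times: `ann` arrives with an empty right group and `carl` with an empty left group. The `carl` call is exactly the case where a bare `iterable.iterator().next()` on the Person side throws NoSuchElementException.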