Hi,

public static class ComputeStudiesProfile
      implements CoGroupFunction<Person, StudentInfo, Person> {

   Person person;

   @Override
   public void coGroup(Iterable<Person> iterable,
                       Iterable<StudentInfo> iterable1,
                       Collector<Person> collector) throws Exception {

      Iterator<Person> iterator = iterable.iterator();
      person = iterator.next();

      ArrayList<StudentInfo> infos = new ArrayList<StudentInfo>();
      Iterator<StudentInfo> infosIterator = iterable1.iterator();

      while (infosIterator.hasNext()) {
         infos.add(infosIterator.next());
      }

      if (infos.size() > 0) {
         update(person, infos, collector);
      }
   }

   public void update(Person person, Collection<StudentInfo> infos,
                      Collector<Person> collector) {
      person.setMajor(infos.iterator().next().getMajor());
      for (StudentInfo info : infos) {
         person.getBestCourse().addAll(info.getCourses());
      }
      collector.collect(person);
   }
}

 
*******************************************************************************************************


public static class ComputeJobsProfile
      implements CoGroupFunction<Person, StudentJobs, Person> {

   @Override
   public void coGroup(Iterable<Person> iterable,
                       Iterable<StudentJobs> iterable1,
                       Collector<Person> collector) throws Exception {

      Person person = iterable.iterator().next();

      ArrayList<StudentJobs> jobs = new ArrayList<StudentJobs>();
      for (StudentJobs job : iterable1) {
         jobs.add(job);
      }
      if (jobs.size() > 0) {
         update(person, jobs, collector);
      }
   }

   public void update(Person person, Collection<StudentJobs> jobs,
                      Collector<Person> collector) {

      for (StudentJobs job : jobs) {
         person.getJobs().addAll(job.getJobs());
      }
      collector.collect(person);
   }
}
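
For comparison, here is a minimal sketch of a guarded variant. It is not part of the job above and is only an illustration. A coGroup function is invoked for every key that appears in either input, so the Person iterable can be empty for a given key, and calling next() on it without checking hasNext() then fails. The Collector interface and the simplified Person and StudentJobs classes below are hypothetical stand-ins so that the sketch compiles without Flink on the classpath. Emitting the person even when no jobs match is also an assumption about the desired result, and it differs from the code above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;

class GuardedCoGroupSketch {

    /** Hypothetical stand-in for Flink's Collector, so the sketch needs no Flink dependency. */
    interface Collector<T> {
        void collect(T record);
    }

    /** Simplified, hypothetical version of the Person POJO. */
    static class Person {
        final String name;
        final List<String> jobs = new ArrayList<>();
        Person(String name) { this.name = name; }
    }

    /** Simplified, hypothetical version of the StudentJobs POJO. */
    static class StudentJobs {
        final String name;
        final List<String> jobs;
        StudentJobs(String name, List<String> jobs) {
            this.name = name;
            this.jobs = jobs;
        }
    }

    /** Same shape as a coGroup method, but checks hasNext() before calling next(). */
    static void coGroup(Iterable<Person> persons,
                        Iterable<StudentJobs> jobs,
                        Collector<Person> out) {
        Iterator<Person> personIt = persons.iterator();
        if (!personIt.hasNext()) {
            // The key exists only on the jobs side: there is no Person to update.
            return;
        }
        Person person = personIt.next();
        for (StudentJobs job : jobs) {
            person.jobs.addAll(job.jobs);
        }
        // Assumption: emit the person even without matching jobs,
        // so that it is not dropped from the result.
        out.collect(person);
    }

    public static void main(String[] args) {
        List<Person> out = new ArrayList<>();

        // Key present on both sides.
        coGroup(Collections.singletonList(new Person("alice")),
                Collections.singletonList(
                        new StudentJobs("alice", Arrays.asList("tutor", "intern"))),
                out::add);

        // Key present only on the jobs side: the Person iterable is empty.
        coGroup(Collections.<Person>emptyList(),
                Collections.singletonList(
                        new StudentJobs("bob", Arrays.asList("barista"))),
                out::add);

        for (Person p : out) {
            System.out.println(p.name + " " + p.jobs);
        }
    }
}
```

Running main prints only `alice [tutor, intern]`; the call for "bob" returns early instead of throwing, because there is no Person for that key.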


On Wed, Jun 3, 2015 at 11:49 PM, Stephan Ewen <se...@apache.org> wrote:

> Hi!
>
> The code snippet is not very revealing. Can you also show the
> implementations of the CoGroupFunctions?
>
> Thanks!
>
> On Wed, Jun 3, 2015 at 3:50 PM, Mustafa Elbehery <
> elbeherymust...@gmail.com> wrote:
>
>> Code Snippet :)
>>
>> DataSet<Person> updatedPersonOne = inPerson.coGroup(inStudent)
>>                            .where("name").equalTo("name")
>>                            .with(new ComputeStudiesProfile());
>>
>> DataSet<Person> updatedPersonTwo = updatedPersonOne.coGroup(inJobs)
>>                            .where("name").equalTo("name")
>>                            .with(new ComputeJobsProfile());
>>
>> updatedPersonTwo.print();
>>
>>
>> On Wed, Jun 3, 2015 at 3:45 PM, Mustafa Elbehery <
>> elbeherymust...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> I am trying to run two coGroups in sequence in the same ETL job. I use a
>>> common dataset in both of them: in the first coGroup I update the initial
>>> dataset and get the result as a new dataset object. Then I use that
>>> result in the second coGroup with another new dataset.
>>>
>>> While debugging, I could see that coGroup.next is *false*; however, in
>>> the next iteration it has elements. I tried force-enabling ObjectReuse
>>> and got *half* of the expected result. I have attached a screenshot of
>>> the debugger.
>>>
>>> My question is: is this related to the concurrent execution of
>>> different tasks in Flink? And how can I solve this problem?
>>>
>>> Regards.
>>>
>>>
>>>
>>
>>
>>
>


-- 
Mustafa Elbehery
EIT ICT Labs Master School <http://www.masterschool.eitictlabs.eu/home/>
+49(0)15750363097
skype: mustafaelbehery87
