Stephan, this is exactly what I was looking for :) Thanks, will try it out.

I know the combineGroup() needed a reduceGroup() too, but I was just trying
out the APIs.

I did not realize that the other streaming APIs were already available. I
will have a look.

Thanks again.



On Mon, Aug 17, 2015 at 6:30 AM, Ashwin Jayaprakash <
ashwin.jayaprak...@gmail.com> wrote:

> Hi, I'm trying to evaluate Flink to see if it can do efficient semi-joins
> or self-joins with filter.
>
> Problem description:
> I have 1 stream that can contain "near duplicates" records. The records
> share a "family name" and so, many records can have the same family name.
> But each record has a unique id too.
>
> Query:
> 1) I would like to run a query such that I get the ids and names of some
> records based on a criterion
>
> 2) I then want to fetch all the records that match this set of ids fetched
> previously but this time include the duplicates that share the same name
>
> Question 1:
> I've pasted the sample code below, but I'm trying to see if I can push all
> the ids that came out of the first filter step "into" the connector,
> partition by partition and then pull out only the matching records in step
> 2. Is there a way to do this?
>
> Question 2:
> I haven't seen many examples of non-Hadoop connectors. Is there any plan
> for those?
>
> (If you are curious, I was looking at Presto and had to post the question
> there also -
> https://groups.google.com/forum/#!topic/presto-users/Ns0q4pvHwfo)
>
> public class JoinExample {
>     static final int AGE_ADD_CONSTANT = 100;
>
>     public static void main(String[] args) throws Exception {
>         final ExecutionEnvironment env = 
> ExecutionEnvironment.getExecutionEnvironment();
>
>         DataSet<Record> allRecords = env.fromElements(
>                 new Record("aa", 1), new Record("bb", 2), new Record("ee", 
> 3), new Record("ww", 23),
>                 new Record("cc", 3),
>                 new Record("dd", 1), new Record("bb", 12), new Record("aa", 
> 6), new Record("aa", 33),
>                 new Record("ff", 87));
>         allRecords.print();
>         System.out.println();
>
>         DataSet<Record> newerRecords = allRecords
>                 .filter(new FilterFunction<Record>() {
>                     @Override
>                     public boolean filter(Record user) throws Exception {
>                         return user.age < 10;
>                     }
>                 })
>                 .groupBy(new KeySelector<Record, String>() {
>                     @Override
>                     public String getKey(Record user) throws Exception {
>                         return user.id;
>                     }
>                 })
>                 .combineGroup(new GroupCombineFunction<Record, Record>() {
>                     @Override
>                     public void combine(Iterable<Record> iterable, 
> Collector<Record> collector) throws Exception {
>                         int age = 0;
>                         String id = null;
>                         for (Record user : iterable) {
>                             if (id == null) {
>                                 id = user.id;
>                             }
>                             //Just some way to fake a merge.
>                             age = (age * AGE_ADD_CONSTANT) + user.age;
>                         }
>                         collector.collect(new Record(id, age));
>                     }
>                 });
>         newerRecords.print();
>         System.out.println();
>
>         DataSet<Record> multiAgeRecords = allRecords
>                 /*
>                 If this were another data set, would it have been possible to 
> push all newerRecord ids
>                 partition by partition as multiple filters, without having to 
> do an explicit join?
>                  */
>                 .join(newerRecords)
>                 .where(new KeySelector<Record, String>() {
>                     @Override
>                     public String getKey(Record user) throws Exception {
>                         return user.id;
>                     }
>                 }).equalTo(new KeySelector<Record, String>() {
>                     @Override
>                     public String getKey(Record user) throws Exception {
>                         return user.id;
>                     }
>                 })
>                 .with(new FlatJoinFunction<Record, Record, Record>() {
>                     @Override
>                     public void join(Record user, Record user2, 
> Collector<Record> collector) throws Exception {
>                         collector.collect(new Record(user.id, user.age));
>                     }
>                 })
>                 .groupBy(new KeySelector<Record, String>() {
>                     @Override
>                     public String getKey(Record user) throws Exception {
>                         return user.id;
>                     }
>                 })
>                 .combineGroup(new GroupCombineFunction<Record, Record>() {
>                     @Override
>                     public void combine(Iterable<Record> iterable, 
> Collector<Record> collector) throws Exception {
>                         int age = 0;
>                         String id = null;
>                         for (Record user : iterable) {
>                             if (id == null) {
>                                 id = user.id;
>                             }
>                             //Just some way to fake a merge.
>                             age = (age * 100) + user.age;
>                         }
>                         collector.collect(new Record(id, age));
>                     }
>                 });
>
>         multiAgeRecords.print();
>     }
>
>     static class Record {
>         String id;
>         int age;
>
>         Record(String id, int age) {
>             this.id = id;
>             this.age = age;
>         }
>
>         @Override
>         public String toString() {
>             return "{" + "id='" + id + '\'' + ", age=" + age + '}';
>         }
>     }
> }
>
>
>
> Thanks.
>

Reply via email to