Stephan, this is exactly what I was looking for :) Thanks, will try it out.
I know the combineGroup() needed a reduceGroup() too, but I was just trying out the APIs. I did not realize that the other streaming APIs were already available. I will have a look.

Thanks again.

On Mon, Aug 17, 2015 at 6:30 AM, Ashwin Jayaprakash <ashwin.jayaprak...@gmail.com> wrote:

> Hi, I'm trying to evaluate Flink to see if it can do efficient semi-joins
> or self-joins with filter.
>
> Problem description:
> I have 1 stream that can contain "near duplicates" records. The records
> share a "family name" and so, many records can have the same family name.
> But each record has a unique id too.
>
> Query:
> 1) I would like to run a query such that I get the ids and names of some
> records based on a criterion
>
> 2) I then want to fetch all the records that match this set of ids fetched
> previously but this time include the duplicates that share the same name
>
> Question 1:
> I've pasted the sample code below, but I'm trying to see if I can push all
> the ids that came out of the first filter step "into" the connector,
> partition by partition and then pull out only the matching records in step
> 2. Is there a way to do this?
>
> Question 2:
> I haven't seen many examples of non-Hadoop connectors. Is there any plan
> for those?
>
> (If you are curious, I was looking at Presto and had to post the question
> there also -
> https://groups.google.com/forum/#!topic/presto-users/Ns0q4pvHwfo)
>
> public class JoinExample {
>     static final int AGE_ADD_CONSTANT = 100;
>
>     public static void main(String[] args) throws Exception {
>         final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>
>         DataSet<Record> allRecords = env.fromElements(
>             new Record("aa", 1), new Record("bb", 2), new Record("ee", 3), new Record("ww", 23),
>             new Record("cc", 3),
>             new Record("dd", 1), new Record("bb", 12), new Record("aa", 6), new Record("aa", 33),
>             new Record("ff", 87));
>         allRecords.print();
>         System.out.println();
>
>         DataSet<Record> newerRecords = allRecords
>             .filter(new FilterFunction<Record>() {
>                 @Override
>                 public boolean filter(Record user) throws Exception {
>                     return user.age < 10;
>                 }
>             })
>             .groupBy(new KeySelector<Record, String>() {
>                 @Override
>                 public String getKey(Record user) throws Exception {
>                     return user.id;
>                 }
>             })
>             .combineGroup(new GroupCombineFunction<Record, Record>() {
>                 @Override
>                 public void combine(Iterable<Record> iterable, Collector<Record> collector) throws Exception {
>                     int age = 0;
>                     String id = null;
>                     for (Record user : iterable) {
>                         if (id == null) {
>                             id = user.id;
>                         }
>                         //Just some way to fake a merge.
>                         age = (age * AGE_ADD_CONSTANT) + user.age;
>                     }
>                     collector.collect(new Record(id, age));
>                 }
>             });
>         newerRecords.print();
>         System.out.println();
>
>         DataSet<Record> multiAgeRecords = allRecords
>             /*
>             If this were another data set, would it have been possible to push all newerRecord ids
>             partition by partition as multiple filters, without having to do an explicit join?
>             */
>             .join(newerRecords)
>             .where(new KeySelector<Record, String>() {
>                 @Override
>                 public String getKey(Record user) throws Exception {
>                     return user.id;
>                 }
>             }).equalTo(new KeySelector<Record, String>() {
>                 @Override
>                 public String getKey(Record user) throws Exception {
>                     return user.id;
>                 }
>             })
>             .with(new FlatJoinFunction<Record, Record, Record>() {
>                 @Override
>                 public void join(Record user, Record user2, Collector<Record> collector) throws Exception {
>                     collector.collect(new Record(user.id, user.age));
>                 }
>             })
>             .groupBy(new KeySelector<Record, String>() {
>                 @Override
>                 public String getKey(Record user) throws Exception {
>                     return user.id;
>                 }
>             })
>             .combineGroup(new GroupCombineFunction<Record, Record>() {
>                 @Override
>                 public void combine(Iterable<Record> iterable, Collector<Record> collector) throws Exception {
>                     int age = 0;
>                     String id = null;
>                     for (Record user : iterable) {
>                         if (id == null) {
>                             id = user.id;
>                         }
>                         //Just some way to fake a merge.
>                         age = (age * 100) + user.age;
>                     }
>                     collector.collect(new Record(id, age));
>                 }
>             });
>
>         multiAgeRecords.print();
>     }
>
>     static class Record {
>         String id;
>         int age;
>
>         Record(String id, int age) {
>             this.id = id;
>             this.age = age;
>         }
>
>         @Override
>         public String toString() {
>             return "{" + "id='" + id + '\'' + ", age=" + age + '}';
>         }
>     }
> }
>
> Thanks.
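
For reference, the semi-join result that the quoted question is after can be sketched in plain Java, without Flink. This only illustrates the intended semantics (select ids with a filter, then fetch every record sharing those ids, duplicates included). It does not use the Flink API and says nothing about how a connector could push the ids down. The class name SemiJoinSketch and the helpers sampleRecords, selectIds and semiJoin are hypothetical names made up for this sketch; the sample data and the age < 10 criterion are taken from the quoted example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical, Flink-free illustration of the two query steps.
class SemiJoinSketch {
    // Same shape as the Record class in the quoted example.
    static class Record {
        final String id;
        final int age;

        Record(String id, int age) {
            this.id = id;
            this.age = age;
        }

        @Override
        public String toString() {
            return "{id='" + id + "', age=" + age + "}";
        }
    }

    // The ten sample records from the quoted example.
    static List<Record> sampleRecords() {
        return Arrays.asList(
            new Record("aa", 1), new Record("bb", 2), new Record("ee", 3), new Record("ww", 23),
            new Record("cc", 3),
            new Record("dd", 1), new Record("bb", 12), new Record("aa", 6), new Record("aa", 33),
            new Record("ff", 87));
    }

    // Step 1: collect the distinct ids of records that pass the criterion.
    static Set<String> selectIds(List<Record> all) {
        Set<String> ids = new LinkedHashSet<>();
        for (Record r : all) {
            if (r.age < 10) {
                ids.add(r.id);
            }
        }
        return ids;
    }

    // Step 2: keep every record whose id is in the set, duplicates included.
    static List<Record> semiJoin(List<Record> all, Set<String> ids) {
        List<Record> out = new ArrayList<>();
        for (Record r : all) {
            if (ids.contains(r.id)) {
                out.add(r);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Record> all = sampleRecords();
        Set<String> ids = selectIds(all);
        System.out.println(ids);
        System.out.println(semiJoin(all, ids));
    }
}
```

With the sample data, step 1 selects the ids aa, bb, ee, cc and dd, and step 2 returns 8 of the 10 records (everything except ww and ff). Unlike the quoted Flink job, this sketch does not merge the ages per id; it stops at the set of matching records.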