Hi, I'm trying to evaluate Flink to see if it can do efficient semi-joins or self-joins with filter.
Problem description: I have one stream that can contain "near-duplicate" records. The records share a "family name", so many records can have the same family name. But each record has a unique id too.

Query:
1) I would like to run a query that returns the ids and names of some records based on a criterion.
2) I then want to fetch all the records that match the set of ids fetched previously, but this time including the duplicates that share the same name.

Question 1: I've pasted the sample code below, but I'm trying to see if I can push all the ids that came out of the first filter step "into" the connector, partition by partition, and then pull out only the matching records in step 2. Is there a way to do this?

Question 2: I haven't seen many examples of non-Hadoop connectors. Is there any plan for those?

(If you are curious, I was looking at Presto and had to post the question there also - https://groups.google.com/forum/#!topic/presto-users/Ns0q4pvHwfo)

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatJoinFunction;
import org.apache.flink.api.common.functions.GroupCombineFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.util.Collector;

public class JoinExample {
    static final int AGE_ADD_CONSTANT = 100;

    public static void main(String[] args) throws Exception {
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<Record> allRecords = env.fromElements(
                new Record("aa", 1),
                new Record("bb", 2),
                new Record("ee", 3),
                new Record("ww", 23),
                new Record("cc", 3),
                new Record("dd", 1),
                new Record("bb", 12),
                new Record("aa", 6),
                new Record("aa", 33),
                new Record("ff", 87));

        allRecords.print();
        System.out.println();

        // Step 1: keep only the records that match the criterion, one merged record per id.
        DataSet<Record> newerRecords = allRecords
                .filter(new FilterFunction<Record>() {
                    @Override
                    public boolean filter(Record user) throws Exception {
                        return user.age < 10;
                    }
                })
                .groupBy(new KeySelector<Record, String>() {
                    @Override
                    public String getKey(Record user) throws Exception {
                        return user.id;
                    }
                })
                .combineGroup(new GroupCombineFunction<Record, Record>() {
                    @Override
                    public void combine(Iterable<Record> iterable, Collector<Record> collector) throws Exception {
                        int age = 0;
                        String id = null;
                        for (Record user : iterable) {
                            if (id == null) {
                                id = user.id;
                            }
                            // Just some way to fake a merge.
                            age = (age * AGE_ADD_CONSTANT) + user.age;
                        }
                        collector.collect(new Record(id, age));
                    }
                });

        newerRecords.print();
        System.out.println();

        // Step 2: fetch every record (duplicates included) whose id came out of step 1.
        DataSet<Record> multiAgeRecords = allRecords
                /*
                 If this were another data set, would it have been possible to push all
                 newerRecord ids partition by partition as multiple filters, without having
                 to do an explicit join?
                 */
                .join(newerRecords)
                .where(new KeySelector<Record, String>() {
                    @Override
                    public String getKey(Record user) throws Exception {
                        return user.id;
                    }
                })
                .equalTo(new KeySelector<Record, String>() {
                    @Override
                    public String getKey(Record user) throws Exception {
                        return user.id;
                    }
                })
                .with(new FlatJoinFunction<Record, Record, Record>() {
                    @Override
                    public void join(Record user, Record user2, Collector<Record> collector) throws Exception {
                        collector.collect(new Record(user.id, user.age));
                    }
                })
                .groupBy(new KeySelector<Record, String>() {
                    @Override
                    public String getKey(Record user) throws Exception {
                        return user.id;
                    }
                })
                .combineGroup(new GroupCombineFunction<Record, Record>() {
                    @Override
                    public void combine(Iterable<Record> iterable, Collector<Record> collector) throws Exception {
                        int age = 0;
                        String id = null;
                        for (Record user : iterable) {
                            if (id == null) {
                                id = user.id;
                            }
                            // Just some way to fake a merge.
                            age = (age * AGE_ADD_CONSTANT) + user.age;
                        }
                        collector.collect(new Record(id, age));
                    }
                });

        multiAgeRecords.print();
    }

    static class Record {
        String id;
        int age;

        Record(String id, int age) {
            this.id = id;
            this.age = age;
        }

        @Override
        public String toString() {
            return "{" + "id='" + id + '\'' + ", age=" + age + '}';
        }
    }
}

Thanks.
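
P.S. In case the two-step query is unclear, here is a rough plain-Java sketch (no Flink involved) of the result I'm after, before the "fake merge". It uses the sample's `id` field as the shared key and the same `age < 10` criterion; the class and method names (`SemiJoinSketch`, `matchingIds`, `semiJoin`) are made up for illustration only. Step 1 collects the ids that pass the filter, and step 2 keeps every record whose id is in that set, duplicates included.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

public class SemiJoinSketch {

    // Same shape as the Record class in the sample above.
    static final class Record {
        final String id;
        final int age;

        Record(String id, int age) {
            this.id = id;
            this.age = age;
        }

        @Override
        public String toString() {
            return "{id='" + id + "', age=" + age + "}";
        }
    }

    // Same sample data as in the Flink example.
    static List<Record> sample() {
        return Arrays.asList(
                new Record("aa", 1), new Record("bb", 2), new Record("ee", 3),
                new Record("ww", 23), new Record("cc", 3), new Record("dd", 1),
                new Record("bb", 12), new Record("aa", 6), new Record("aa", 33),
                new Record("ff", 87));
    }

    // Step 1: distinct ids of the records that pass the criterion (age < 10).
    static Set<String> matchingIds(List<Record> all) {
        return all.stream()
                .filter(r -> r.age < 10)
                .map(r -> r.id)
                .collect(Collectors.toSet());
    }

    // Step 2: every record whose id is in the set, including the duplicates
    // that did not pass the criterion themselves (a semi-join on id).
    static List<Record> semiJoin(List<Record> all, Set<String> ids) {
        return all.stream()
                .filter(r -> ids.contains(r.id))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Record> all = sample();
        Set<String> ids = matchingIds(all);
        semiJoin(all, ids).forEach(System.out::println);
    }
}
```

What I'm asking in Question 1 is essentially whether the `ids.contains(...)` check in step 2 could be evaluated inside the source/connector for each partition, instead of reading everything and joining afterwards.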