Hi all,
In case it is useful to people:
I was testing the new DataStream API that converges batch and streaming, and
join did not work in batch mode: https://issues.apache.org/jira/browse/FLINK-22587
I had to code an inner join manually using *KeyedCoProcessFunction* and
*state*. Below is an example of such a manual join (implementing part of
TPC-DS query3 with Avro GenericRecords). It may not be the best code, but
it could serve as an example for anyone interested.
Best,
Etienne
// Join1: WHERE date_dim.d_date_sk = store_sales.ss_sold_date_sk Schema schemaJoinDateSk
=AvroUtils .getSchemaMerged(dateDimAvroSchema, storeSalesAvroSchema,
"recordsJoinDateSk"); final DataStream<GenericRecord> recordsJoinDateSk =
dateDim
.keyBy((KeySelector<GenericRecord, Integer>) value -> (Integer)
value.get("d_date_sk"))
.connect(storeSales.keyBy(
(KeySelector<GenericRecord, Integer>) value -> (Integer)
value.get("ss_sold_date_sk")))
.process(new JoinRecords(dateDimAvroSchema, storeSalesAvroSchema,
schemaJoinDateSk))
.returns(new GenericRecordAvroTypeInfo(schemaJoinDateSk));
Here, *JoinRecords* performs a manual inner join using state:
/**
 * Creates the join function.
 *
 * <p>Every schema is stored twice: as the {@link Schema} object itself and as its string form,
 * from which {@code joinRecords} re-parses the schema whenever the object field is
 * {@code null}.
 *
 * @param schemaLhs schema of the records of the first input
 * @param schemaRhs schema of the records of the second input
 * @param outputSchema schema of the joined records
 */
public JoinRecords(Schema schemaLhs, Schema schemaRhs, Schema outputSchema) {
    // first input
    this.schemaLhs = schemaLhs;
    this.schemaLhsString = schemaLhs.toString();

    // second input
    this.schemaRhs = schemaRhs;
    this.schemaRhsString = schemaRhs.toString();

    // joined output
    this.outputSchema = outputSchema;
    this.schemaString = outputSchema.toString();
}
/**
 * Obtains the two map states used to buffer records that are still waiting for a join partner:
 * {@code state1} under the name {@code "records_dataStream_1"} and {@code state2} under the
 * name {@code "records_dataStream_2"}. Both map an {@code Integer} to a {@code GenericRecord}.
 *
 * @param parameters configuration passed to the function; not read here
 * @throws Exception if a state cannot be obtained from the runtime context
 */
@Override
public void open(Configuration parameters) throws Exception {
    final MapStateDescriptor<Integer, GenericRecord> firstInputDescriptor =
        new MapStateDescriptor<>("records_dataStream_1", Integer.class, GenericRecord.class);
    state1 = getRuntimeContext().getMapState(firstInputDescriptor);

    final MapStateDescriptor<Integer, GenericRecord> secondInputDescriptor =
        new MapStateDescriptor<>("records_dataStream_2", Integer.class, GenericRecord.class);
    state2 = getRuntimeContext().getMapState(secondInputDescriptor);
}
private GenericRecordjoinRecords(GenericRecord first, GenericRecord second)
throws Exception {
// after deserialization if (schemaLhs ==null) {
schemaLhs =new Schema.Parser().parse(schemaLhsString); }
if (schemaRhs ==null) {
schemaRhs =new Schema.Parser().parse(schemaRhsString); }
if (outputSchema ==null) {
outputSchema =new Schema.Parser().parse(schemaString); }
GenericRecord outputRecord =new GenericRecordBuilder(outputSchema).build();
for (Schema.Field f :outputSchema.getFields()) {
if (schemaLhs.getField(f.name()) !=null) {
outputRecord.put(f.name(), first.get(f.name())); }else if
(schemaRhs.getField(f.name()) !=null) {
outputRecord.put(f.name(), second.get(f.name())); }
}
return outputRecord; }
private GenericRecordstateJoin(GenericRecord currentRecord, int
currentDatastream, Context context)throws Exception {
final Integer currentKey = context.getCurrentKey(); MapState<Integer, GenericRecord> myState = currentDatastream ==1 ?state1 :state2; MapState<Integer, GenericRecord> otherState = currentDatastream ==1 ?state2 :state1; // join with the other datastream by looking into the state of
the other datastream final GenericRecord otherRecord = otherState.get(currentKey); if (otherRecord ==null) {// did not find a record to join with, store record for later join myState.put(currentKey, currentRecord); return null; }else {// found a record to join with (same key), join (with using the correct
avro schema) return currentDatastream ==1 ? joinRecords(currentRecord, otherRecord) : joinRecords(otherRecord, currentRecord); }
}
/**
 * Handles a record of the first input: emits the joined record when {@code stateJoin} finds a
 * partner, and emits nothing otherwise.
 *
 * @param currentRecord record from the first input
 * @param context context of the current invocation
 * @param collector receives the joined record, if any
 * @throws Exception if the join fails
 */
@Override
public void processElement1(
        GenericRecord currentRecord, Context context, Collector<GenericRecord> collector)
        throws Exception {
    final GenericRecord joined = stateJoin(currentRecord, 1, context);
    if (joined == null) {
        // no partner yet; nothing to emit
        return;
    }
    collector.collect(joined);
}
/**
 * Handles a record of the second input: emits the joined record when {@code stateJoin} finds a
 * partner, and emits nothing otherwise.
 *
 * @param currentRecord record from the second input
 * @param context context of the current invocation
 * @param collector receives the joined record, if any
 * @throws Exception if the join fails
 */
@Override
public void processElement2(
        GenericRecord currentRecord, Context context, Collector<GenericRecord> collector)
        throws Exception {
    final GenericRecord joined = stateJoin(currentRecord, 2, context);
    if (joined == null) {
        // no partner yet; nothing to emit
        return;
    }
    collector.collect(joined);
}
}