Hi all,

In case it is useful to people:

I was testing the new DataStream API, which unifies batch and streaming, and the join did not work in batch mode: https://issues.apache.org/jira/browse/FLINK-22587

I had to code an inner join manually using *KeyedCoProcessFunction* and *state*. Here is an example of such a manual join (implementing part of TPC-DS query 3 with Avro GenericRecords). It may not be the best code, but it could serve as an example for anyone interested.

Best,

Etienne


// Join1: WHERE date_dim.d_date_sk = store_sales.ss_sold_date_sk Schema schemaJoinDateSk 
=AvroUtils .getSchemaMerged(dateDimAvroSchema, storeSalesAvroSchema, 
"recordsJoinDateSk"); final DataStream<GenericRecord> recordsJoinDateSk = 
dateDim
  .keyBy((KeySelector<GenericRecord, Integer>) value -> (Integer) 
value.get("d_date_sk"))
  .connect(storeSales.keyBy(
    (KeySelector<GenericRecord, Integer>) value -> (Integer) 
value.get("ss_sold_date_sk")))
  .process(new JoinRecords(dateDimAvroSchema, storeSalesAvroSchema, 
schemaJoinDateSk))
  .returns(new GenericRecordAvroTypeInfo(schemaJoinDateSk));


where *JoinRecords* performs a manual inner join using state:


  /**
   * Creates the join function.
   *
   * <p>Besides the {@link Schema} objects, the JSON text of each schema is stored. When a Schema
   * field is found to be null, joinRecords re-parses it from this text. NOTE(review): the Schema
   * fields are presumably transient; their declarations are not shown here.
   *
   * @param schemaLhs schema of records from the first (left-hand) input
   * @param schemaRhs schema of records from the second (right-hand) input
   * @param outputSchema schema of the joined output records
   */
  public JoinRecords(Schema schemaLhs, Schema schemaRhs, Schema outputSchema) {
    // Textual copies first: these are what the schemas are rebuilt from later.
    this.schemaLhsString = schemaLhs.toString();
    this.schemaRhsString = schemaRhs.toString();
    this.schemaString = outputSchema.toString();

    this.schemaLhs = schemaLhs;
    this.schemaRhs = schemaRhs;
    this.outputSchema = outputSchema;
  }

  /**
   * Registers the two map states that hold buffered records: {@code state1} for the first input
   * and {@code state2} for the second.
   *
   * <p>NOTE(review): with {@code GenericRecord.class} as the value type, Flink presumably falls
   * back to its generic (Kryo) serializer. Consider an Avro type info for the values; confirm
   * against the Flink version in use.
   *
   * @param parameters configuration handed over by the runtime (unused here)
   */
  @Override
  public void open(Configuration parameters) throws Exception {
    final MapStateDescriptor<Integer, GenericRecord> firstInputDescriptor =
        new MapStateDescriptor<>("records_dataStream_1", Integer.class, GenericRecord.class);
    final MapStateDescriptor<Integer, GenericRecord> secondInputDescriptor =
        new MapStateDescriptor<>("records_dataStream_2", Integer.class, GenericRecord.class);

    state1 = getRuntimeContext().getMapState(firstInputDescriptor);
    state2 = getRuntimeContext().getMapState(secondInputDescriptor);
  }

  private GenericRecordjoinRecords(GenericRecord first, GenericRecord second)
    throws Exception {
    // after deserialization if (schemaLhs ==null) {
      schemaLhs =new Schema.Parser().parse(schemaLhsString); }
    if (schemaRhs ==null) {
      schemaRhs =new Schema.Parser().parse(schemaRhsString); }
    if (outputSchema ==null) {
      outputSchema =new Schema.Parser().parse(schemaString); }

    GenericRecord outputRecord =new GenericRecordBuilder(outputSchema).build(); 
for (Schema.Field f :outputSchema.getFields()) {
      if (schemaLhs.getField(f.name()) !=null) {
        outputRecord.put(f.name(), first.get(f.name())); }else if 
(schemaRhs.getField(f.name()) !=null) {
        outputRecord.put(f.name(), second.get(f.name())); }
    }
    return outputRecord; }

  private GenericRecordstateJoin(GenericRecord currentRecord, int 
currentDatastream, Context context)throws Exception {
final Integer currentKey = context.getCurrentKey(); MapState<Integer, GenericRecord> myState = currentDatastream ==1 ?state1 :state2; MapState<Integer, GenericRecord> otherState = currentDatastream ==1 ?state2 :state1; // join with the other datastream by looking into the state of the other datastream final GenericRecord otherRecord = otherState.get(currentKey); if (otherRecord ==null) {// did not find a record to join with, store record for later join myState.put(currentKey, currentRecord); return null; }else {// found a record to join with (same key), join (with using the correct avro schema) return currentDatastream ==1 ? joinRecords(currentRecord, otherRecord) : joinRecords(otherRecord, currentRecord); }
  }

  @Override public void processElement1(GenericRecord currentRecord, Context context, 
Collector<GenericRecord> collector)throws Exception {
    final GenericRecord jointRecord = stateJoin(currentRecord, 1, context); if 
(jointRecord !=null) {
      collector.collect(jointRecord); }
  }

  @Override public void processElement2(GenericRecord currentRecord, Context context, 
Collector<GenericRecord> collector)throws Exception {
    final GenericRecord jointRecord = stateJoin(currentRecord, 2, context); if 
(jointRecord !=null) {
      collector.collect(jointRecord); }
  }
}

Reply via email to