Dear all,

A small update on my end regarding the implementation of DataStream window joins
in PyFlink. Looking around in the code, I was thinking we could apply the same
approach as the key_by method, where the KeyByKeySelector class is used as a
Java-side proxy to key the stream: the user's key_selector is applied behind the
scenes (in a map/process function) to transform each element into a (key, value)
pair, and the Java KeyByKeySelector then extracts the field at position 0 as the
key.
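
To make the pattern I am mirroring explicit, here is a rough, simplified sketch of
how I read the existing key_by plumbing (names and signatures are approximate, not
the exact PyFlink internals; stream, key_selector and key_type stand in for the
real arguments):

```
# Simplified sketch of the existing key_by pattern, as I understand it.
from pyflink.common import Row, Types
from pyflink.java_gateway import get_gateway

# 1. A Python map prepends the key, so every element becomes Row(key, value).
keyed_rows = stream.map(
    lambda v: Row(key_selector(v), v),
    output_type=Types.ROW([key_type, stream.get_type()]),
)

# 2. The Java-side KeyByKeySelector then simply extracts field 0 as the key.
JKeyByKeySelector = get_gateway().jvm.KeyByKeySelector
j_keyed_stream = keyed_rows._j_data_stream.keyBy(JKeyByKeySelector())
```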

Similarly, I defined a "generic" TupleJoinFunction which returns a
Tuple2<first, second>, and the user's Python JoinFunction is then applied to that
paired output via a map behind the scenes. Below is a code example of how it's done:

```
from pyflink.common import Row, Types
from pyflink.datastream import DataStream
from pyflink.java_gateway import get_gateway

# Java-side helpers: the existing KeyByKeySelector and the proposed
# TupleJoinFunction proxy.
JKeyByKeySelector = get_gateway().jvm.KeyByKeySelector
JTupleJoinFunction = (
    get_gateway().jvm.com.nw.flink.datastream.functions.TupleJoinFunction
)

joined_stream = DataStream(
    # Prepend the join key on each side so the Java KeyByKeySelector can
    # extract it from field 0.
    left.map(
        lambda r: Row(left_key_selector(r), r),
        output_type=Types.ROW([left_key_type, left.get_type()]),
    )._j_data_stream
    .join(
        right.map(
            lambda r: Row(right_key_selector(r), r),
            output_type=Types.ROW([right_key_type, right.get_type()]),
        )._j_data_stream
    )
    .where(JKeyByKeySelector())
    .equalTo(JKeyByKeySelector())
    .window(j_window_assigner)
    # The Java TupleJoinFunction pairs the matching elements as Tuple2(left, right).
    .apply(
        JTupleJoinFunction(),
        Types.TUPLE([left.get_type(), right.get_type()]).get_java_type_info(),
    )
).map(lambda e: join_function(e[0], e[1]), output_type)  # apply the user's Python JoinFunction
```

And here is the Java implementation of the TupleJoinFunction:
```
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.types.Row;

/** Pairs the joined elements, dropping the key that was prepended on the Python side. */
public class TupleJoinFunction implements JoinFunction<Row, Row, Tuple2<Row, Row>> {

    @Override
    public Tuple2<Row, Row> join(Row first, Row second) throws Exception {
        return new Tuple2<>((Row) first.getField(1), (Row) second.getField(1));
    }
}
```

The only issue I see with this proposal is that the current Python WindowAssigner
implementation does not give access to the underlying Java object. Comments from
anyone with more expertise on the PyFlink internals would be much appreciated.
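
As a possible interim workaround (untested, and assuming the relevant Flink
classes are on the classpath), the built-in Java window assigners could perhaps be
constructed directly through the Py4J gateway, e.g.:

```
# Untested sketch: build the Java window assigner directly via the Py4J
# gateway instead of going through the Python WindowAssigner wrapper.
from pyflink.java_gateway import get_gateway

jvm = get_gateway().jvm
j_time = jvm.org.apache.flink.streaming.api.windowing.time.Time.seconds(10)
j_window_assigner = (
    jvm.org.apache.flink.streaming.api.windowing.assigners
       .TumblingEventTimeWindows.of(j_time)
)
```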
Again, provided a bit of guidance, I am willing to put in the work and open a PR
so that the Python API supports this feature.

Best regards,
Hugo Polsinelli
________________________________
From: Hugo POLSINELLI <hugo.polsine...@nw-groupe.com>
Sent: Friday, March 28, 2025 10:44
To: dev@flink.apache.org <dev@flink.apache.org>
Subject: Pyflink improvements proposal

Dear all,

I've noticed that there are some missing functionalities in the PyFlink API that
I would be willing to implement. To name a few:

  *  Table API: WindowGroupedTable does not implement the flat_aggregate method
(a rough sketch of the API shape I have in mind is below)
  *  DataStream API: no support for WindowJoin and JoinFunction (I saw there is an
umbrella JIRA ticket for improving PyFlink DataStream window support,
https://issues.apache.org/jira/browse/FLINK-26477, but unfortunately there is no
mention of window joins)
  *  Execution environment: no support for adding custom JobListeners (also
sketched below)
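
To make the first and last points a bit more concrete, this is roughly the API
shape I have in mind; the methods flat_aggregate (on WindowGroupedTable) and
register_job_listener are my proposal only and do not exist in PyFlink today, and
t, top2, env and my_listener are placeholders:

```
# Purely illustrative sketch of the proposed API surface; the calls marked
# "proposed" do not exist in PyFlink today and mirror the Java API where possible.
from pyflink.table.expressions import col, lit
from pyflink.table.window import Tumble

# Table API: flat_aggregate on a WindowGroupedTable, mirroring the existing
# GroupedTable.flat_aggregate.
result = (
    t.window(Tumble.over(lit(5).minutes).on(col("rowtime")).alias("w"))
     .group_by(col("w"), col("user"))
     .flat_aggregate(top2(col("amount")).alias("amount", "rank"))  # proposed
     .select(col("user"), col("amount"), col("rank"))
)

# Execution environment: registering a custom job listener, mirroring the Java
# StreamExecutionEnvironment#registerJobListener.
env.register_job_listener(my_listener)  # proposed
```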

I would be willing to propose a base implementation for each of the points above
and submit a PR, provided that someone with a bit more experience than me on the
Flink project is willing to point me in the right direction!

Feel free to let me know if you would be interested!

Best regards,

Hugo Polsinelli
