Dear all,

A small update on my end regarding the implementation of DataStream window joins in PyFlink. Looking around in the code, I was thinking we could apply the same approach as the key_by method, where the KeyByKeySelector class is used as a Java-side proxy to key the stream: the user's key_selector is applied in a process function behind the scenes to transform each element into a (key, value) tuple, and the Java KeyByKeySelector then extracts the field at position 0 as the key.
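For illustration, here is a rough sketch of that pattern. It shows the shape of it rather than the actual PyFlink source, and the helper name key_by_via_java_proxy is only for the example:

```
# Sketch only: the user's Python key selector runs in a map that prepends the
# key, and the Java-side KeyByKeySelector then keys the stream on field 0.
from pyflink.common import Row
from pyflink.common.typeinfo import Types
from pyflink.java_gateway import get_gateway


def key_by_via_java_proxy(stream, key_selector, key_type, value_type):
    keyed_rows = stream.map(
        lambda value: Row(key_selector(value), value),
        output_type=Types.ROW([key_type, value_type]),
    )
    # The Java selector only reads position 0, so no call back into Python is
    # needed on the Java side to extract the key.
    j_key_selector = get_gateway().jvm.KeyByKeySelector()
    return keyed_rows._j_data_stream.keyBy(j_key_selector)
```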
Similarly, I defined a "generic" TupleJoinFunction which returns a Tuple2<first, second>; the user's Python JoinFunction is then applied to the output behind the scenes (as a map over the joined stream). Below is a code example of how it's done:

```
from pyflink.common import Row
from pyflink.common.typeinfo import Types
from pyflink.datastream import DataStream
from pyflink.java_gateway import get_gateway

JKeyByKeySelector = get_gateway().jvm.KeyByKeySelector
JTupleJoinFunction = (
    get_gateway().jvm.com.nw.flink.datastream.functions.TupleJoinFunction
)

joined_stream = DataStream(
    # Prepend the join key on each side so the Java KeyByKeySelector can key
    # on field 0 without calling back into Python.
    left.map(
        lambda r: Row(left_key_selector(r), r),
        output_type=Types.ROW([left_key_type, left.get_type()]),
    )._j_data_stream
    .join(
        right.map(
            lambda r: Row(right_key_selector(r), r),
            output_type=Types.ROW([right_key_type, right.get_type()]),
        )._j_data_stream
    )
    .where(JKeyByKeySelector())
    .equalTo(JKeyByKeySelector())
    .window(j_window_assigner)
    .apply(
        JTupleJoinFunction(),
        Types.TUPLE([left.get_type(), right.get_type()]).get_java_type_info(),
    )
    # Finally, apply the user's Python JoinFunction on the (first, second) pairs.
).map(lambda e: join_function(e[0], e[1]), output_type)
```

and the Java implementation of the TupleJoinFunction:

```
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.types.Row;

public class TupleJoinFunction implements JoinFunction<Row, Row, Tuple2<Row, Row>> {

    @Override
    public Tuple2<Row, Row> join(Row first, Row second) throws Exception {
        // Field 0 holds the join key; field 1 holds the original record.
        return new Tuple2<>((Row) first.getField(1), (Row) second.getField(1));
    }
}
```

The only issue I see with this proposal is that the current Python WindowAssigner implementation does not give access to the underlying Java object, which is needed for the j_window_assigner used above (see the sketch after the quoted message for one possible interim workaround). Could someone with more expertise on the PyFlink implementation comment on this? It would be much appreciated.

Again, provided a bit of guidance, I am willing to put in the work to open a PR for the Python API to support this feature.

Best regards,
Hugo Polsinelli

________________________________
From: Hugo POLSINELLI <hugo.polsine...@nw-groupe.com>
Sent: Friday, 28 March 2025 10:44
To: dev@flink.apache.org <dev@flink.apache.org>
Subject: Pyflink improvements proposal

Dear all,

I've noticed that there are some missing functionalities in the PyFlink API that I would be willing to implement. To name a few:

* Table API: WindowGroupedTable does not implement the flat_aggregate method
* DataStream API: no support for WindowJoin and JoinFunction (I saw there is an umbrella JIRA ticket for improving PyFlink DataStream Window support <https://issues.apache.org/jira/browse/FLINK-26477>, but unfortunately there is no mention of WindowJoins)
* Execution environment: no support for adding custom JobListeners

I would be willing to propose a base implementation for each of the points above and submit a PR, provided that someone with a bit more experience than me on the Flink project is willing to point me in the right direction!

Feel free to let me know if you would be interested!

Best regards,
Hugo Polsinelli
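P.S. Regarding the WindowAssigner limitation mentioned above: one way to get hold of a j_window_assigner today would be to build the Java assigner directly through the py4j gateway. This is only a rough sketch, not a proposal for the final Python API, and the exact factory depends on the Flink version (of(Duration) in recent releases, of(Time) in older ones):

```
# Rough sketch (not a proposed API): building the Java window assigner directly
# through the py4j gateway, since the Python WindowAssigner classes do not
# expose their Java counterpart.
from pyflink.java_gateway import get_gateway


def j_tumbling_event_time_windows(size_millis):
    jvm = get_gateway().jvm
    # java.time.Duration is used here; older Flink versions would use the
    # (now removed) org.apache.flink.streaming.api.windowing.time.Time instead.
    j_size = jvm.java.time.Duration.ofMillis(size_millis)
    return (
        jvm.org.apache.flink.streaming.api.windowing.assigners
        .TumblingEventTimeWindows.of(j_size)
    )
```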