GitHub user florianschmidt1994 opened a pull request: https://github.com/apache/flink/pull/5482
[FLINK-8480][DataStream] Add Java API for time-bounded stream join

## What is the purpose of the change

* Add a Java API to the DataStream API to join two streams based on user-defined time boundaries
* The design doc can be found here: https://docs.google.com/document/d/16GMH5VM6JJiWj_N0W8y3PtQ1aoJFxsKoOTSYOfqlsRE/edit#heading=h.bgl260hr56g6

## Brief change log

* Add the option `.between(Time, Time)` to streams that are already joined and have their key selectors `where` and `equalTo` defined
* Add a new inner class `TimeBounded` to `JoinedStreams`, which exposes `process(TimeBoundedJoinFunction)` as well as the optional `upperBoundExclusive(boolean)` and `lowerBoundExclusive(boolean)` to the user
* Add a new integration test `TimeboundedJoinITCase`
* **Depends on [FLINK-8479] to be merged**

Full example usage:

```java
streamOne
    .join(streamTwo)
    .where(new MyKeySelector())
    .equalTo(new MyKeySelector())
    .between(Time.milliseconds(-1), Time.milliseconds(1))
    .process(new UdfTimeBoundedJoinFunction())
    .addSink(new ResultSink());
```

## Verifying this change

This change added tests and can be verified as follows:

- Added integration tests in `TimeboundedJoinITCase` that validate parameter translation and execution

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): no
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: yes
- The serializers: no
- The runtime per-record code paths (performance sensitive): yes
- Anything that affects deployment or recovery: no
- The S3 file system connector: no

## Documentation

- Does this pull request introduce a new feature? yes
- If yes, how is the feature documented?
JavaDocs

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/florianschmidt1994/flink flink-8480-timebounded-join-java-api

Alternatively, you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/5482.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #5482

----

commit 34451540116d8bdd284fd01016a4cc74d8564d37
Author: Florian Schmidt <florian.schmidt.1994@...>
Date: 2018-01-18T14:47:14Z

[FLINK-8479] Implement TimeBoundedStreamJoinOperator

This operator is the basis for performing an inner join on two streams using a time criterion defined as a lower and an upper bound.

commit fe65b1ead0511b0df5d640c728f5ce9e273d7ed5
Author: Florian Schmidt <florian.schmidt.1994@...>
Date: 2018-02-13T14:48:40Z

[FLINK-8480][DataStream] Add java api for timebounded stream joins

This commit adds a Java implementation for time-bounded stream joins. The usage looks roughly like the following:

```java
streamOne
    .join(streamTwo)
    .where(new Tuple2KeyExtractor())
    .equalTo(new Tuple2KeyExtractor())
    .between(Time.milliseconds(0), Time.milliseconds(1))
    .process(new CombineToStringJoinFunction())
    .addSink(new ResultSink());
```

This change adds the functionality in `JoinedStreams.java` and adds integration tests in `TimeboundedJoinITCase.java`.

----
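The PR text does not spell out exactly which pairs `between(lower, upper)` and the exclusive-bound flags accept, or how the FLINK-8479 operator keeps its buffered state bounded. What follows is a minimal, Flink-free sketch under stated assumptions, not the PR's implementation. It assumes that a right element `r` joins a left element `l` with the same key when `l.ts + lower <= r.ts <= l.ts + upper`, with each side switchable to a strict comparison (mirroring `lowerBoundExclusive`/`upperBoundExclusive`). It also assumes that both inputs are buffered per key and that a watermark `w` guarantees no further elements with timestamp `<= w`. The class `TimeBoundedJoinSketch`, its `Event` type, and all method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

/** Hypothetical, Flink-free sketch of a keyed time-bounded inner join (not the PR's code). */
public class TimeBoundedJoinSketch {

    /** A keyed, timestamped element (hypothetical stand-in for a stream record). */
    static final class Event {
        final String key;
        final long ts;
        final String value;

        Event(String key, long ts, String value) {
            this.key = key;
            this.ts = ts;
            this.value = value;
        }
    }

    private final long lower;
    private final long upper;
    private final boolean lowerExclusive;
    private final boolean upperExclusive;

    // Per-key buffers standing in for the operator's keyed state.
    private final Map<String, List<Event>> leftBuffer = new HashMap<>();
    private final Map<String, List<Event>> rightBuffer = new HashMap<>();

    // Joined pairs, recorded as "leftValue,rightValue".
    final List<String> output = new ArrayList<>();

    TimeBoundedJoinSketch(long lower, long upper, boolean lowerExclusive, boolean upperExclusive) {
        this.lower = lower;
        this.upper = upper;
        this.lowerExclusive = lowerExclusive;
        this.upperExclusive = upperExclusive;
    }

    // Assumed semantics: leftTs + lower <= rightTs <= leftTs + upper, each side optionally strict.
    boolean withinBounds(long leftTs, long rightTs) {
        long min = leftTs + lower;
        long max = leftTs + upper;
        boolean aboveMin = lowerExclusive ? rightTs > min : rightTs >= min;
        boolean belowMax = upperExclusive ? rightTs < max : rightTs <= max;
        return aboveMin && belowMax;
    }

    // Buffer the left element, then probe the buffered right elements with the same key.
    void processLeft(Event l) {
        leftBuffer.computeIfAbsent(l.key, k -> new ArrayList<>()).add(l);
        for (Event r : rightBuffer.getOrDefault(l.key, Collections.emptyList())) {
            if (withinBounds(l.ts, r.ts)) output.add(l.value + "," + r.value);
        }
    }

    // Symmetric to processLeft: buffer the right element, then probe the left buffer.
    void processRight(Event r) {
        rightBuffer.computeIfAbsent(r.key, k -> new ArrayList<>()).add(r);
        for (Event l : leftBuffer.getOrDefault(r.key, Collections.emptyList())) {
            if (withinBounds(l.ts, r.ts)) output.add(l.value + "," + r.value);
        }
    }

    // After watermark w every new element has ts > w, so drop buffered elements that can no
    // longer match. The test uses the inclusive bounds, which is conservative for exclusive ones.
    void onWatermark(long w) {
        evict(leftBuffer, l -> l.ts + upper <= w);  // future right ts > w >= l.ts + upper
        evict(rightBuffer, r -> r.ts - lower <= w); // future left ts > w >= r.ts - lower
    }

    private static void evict(Map<String, List<Event>> buffer, Predicate<Event> expired) {
        for (Iterator<List<Event>> it = buffer.values().iterator(); it.hasNext(); ) {
            List<Event> events = it.next();
            events.removeIf(expired);
            if (events.isEmpty()) it.remove();
        }
    }

    int bufferedCount() {
        int n = 0;
        for (List<Event> events : leftBuffer.values()) n += events.size();
        for (List<Event> events : rightBuffer.values()) n += events.size();
        return n;
    }

    public static void main(String[] args) {
        // Same bounds as the PR's example: between(-1 ms, +1 ms), both inclusive.
        TimeBoundedJoinSketch join = new TimeBoundedJoinSketch(-1, 1, false, false);
        join.processLeft(new Event("a", 10, "L10"));
        join.processRight(new Event("a", 11, "R11"));  // 11 - 10 = 1: inside
        join.processRight(new Event("a", 12, "R12"));  // 12 - 10 = 2: outside for L10
        join.processRight(new Event("b", 10, "R10b")); // no left element with key "b"
        join.processLeft(new Event("a", 12, "L12"));   // matches R11 (-1) and R12 (0)
        System.out.println(join.output); // prints [L10,R11, L12,R11, L12,R12]
        join.onWatermark(11);
        System.out.println(join.bufferedCount()); // prints 3
    }
}
```

The sketch uses a symmetric buffer-and-probe design: each arriving element is stored and immediately probed against the other side's buffer, so matches are emitted as soon as both sides have arrived. Watermarks are what allow state to be evicted. In `main`, `onWatermark(11)` drops `L10` and `R10b` because no element arriving later can fall inside their windows.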