Hi, I have two streams, each partitioned by a key field. I want to join them on those key fields over windows. This is an example I saw on the Flink website:

    val firstInput: DataStream[MyType] = ...
    val secondInput: DataStream[AnotherType] = ...

    val firstKeyed = firstInput.keyBy("userId")
    val secondKeyed = secondInput.keyBy("id")

    val result: DataStream[(MyType, AnotherType)] =
      firstKeyed.join(secondKeyed) onWindow(Time.of(5, SECONDS))

However, with the current Flink version (1.1.2) I cannot do this. Whether or not the streams are already keyed, I still have to specify the "where" and "equalTo" clauses.

My question is: how can I implement keyed window joins in Flink streaming? And is there a difference between joining two streams that are already keyed:

    val firstKeyed: KeyedStream[MyType] = ...
    val secondKeyed: KeyedStream[AnotherType] = ...

    val result: DataStream[(MyType, AnotherType)] =
      firstKeyed.join(secondKeyed).where(..).equalTo(..).window(..).apply(..)

and joining two plain, non-keyed streams:

    val firstInput: DataStream[MyType] = ...
    val secondInput: DataStream[AnotherType] = ...

    val result: DataStream[(MyType, AnotherType)] =
      firstInput.join(secondInput).where(..).equalTo(..).window(..).apply(..)

Thanks,
Adrienne