Hi,
I have 2 streams which are partitioned based on key field. I want to join
those streams based on key fields on windows. This is an example I saw in
the flink website:
val firstInput: DataStream[MyType] = ...
val secondInput: DataStream[AnotherType] = ...
val firstKeyed = firstInput.keyBy("userId")
val secondKeyed = secondInput.keyBy("id")
val result: DataStream[(MyType, AnotherType)] =
firstKeyed.join(secondKeyed)
onWindow(Time.of(5, SECONDS))
However, with current flink version,(1.1.2) I cannot do it. Basically even
if streams are keyed or not, I still have to specify the "where" and
"equal" clauses.
My question is that, is how can I implement keyed window joins in flink
streaming? And is there a difference between:
val firstInput: KeyedStream[MyType] = ...
val secondInput: KeyedStream[AnotherType] = ...
val result: DataStream[(MyType, AnotherType)] =
firstKeyed.join(secondKeyed).where(..).equalTo(..).window(..).apply(..)
and
val firstInput: DataStream[MyType] = ...
val secondInput: DataStream[AnotherType] = ...
val result: DataStream[(MyType, AnotherType)] =
firstKeyed.join(secondKeyed).where(..).equalTo(..).window(..).apply(..)
Thanks
Adrienne