Hey Adrienne!

On Wed, Oct 12, 2016 at 4:10 PM, Adrienne Kole <[email protected]> wrote:
> Hi,
>
> I have 2 streams which are partitioned based on key field. I want to join
> those streams based on key fields on windows. This is an example I saw in
> the flink website:
>
> val firstInput: DataStream[MyType] = ...
> val secondInput: DataStream[AnotherType] = ...
>
> val firstKeyed = firstInput.keyBy("userId")
> val secondKeyed = secondInput.keyBy("id")
>
> val result: DataStream[(MyType, AnotherType)] =
> firstKeyed.join(secondKeyed)
> onWindow(Time.of(5, SECONDS))

This does not work. I could not find this example in the Flink docs. Do you
remember where you found it? If it is still there, it would make sense to
remove it. :-)

You have to go with the other approach you described
(keyBy-join-where-equalTo-etc.). It would make sense to provide a keyed
stream join API, though. If you like, you can open a JIRA issue for it (you
would need to tell me your JIRA ID so I can add you as a contributor).

> val firstInput: KeyedStream[MyType] = ...
> val secondInput: KeyedStream[AnotherType] = ...
> val result: DataStream[(MyType, AnotherType)] =
> firstKeyed.join(secondKeyed).where(..).equalTo(..).window(..).apply(..)
>
> and
>
> val firstInput: DataStream[MyType] = ...
> val secondInput: DataStream[AnotherType] = ...
> val result: DataStream[(MyType, AnotherType)] =
> firstKeyed.join(secondKeyed).where(..).equalTo(..).window(..).apply(..)

You only need the KeyedStream type if you need an operation that is specific
to KeyedStream. Execution-wise, there is no difference between the two
examples: the join keys both inputs by the where(..)/equalTo(..) keys itself.

– Ufuk
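To make the two points above concrete without running Flink, here is a plain-Scala sketch (no Flink dependency) of what the working join(..).where(..).equalTo(..).window(..).apply(..) chain computes with a tumbling window. It also shows why keying the inputs first does not change the result. It is only a model under stated assumptions: the `ts` timestamp fields, the 5-second window size, and the helpers `windowStart`, `windowJoin`, `partition`, and `partitionedJoin` are hypothetical and not part of Flink. In the Flink Scala API the chain would look roughly like `firstInput.join(secondInput).where(_.userId).equalTo(_.id).window(TumblingEventTimeWindows.of(Time.seconds(5))).apply((a, b) => (a, b))`, assuming event-time timestamps and watermarks are assigned. Check the docs for your Flink version.

```scala
// Plain-Scala model (no Flink dependency) of what a windowed stream join computes:
//   first.join(second).where(key1).equalTo(key2).window(<tumbling, size>).apply((a, b) => (a, b))
// The `ts` fields and all helper names below are hypothetical.

case class MyType(userId: String, ts: Long)   // hypothetical record with an event timestamp
case class AnotherType(id: String, ts: Long)  // hypothetical record with an event timestamp

object WindowJoinModel {
  // Start of the tumbling window of length `size` that contains timestamp `ts`.
  def windowStart(ts: Long, size: Long): Long = ts - Math.floorMod(ts, size)

  // Pair every left/right element whose keys are equal and which fall into the same window.
  def windowJoin[A, B, K](left: Seq[A], right: Seq[B])(
      leftKey: A => K, rightKey: B => K,        // role of .where(..) / .equalTo(..)
      leftTs: A => Long, rightTs: B => Long,
      size: Long): Seq[(A, B)] = {
    // Index the right side by (key, window start).
    val index: Map[(K, Long), Seq[B]] =
      right.groupBy(b => (rightKey(b), windowStart(rightTs(b), size)))
    for {
      a <- left
      b <- index.getOrElse((leftKey(a), windowStart(leftTs(a), size)), Seq.empty[B])
    } yield (a, b)                              // role of .apply((a, b) => (a, b))
  }

  // The windowed join on the two record types above.
  def join(first: Seq[MyType], second: Seq[AnotherType], size: Long): Seq[(MyType, AnotherType)] =
    windowJoin[MyType, AnotherType, String](first, second)(_.userId, _.id, _.ts, _.ts, size)

  // Crude stand-in for keyBy: route each element to one of `n` partitions by key hash.
  def partition[T, K](xs: Seq[T], key: T => K, n: Int): Seq[Seq[T]] =
    (0 until n).map(p => xs.filter(x => Math.floorMod(key(x).hashCode(), n) == p))

  // Pre-partition both inputs by key, join each partition pair, and merge the results.
  def partitionedJoin(first: Seq[MyType], second: Seq[AnotherType], n: Int,
                      size: Long): Seq[(MyType, AnotherType)] = {
    val left = partition(first, (m: MyType) => m.userId, n)
    val right = partition(second, (a: AnotherType) => a.id, n)
    left.zip(right).flatMap { case (l, r) => join(l, r, size) }
  }

  def main(args: Array[String]): Unit = {
    val size = 5000L // 5-second tumbling windows, timestamps in milliseconds
    val first = Seq(MyType("u1", 1000L), MyType("u2", 2000L), MyType("u1", 7000L))
    val second = Seq(AnotherType("u1", 3000L), AnotherType("u2", 6000L), AnotherType("u1", 9000L))
    val plain = join(first, second, size)
    plain.foreach(println)
    // Same set of pairs whether or not the inputs were partitioned by key beforehand.
    println(plain.toSet == partitionedJoin(first, second, 4, size).toSet)
  }
}
```

Running `main` prints the two `u1` pairs followed by `true`. The `u2` elements share a key but land in different 5-second windows, so they are not joined. Equal keys always hash to the same partition on both sides, so pre-partitioning can only regroup the work, not change which pairs match. That mirrors why the extra keyBy makes no difference to the join.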
