Hi Ufuk,

Thanks for the reply. The example is at [1]. I have a few questions:
If there is no difference between a KeyedStream-KeyedStream join by key and a DataStream-DataStream join, then the DataStream becomes a KeyedStream through the `where` and `equalTo` clauses. Please correct me if I am wrong.

Is the execution of windowed joins in Flink reduced to only one machine in the cluster? Their throughput is quite low compared to other operations.

[1] https://cwiki.apache.org/confluence/display/FLINK/Streams+and+Operations+on+Streams

Thanks,
Adrienne

On Thu, Oct 13, 2016 at 10:59 AM Ufuk Celebi <u...@apache.org> wrote:

Hey Adrienne!

On Wed, Oct 12, 2016 at 4:10 PM, Adrienne Kole <adrienneko...@gmail.com> wrote:
> Hi,
>
> I have 2 streams which are partitioned based on key field. I want to join
> those streams based on key fields on windows. This is an example I saw in
> the flink website:
>
> val firstInput: DataStream[MyType] = ...
> val secondInput: DataStream[AnotherType] = ...
>
> val firstKeyed = firstInput.keyBy("userId")
> val secondKeyed = secondInput.keyBy("id")
>
> val result: DataStream[(MyType, AnotherType)] =
>   firstKeyed.join(secondKeyed)
>   onWindow(Time.of(5, SECONDS))

This does not work. I could not find this example in the Flink docs. Do you remember where you found this? Would make sense to remove it. :-)

You have to go with the other approach you described (keyBy-join-where-equalTo-etc.). It would make sense to provide the keyed stream join API though. If you like, you can open a JIRA issue for it (you would need to tell me your JIRA ID so I can add you as a contributor).

> val firstInput: KeyedStream[MyType] = ...
> val secondInput: KeyedStream[AnotherType] = ...
> val result: DataStream[(MyType, AnotherType)] =
>   firstKeyed.join(secondKeyed).where(..).equalTo(..).window(..).apply(..)
>
> and
>
> val firstInput: DataStream[MyType] = ...
> val secondInput: DataStream[AnotherType] = ...
> val result: DataStream[(MyType, AnotherType)] =
>   firstKeyed.join(secondKeyed).where(..).equalTo(..).window(..).apply(..)

Only if you need a specific KeyedDataStream operation would you need to go with the KeyedStream type. There is no difference execution-wise between the two examples.

– Ufuk
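
For reference, the where/equalTo approach discussed above could look roughly like the following. This is only a sketch, assuming a Flink 1.x Scala DataStream API; the case classes, their fields, the `fromElements` sources, and the choice of a 5-second tumbling processing-time window are placeholders that are not taken from the thread. The key selectors passed to `where` and `equalTo` supply the join key, so no explicit `keyBy` is written before the join.

```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

// Hypothetical record types standing in for MyType / AnotherType in the thread.
case class MyType(userId: String, payload: String)
case class AnotherType(id: String, score: Int)

object WindowedJoinSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Placeholder sources; a real job would read from unbounded inputs.
    // With bounded inputs and processing-time windows the job may finish
    // before a window fires, so this may print nothing.
    val firstInput: DataStream[MyType] =
      env.fromElements(MyType("u1", "a"), MyType("u2", "b"))
    val secondInput: DataStream[AnotherType] =
      env.fromElements(AnotherType("u1", 10), AnotherType("u2", 20))

    // Join on plain DataStreams: the key selectors define the join key.
    val result: DataStream[(MyType, AnotherType)] =
      firstInput
        .join(secondInput)
        .where(_.userId)   // key of the first stream
        .equalTo(_.id)     // key of the second stream
        .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
        .apply((left: MyType, right: AnotherType) => (left, right))

    result.print()
    env.execute("windowed-join-sketch")
  }
}
```

Swapping `TumblingProcessingTimeWindows` for an event-time assigner would additionally require timestamps and watermarks on both inputs.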