Hi Dom, AFAIK, the Table API will apply a hash partitioner based on the join keys for the join operator ([id, data] and [number, metadata] in your case), so the partitioning of the input KeyedStream is not preserved.
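One way to work around this (a minimal sketch, not tested against your job) is to convert the join result back to a DataStream and re-key it explicitly before the KeyedProcessFunction, instead of using reinterpretAsKeyedStream. The explicit keyBy inserts a fresh hash partition that matches what keyed state expects; reinterpretAsKeyedStream is only safe when the stream is already partitioned exactly as keyBy would partition it. The field index and `MyKeyedProcessFunction` below are assumptions based on your schema:

```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.types.Row

// Run the SQL join, then go back to a DataStream and key it explicitly.
// A regular (non-windowed) join produces a retract stream, so we use
// toRetractStream rather than toAppendStream.
val joined = ste.sqlQuery(
  """
    |SELECT * FROM firstTable
    |JOIN secondTable ON id = number AND data = metadata
    |""".stripMargin)

val retracts: DataStream[(Boolean, Row)] = ste.toRetractStream[Row](joined)

retracts
  .filter(_._1)                          // keep only insert messages
  .map(_._2)
  .keyBy(_.getField(0).toString)         // re-key explicitly; assumes `id` is field 0
  .process(new MyKeyedProcessFunction)   // your existing KeyedProcessFunction
```

The extra keyBy does cost a network shuffle, but it guarantees the state backend sees the partitioning it requires, which should avoid the NullPointerException you hit.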
Best,
Jark

On Thu, 21 Jan 2021 at 21:39, Dominik Wosiński <wos...@gmail.com> wrote:

> Hey,
> I was wondering if it's currently possible to use a KeyedStream to create a
> properly partitioned Table in Flink 1.11? I have a use case where I want
> to first join two streams using Flink SQL and then process them via a
> *KeyedProcessFunction*. So I do something like:
>
> implicit val env = StreamExecutionEnvironment.getExecutionEnvironment
> implicit val ste = StreamTableEnvironment.create(env)
> val stream1 = env.addSource(someKafkaConsumer)
> val stream2 = env.addSource(otherKafkaConsumer)
> ste.createTemporaryView("firstTable",
>   stream1.keyBy(_.getId()), $"id", $"data", $"name")
> ste.createTemporaryView("secondTable",
>   stream2.keyBy(_.getNumber()), $"number", $"userName", $"metadata")
> ste.sqlQuery(
>   """
>     |SELECT * FROM firstTable
>     |JOIN secondTable ON id = number AND data = metadata
>     |""".stripMargin
> )
>
> Will the Table API respect the fact that I used a `KeyedStream`, and will it keep
> the data partitioned by the keys above?
>
> I am asking because, when I tried to *reinterpretAsKeyedStream* after the JOIN,
> I was getting a *NullPointerException* when accessing the state inside the
> KeyedProcessFunction, which suggests that the partitioning has indeed
> changed. So is it possible to enforce partitioning when working with the Table
> API?
>
> Thanks in advance,
> Best regards,
> Dom.