Hey, I was wondering if that's currently possible to use KeyedStream to create a properly partitioned Table in Flink 1.11 ? I have a use case where I wanted to first join two streams using Flink SQL and then process them via *KeyedProcessFunction.* So I do something like:
implicit val env = StreamExecutionEnvironment.getExecutionEnvironment implicit val ste = StreamTableEnvironment.create(env) val stream1 = env.addSource(someKafkaConsumer) val stream2 = env.addSource(otherKafkaConsumer) val table1 = ste.createTemporaryView("firstTable", stream1.keyBy(_.getId()), $"id", $"data", $"name") val table2 = ste.createTemporaryView("secondTable", stream2.keyBy(_.getNumber()), "$number", $"userName", $"metadata") ste.sqlQuery( """ |SELECT * from firstTable |JOIN secondTable ON id = number AND data = metadata |""".stripMargin ) Will Table API respect the fact that I used `KeyedStream` and will it keep the data partitioned by the keys above ? I am asking since when after the JOIN I tried to *reinterpretAsKeyedStream *I was getting the *NullPointerException* when accessing the state inside the KeyedProcessFunction which suggests that the partitioning has indeed changed. So Is it possible to enforce partitioning when working with Table API ?? Thanks in Advance, Best Regards, Dom.