Hey,
I was wondering if that's currently possible to use KeyedStream to create a
properly partitioned Table in Flink 1.11 ? I have a use case where I wanted
to first join two streams using Flink SQL and then process them via
*KeyedProcessFunction.* So I do something like:

implicit val env = StreamExecutionEnvironment.getExecutionEnvironment
implicit val ste = StreamTableEnvironment.create(env)
val stream1 = env.addSource(someKafkaConsumer)
val stream2 = env.addSource(otherKafkaConsumer)
val table1 = ste.createTemporaryView("firstTable",
stream1.keyBy(_.getId()), $"id", $"data", $"name")
val table2 = ste.createTemporaryView("secondTable",
stream2.keyBy(_.getNumber()), "$number", $"userName", $"metadata")
ste.sqlQuery(
  """
    |SELECT * from firstTable
    |JOIN secondTable ON id = number AND data = metadata
    |""".stripMargin
)


Will Table API respect the fact that I used `KeyedStream` and will it keep
the data partitioned by the keys above ?

I am asking since when after the JOIN I tried to *reinterpretAsKeyedStream *I
was getting the *NullPointerException* when accessing the state inside the
KeyedProcessFunction which suggests that the partitioning has indeed
changed. So Is it possible to enforce partitioning when working with Table
API ??

Thanks in Advance,
Best Regards,
Dom.

Reply via email to