Hi Dom,

AFAIK, the Table API applies a hash partitioner based on the join keys for the
join operator ([id, data] and [number, metadata] in your case), so the
partitioning of the original KeyedStream is not respected.
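A minimal sketch of one way around this, assuming you convert the joined
Table back to a DataStream and re-key it explicitly before the
KeyedProcessFunction (MyKeyedProcessFunction and the key column are
illustrative placeholders; reinterpretAsKeyedStream is only safe when the
stream is already partitioned exactly as keyBy would partition it):

```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.types.Row

// Assumes `ste` is the StreamTableEnvironment from the snippet below and
// that firstTable/secondTable have been registered.
val joined = ste.sqlQuery(
  """
    |SELECT * FROM firstTable
    |JOIN secondTable ON id = number AND data = metadata
    |""".stripMargin)

// A regular (non-windowed) join may emit retractions, so convert to a
// retract stream and keep only the insert records.
val resultStream: DataStream[Row] =
  ste.toRetractStream[Row](joined)
    .filter(_._1) // true = insert, false = retract
    .map(_._2)

// An explicit keyBy re-establishes a partitioning that keyed state can
// rely on, regardless of how the join operator shuffled the data.
resultStream
  .keyBy(_.getField(0).toString) // key by the first column (id); adjust to your key type
  .process(new MyKeyedProcessFunction) // hypothetical KeyedProcessFunction
```

The explicit keyBy costs an extra shuffle, but it guarantees the state
access inside the KeyedProcessFunction sees a consistent key partitioning.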

Best,
Jark

On Thu, 21 Jan 2021 at 21:39, Dominik Wosiński <wos...@gmail.com> wrote:

> Hey,
> I was wondering if it's currently possible to use a KeyedStream to create a
> properly partitioned Table in Flink 1.11? I have a use case where I wanted
> to first join two streams using Flink SQL and then process them via
> *KeyedProcessFunction.* So I do something like:
>
> implicit val env = StreamExecutionEnvironment.getExecutionEnvironment
> implicit val ste = StreamTableEnvironment.create(env)
> val stream1 = env.addSource(someKafkaConsumer)
> val stream2 = env.addSource(otherKafkaConsumer)
> val table1 = ste.createTemporaryView("firstTable",
> stream1.keyBy(_.getId()), $"id", $"data", $"name")
> val table2 = ste.createTemporaryView("secondTable",
> stream2.keyBy(_.getNumber()), $"number", $"userName", $"metadata")
> ste.sqlQuery(
>   """
>     |SELECT * from firstTable
>     |JOIN secondTable ON id = number AND data = metadata
>     |""".stripMargin
> )
>
>
> Will the Table API respect the fact that I used `KeyedStream`, and will it
> keep the data partitioned by the keys above?
>
> I am asking because when I tried *reinterpretAsKeyedStream* after the JOIN,
> I got a *NullPointerException* when accessing the state inside the
> KeyedProcessFunction, which suggests that the partitioning has indeed
> changed. So is it possible to enforce partitioning when working with the
> Table API?
>
> Thanks in Advance,
> Best Regards,
> Dom.
>
