Re: Flink Table SQL, Kafka, partitions and unnecessary shuffling

2020-09-17 Thread Dan Hill
Hi Godfrey! I'll describe the overall setup and then I'll describe the joins. One of the goals of my Flink jobs is to join incoming log records (User, Session, PageView, Requests, Insertions, Impressions, etc) and do useful things with the joined results. Input = Kafka. Value = batch log recor

Re: Flink Table SQL, Kafka, partitions and unnecessary shuffling

2020-09-17 Thread godfrey he
Hi Dan, Which kinds of joins [1] are you using? Currently, only temporal joins and joins with a table function avoid reshuffling the input data in the Table API and SQL; all other joins reshuffle the input data based on the join keys. [1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table
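To illustrate why the regular joins reshuffle, here is a rough sketch in plain Python (illustrative data, and Python's built-in hash() as a stand-in for Flink's hash partitioning, not real Flink code): matching rows from both inputs must land on the same subtask, so each side is re-bucketed by the join key no matter how the sources were partitioned.

```python
# Sketch of a hash-partitioned join: both inputs are re-bucketed by the
# join key so matching rows meet on the same "subtask". This
# re-bucketing is the reshuffle the thread is about.
from collections import defaultdict

PARALLELISM = 3

def bucket(rows, key_fn):
    buckets = defaultdict(list)
    for row in rows:
        # Python's hash() is stable within one process, which is all
        # this sketch needs; Flink uses its own key-group hashing.
        buckets[hash(key_fn(row)) % PARALLELISM].append(row)
    return buckets

users = [("user-1", "US"), ("user-2", "DE")]
views = [("user-1", "page-a"), ("user-1", "page-b"), ("user-2", "page-c")]

left = bucket(users, key_fn=lambda r: r[0])
right = bucket(views, key_fn=lambda r: r[0])

# Each "subtask" joins only its own bucket; together the buckets
# produce the same result as a global join.
joined = [
    (uid, country, page)
    for p in range(PARALLELISM)
    for uid, country in left.get(p, [])
    for vid, page in right.get(p, [])
    if uid == vid
]
print(sorted(joined))
```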

Re: Flink Table SQL, Kafka, partitions and unnecessary shuffling

2020-09-16 Thread Dan Hill
Hi Dawid! I see. Yeah, this would break my job once I move away from the prototype. How do other Flink devs avoid unnecessary reshuffles when sourcing data from Kafka? Is the Table API still early, or just not used often? On Wed, Sep 16, 2020 at 12:31 PM Dawid Wysakowicz wrote: > Hi Dan, > > I am afr

Re: Flink Table SQL, Kafka, partitions and unnecessary shuffling

2020-09-16 Thread Dawid Wysakowicz
Hi Dan, I am afraid there is no mechanism to do that purely in the Table API yet, or at least I am not aware of one. If reinterpretAsKeyedStream works for you, you could use that approach: convert a DataStream (with reinterpretAsKeyedStream applied) to a Table [1] and then continue with the Table

Re: Flink Table SQL, Kafka, partitions and unnecessary shuffling

2020-09-16 Thread Dan Hill
Hi Piotr! Yes, that's what I'm using with DataStream, and it works well in my prototype. On Wed, Sep 16, 2020 at 8:58 AM Piotr Nowojski wrote: > Hi, > > Have you seen "Reinterpreting a pre-partitioned data stream as keyed > stream" feature? [1] However I'm not sure if and how can it be integrated

Re: Flink Table SQL, Kafka, partitions and unnecessary shuffling

2020-09-16 Thread Piotr Nowojski
Hi, Have you seen the "Reinterpreting a pre-partitioned data stream as keyed stream" feature? [1] However, I'm not sure if and how it can be integrated with the Table API. Maybe someone more familiar with the Table API can help with that? Piotrek [1] https://ci.apache.org/projects/flink/flink-docs-st
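The caveat behind that feature can be sketched in plain Python. The function below is modeled loosely on Flink's KeyGroupRangeAssignment (the real code murmur-hashes key.hashCode(); the md5 here is an illustrative stand-in, and all keys and parallelism values are made up):

```python
# Simplified sketch of Flink's key-to-subtask routing.
import hashlib

def key_group(key: str, max_parallelism: int) -> int:
    # Stable stand-in hash mapped into [0, max_parallelism).
    digest = hashlib.md5(key.encode()).digest()
    return int.from_bytes(digest[:4], "big") % max_parallelism

def subtask_index(key: str, max_parallelism: int, parallelism: int) -> int:
    # Key groups are mapped proportionally onto operator subtasks.
    return key_group(key, max_parallelism) * parallelism // max_parallelism

# reinterpretAsKeyedStream skips the shuffle and simply *assumes* that
# each record already sits on the subtask this mapping would choose.
# If the upstream Kafka producer partitioned by a different scheme
# (e.g. murmur2 over the raw key bytes), that assumption silently
# breaks keyed state.
for key in ("user-1", "user-2", "user-3"):
    print(key, subtask_index(key, max_parallelism=128, parallelism=4))
```

The design point: because Flink's key-group mapping and Kafka's producer partitioner are independent hash schemes, pre-partitioned Kafka data rarely lines up with what keyBy would produce, which is why the feature has to be used with care.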

Flink Table SQL, Kafka, partitions and unnecessary shuffling

2020-09-15 Thread Dan Hill
How do I avoid unnecessary reshuffles when using Kafka as input? My Kafka keys are ~userId. The first few stages do joins whose keys are usually (userId, someOtherKeyId). It makes sense for these joins to stay on the same machine and avoid unnecessary shuffling. What's the best way to avoid unnece
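To make the concern concrete, here is a rough sketch in plain Python (illustrative keys, and md5 as a deterministic stand-in for Kafka's murmur2 partitioner and Flink's key-group hashing, which need not agree with each other either) of why a topic partitioned by userId still gets reshuffled once the join key becomes (userId, someOtherKeyId):

```python
# The composite join key hashes differently than the plain userId,
# so records for the same user can land on different subtasks and
# must be redistributed over the network.
import hashlib

NUM_PARTITIONS = 4

def partition_for(key: str, num_partitions: int = NUM_PARTITIONS) -> int:
    # Deterministic stand-in partitioner: md5 of the key, mod count.
    digest = hashlib.md5(key.encode()).digest()
    return int.from_bytes(digest[:4], "big") % num_partitions

records = [("user-1", "session-a"), ("user-1", "session-b"), ("user-2", "session-c")]

# In Kafka (keyed by userId): both user-1 records share a partition.
kafka_parts = [partition_for(user) for user, _ in records]

# Joining on the composite key re-hashes every record -- this is the
# shuffle the question is about.
join_parts = [partition_for(f"{user}|{other}") for user, other in records]
print(kafka_parts, join_parts)
```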