Hi,

OK, I think I get it.
WHAT IF: Assume we create an addKeyedSource(...) that lets us add a source which makes some guarantees about how its data is partitioned. And assume this source simply returns the Kafka partition id as the result of this 'hash' function. Then, with 10 Kafka partitions, I could read these records in and filter the data more efficiently, because the data would not need to go over the network before this filter. Afterwards I can scale up to 'many' tasks for the heavier processing that follows.

As a concept: could that be made to work?

Niels

On Mon, Jan 9, 2017 at 9:14 AM, Tzu-Li (Gordon) Tai <tzuli...@apache.org> wrote:

> Hi Niels,
>
> Thank you for bringing this up. I recall there was some previous
> discussion related to this before: [1].
>
> I don’t think this is possible at the moment, mainly because of how the
> API is designed.
>
> On the other hand, a KeyedStream in Flink is basically just a DataStream
> with a hash partitioner that is used when deciding which instance of the
> following downstream operator an emitted record of the stream is sent to.
> So, even if we have a Kafka source that directly produces a KeyedStream on
> “addSource”, redistribution of data can still happen. I.e., if the
> parallelism of the compute operators right after is different than the
> number of Kafka partitions, redistribution will happen to let the key space
> and state be evenly distributed in Flink.
>
> This leads to the argument that we probably need to think about whether
> retaining the original partitioning of records in Kafka when consumed by
> Flink is actually only a special case.
> Flink, as a parallel compute engine, can freely adjust the parallelism of
> its operators regardless of the parallelism of Kafka topics (rescaling
> isn’t actually in yet, but is on the near-future roadmap).
>
> So, under the general case, the parallelism of a Flink operator may be
> different than the number of Kafka partitions, and therefore redistribution
> must occur.
>
> For redistribution to not need to take place right after an already
> partitioned Kafka topic, you’d need identical numbers of 1) Kafka
> partitions, 2) Flink source instances consuming the partitions, and 3) the
> parallelism of the keyed computation afterwards. This seems like a very
> specific situation, considering that you’ll be able to rescale Flink
> operators as the data’s key space / volume grows.
>
> The main observation, I think, is that Flink itself maintains how the key
> space is partitioned within the system, which plays a crucial part in
> rescaling. That’s why by default it doesn’t respect existing partitioning
> of the key space in Kafka (or other external sources). Even if it initially
> does at the beginning of a job, partitioning will most likely change as you
> rescale your job / operators (which is a good thing, to be able to adapt).
>
> Cheers,
> Gordon
>
> [1] http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/kafka-partition-assignment-td12123.html
>
> On January 6, 2017 at 1:38:05 AM, Niels Basjes (ni...@basjes.nl) wrote:
>
> Hi,
>
> In my scenario I have click stream data that I persist in Kafka.
> I use the sessionId as the key to instruct Kafka to put everything with
> the same sessionId into the same Kafka partition. That way I already have
> all events of a visitor in a single Kafka partition in a fixed order.
>
> When I read this data into Flink I get a generic data stream on top of
> which I have to do a keyBy before my processing can continue. Such a keyBy
> will redistribute the data again to later tasks that can do the actual work.
>
> Is it possible to create an adapted version of the Kafka source that
> immediately produces a keyed data stream?
>
> --
> Best regards / Met vriendelijke groeten,
>
> Niels Basjes

-- 
Best regards / Met vriendelijke groeten,

Niels Basjes
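A toy model may make Gordon's "identical numbers" condition concrete. The sketch below is hypothetical Java, not Kafka or Flink code. It assumes integer session ids, the same modulo hash on the Kafka side and on the Flink keyBy side, and that source instance (p % sources) reads Kafka partition p. It then counts how many sessions end up owned by a different downstream subtask than the source instance that read them. Kafka's default partitioner and Flink's keyBy actually hash keys differently, so this is an idealized best case.

```java
/**
 * Hypothetical toy model (not Kafka's or Flink's real logic) of when a keyBy
 * right after a Kafka source would need to move records between subtasks.
 *
 * Assumptions:
 *  - session ids are ints, hashed identically (plain modulo) by Kafka and Flink;
 *  - Kafka partition p is read by source instance (p % sources).
 */
class RedistributionModel {

    /** Kafka side: partition the producer picks for a session id (assumed modulo hash). */
    static int kafkaPartition(int sessionId, int partitions) {
        return Math.floorMod(sessionId, partitions);
    }

    /** Flink source side: which source instance consumes a partition (assumed round-robin). */
    static int sourceInstance(int partition, int sources) {
        return partition % sources;
    }

    /** Flink keyBy side: downstream subtask that owns a session id (assumed modulo hash). */
    static int keyedSubtask(int sessionId, int parallelism) {
        return Math.floorMod(sessionId, parallelism);
    }

    /** Fraction of session ids 0..sessions-1 that must move to another subtask after keyBy. */
    static double fractionRedistributed(int sessions, int partitions, int sources, int parallelism) {
        int moved = 0;
        for (int id = 0; id < sessions; id++) {
            int readBy = sourceInstance(kafkaPartition(id, partitions), sources);
            if (readBy != keyedSubtask(id, parallelism)) {
                moved++;
            }
        }
        return (double) moved / sessions;
    }

    public static void main(String[] args) {
        // 10 partitions, 10 source instances, keyed parallelism 10.
        System.out.println(fractionRedistributed(20_000, 10, 10, 10));
        // Keyed operator rescaled to 20 subtasks.
        System.out.println(fractionRedistributed(20_000, 10, 10, 20));
        // Only 5 source instances reading the 10 partitions.
        System.out.println(fractionRedistributed(20_000, 10, 5, 10));
    }
}
```

Running main prints 0.0, 0.5 and 0.5. Only when all three numbers match (the situation the WHAT IF above relies on for its filter stage) does nothing have to move; changing either the source or the keyed parallelism breaks the alignment, which is why Flink keeps control of how the key space is partitioned.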