> This will mean 2 shuffles, and 1 node might bottleneck if 1 topic has too much data?

Yes.

> Is there a way to avoid shuffle at all (or do only 1) and avoid a situation when 1 node will become a hotspot?

Do you know the amount of data per Kafka topic beforehand, or does this have to be dynamic?
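If it can be static, you may even be able to avoid keying entirely and do a single custom shuffle. A very rough sketch (topic names, weights, and the Event type are placeholders, untested):

import java.util.HashMap;
import java.util.Map;
import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.streaming.api.datastream.DataStream;

// Static topic -> channel assignment derived from known volumes (hand-rolled
// bin packing): the heavy topic gets a channel to itself, light topics share one.
Map<String, Integer> topicToChannel = new HashMap<>();
topicToChannel.put("heavy_topic", 0);
topicToChannel.put("light_topic_a", 1);
topicToChannel.put("light_topic_b", 1);

DataStream<Event> routed = stream.partitionCustom(
    (Partitioner<String>) (topic, numChannels) ->
        topicToChannel.getOrDefault(topic, Math.floorMod(topic.hashCode(), numChannels)),
    event -> event.topic);  // assumes each record carries its source topic
routed.addSink(sink);

That is a single shuffle with no hotspot beyond what the static weights allow, but it cannot react to changing volumes.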
On Thu, Jun 25, 2020 at 8:15 PM Alexander Filipchik <afilipc...@gmail.com> wrote:

> This will mean 2 shuffles, and 1 node might bottleneck if 1 topic has too much data? Is there a way to avoid shuffle at all (or do only 1) and avoid a situation when 1 node will become a hotspot?
>
> Alex
>
> On Thu, Jun 25, 2020 at 8:05 AM Kostas Kloudas <kklou...@gmail.com> wrote:
>
>> Hi Alexander,
>>
>> Routing of input data in Flink can be done through keying, and this can guarantee collocation constraints. This means that you can send two records to the same node by giving them the same key, e.g. the topic name. Keep in mind that elements with different keys do not necessarily go to different nodes, as key assignment to nodes is random.
>>
>> Given this, you could initially key by topic, so that all records of a topic go to the same node. This node computes statistics about the topic, e.g. elements/sec (t), and, based on thresholds, assigns a new key to each record, e.g. TOPIC-1 if t < 1000, TOPIC-2 if 1000 <= t < 2000, etc., and re-keys. This does not guarantee that TOPIC-1 and TOPIC-2 will go to different machines, but the probability of that happening increases with the parallelism of your job. Finally, based on your bucket assigner and the rolling policy, you can redirect the elements to the same bucket, e.g. TOPIC, and tune how many part-files you will have based on part-file size and/or time.
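>>
>> In code, the two-step keying would look roughly like the sketch below (the Event type and its routingKey field are invented for the example, and I count elements instead of measuring a rate to keep it short):
>>
>> // Step 1: key by topic so that all records of a topic meet on one node
>> // and per-topic statistics can be kept in keyed state.
>> // Step 2: derive a sub-key from those statistics and key again.
>> stream
>>     .keyBy(e -> e.topic)
>>     .process(new KeyedProcessFunction<String, Event, Event>() {
>>         private transient ValueState<Long> seen;  // per-topic element count
>>
>>         @Override
>>         public void open(Configuration parameters) {
>>             seen = getRuntimeContext().getState(
>>                 new ValueStateDescriptor<>("seen", Long.class));
>>         }
>>
>>         @Override
>>         public void processElement(Event e, Context ctx, Collector<Event> out) throws Exception {
>>             long n = seen.value() == null ? 0L : seen.value();
>>             seen.update(n + 1);
>>             // roll to a new sub-key every 1000 elements: TOPIC-0, TOPIC-1, ...
>>             e.routingKey = ctx.getCurrentKey() + "-" + (n / 1000);
>>             out.collect(e);
>>         }
>>     })
>>     .keyBy(e -> e.routingKey);  // second shuffle spreads a hot topic over more keys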
>>
>> Will this help you with your use-case?
>>
>> Cheers,
>> Kostas
>>
>> On Thu, Jun 25, 2020 at 3:23 AM Alexander Filipchik <afilipc...@gmail.com> wrote:
>>
>>> Maybe I'm misreading the documentation, but: "Data within the partition directories are split into part files. Each partition will contain at least one part file for each subtask of the sink that has received data for that partition."
>>>
>>> So, it is 1 partition per subtask. I'm trying to figure out how to dynamically adjust which subtask gets the data, to minimize the number of subtasks writing into a specific partition.
>>>
>>> Alex
>>>
>>> On Wed, Jun 24, 2020 at 3:55 PM Seth Wiesman <sjwies...@gmail.com> wrote:
>>>
>>>> You can achieve this in Flink 1.10 using the StreamingFileSink.
>>>>
>>>> I'd also like to note that Flink 1.11 (which is currently going through release testing and should be available imminently) has support for exactly this functionality in the Table API.
>>>>
>>>> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/filesystem.html
>>>>
>>>> On Wed, Jun 24, 2020 at 1:53 PM Alexander Filipchik <afilipc...@gmail.com> wrote:
>>>>
>>>>> Hello!
>>>>>
>>>>> We are working on a Flink Streaming job that reads data from multiple Kafka topics and writes it to DFS. We are using StreamingFileSink with a custom implementation for the GCS FS, and it generates a lot of files, as the streams are partitioned among multiple JMs. In the ideal case we should have at most 1 file per Kafka topic per interval. We also have some heavy topics and some pretty light ones, so the solution should be smart enough to utilize resources efficiently.
>>>>>
>>>>> I was thinking we could partition based on how much data was ingested in the last minute or so, to make sure messages from the same topic are routed to the same (or a minimal number of) files if there are enough resources to do so. Think bin packing.
>>>>>
>>>>> Is it a good idea? Is there a built-in way to achieve it? If not, is there a way to push state into the partitioner (or even into the Kafka client, to repartition in the source)? I was thinking I could run a side stream that calculates data volumes and then broadcast it into the main stream, so the partitioner can make a decision, but it feels a bit complex (a rough sketch of what I mean is in the P.S. below).
>>>>>
>>>>> Another way is to modify the Kafka client to track messages per topic and make the decision at that layer.
>>>>>
>>>>> Am I on the right path?
>>>>>
>>>>> Thank you
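>>>>>
>>>>> P.S. To make the broadcast idea concrete, a very rough sketch (TopicStat, Event, routingKey, and the 100k threshold are all invented, untested):
>>>>>
>>>>> // Side stream of per-topic volumes -> broadcast state; the main stream
>>>>> // consults the latest stats to pick a routing key (bin packing would go here).
>>>>> MapStateDescriptor<String, Long> statsDesc =
>>>>>     new MapStateDescriptor<>("topic-volumes", String.class, Long.class);
>>>>>
>>>>> DataStream<Event> routed = events
>>>>>     .connect(statsStream.broadcast(statsDesc))
>>>>>     .process(new BroadcastProcessFunction<Event, TopicStat, Event>() {
>>>>>         @Override
>>>>>         public void processElement(Event e, ReadOnlyContext ctx, Collector<Event> out) throws Exception {
>>>>>             Long vol = ctx.getBroadcastState(statsDesc).get(e.topic);
>>>>>             // toy decision: spread a hot topic over 2 sub-keys, keep the rest on 1
>>>>>             e.routingKey = (vol != null && vol > 100_000L)
>>>>>                 ? e.topic + "-" + (e.hashCode() & 1)
>>>>>                 : e.topic;
>>>>>             out.collect(e);
>>>>>         }
>>>>>
>>>>>         @Override
>>>>>         public void processBroadcastElement(TopicStat s, Context ctx, Collector<Event> out) throws Exception {
>>>>>             ctx.getBroadcastState(statsDesc).put(s.topic, s.bytesPerMinute);
>>>>>         }
>>>>>     })
>>>>>     .keyBy(e -> e.routingKey);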