This would mean 2 shuffles, and 1 node might still bottleneck if 1 topic has too much data? Is there a way to avoid shuffling at all (or do only 1 shuffle) and avoid a situation where 1 node becomes a hotspot?
Alex

On Thu, Jun 25, 2020 at 8:05 AM Kostas Kloudas <kklou...@gmail.com> wrote:
> Hi Alexander,
>
> Routing of input data in Flink can be done through keying, and this can guarantee collocation constraints. This means that you can send two records to the same node by giving them the same key, e.g. the topic name. Keep in mind that elements with different keys do not necessarily go to different nodes, as key assignment to nodes is random.
>
> Given this, you could initially key by topic, so that all records of a topic go to the same node. This node will compute statistics about the topic, e.g. elem/sec (t), and based on thresholds assign new keys to each record, e.g. TOPIC-1 if t < 1000, TOPIC-2 if t >= 1000 && t < 2000, etc., and re-key. This will not guarantee that TOPIC-1 and TOPIC-2 will go to different machines, but the probability of this happening will increase with the parallelism of your job. Finally, based on your bucket assigner and the rolling policy, you can redirect the elements to the same bucket, e.g. TOPIC, and tune how many part files you will have based on part-file size and/or time.
>
> Will this help you with your use-case?
>
> Cheers,
> Kostas
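A minimal sketch of the two-phase keying Kostas suggests, using the DataStream API. Everything below is illustrative rather than tested: the Record POJO and its accessors are hypothetical, the thresholds mirror the numbers in his mail, and spreading a hot topic round-robin over TOPIC-1 .. TOPIC-n is one possible reading of the re-keying scheme.

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.flink.api.common.functions.RichMapFunction;
    import org.apache.flink.configuration.Configuration;

    // Phase 1 has already keyed the stream by topic, so each topic is
    // handled by exactly one subtask and an operator-local map is a valid
    // way to measure its rate. The state is transient (not checkpointed),
    // which is fine for a heuristic but not for exact accounting.
    public class SubKeyAssigner extends RichMapFunction<Record, Record> {

        // topic -> {windowStart, count, rate, roundRobinCounter}
        private transient Map<String, long[]> stats;

        @Override
        public void open(Configuration parameters) {
            stats = new HashMap<>();
        }

        @Override
        public Record map(Record r) {
            long now = System.currentTimeMillis();
            long[] s = stats.computeIfAbsent(r.getTopic(), k -> new long[]{now, 0L, 0L, 0L});
            s[1]++;
            if (now - s[0] >= 1000) { // recompute elem/sec roughly once per second
                s[2] = s[1] * 1000 / Math.max(1, now - s[0]);
                s[0] = now;
                s[1] = 0;
            }
            // One extra sub-key per 1000 elem/sec: a light topic keeps a single
            // key, a hot one is spread round-robin over TOPIC-1 .. TOPIC-n.
            long splits = 1 + s[2] / 1000;
            r.setSubKey(r.getTopic() + "-" + ((s[3]++ % splits) + 1));
            return r;
        }
    }

Wired into the job it would look like:

    records
        .keyBy(r -> r.getTopic())   // first shuffle: collocate each topic to measure it
        .map(new SubKeyAssigner())
        .keyBy(r -> r.getSubKey())  // second shuffle: spread the hot topics
        .addSink(sink);

This is exactly the 2-shuffle shape the question at the top asks about; a matching bucket-assigner sketch that merges the sub-keys back into one bucket per topic follows at the end of the thread.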
> On Thu, Jun 25, 2020 at 3:23 AM Alexander Filipchik <afilipc...@gmail.com> wrote:
> >
> > Maybe I am misreading the documentation, but: "Data within the partition directories are split into part files. Each partition will contain at least one part file for each subtask of the sink that has received data for that partition."
> >
> > So, it is 1 partition per subtask. I'm trying to figure out how to dynamically adjust which subtask is getting the data, to minimize the number of subtasks writing into a specific partition.
> >
> > Alex
> >
> > On Wed, Jun 24, 2020 at 3:55 PM Seth Wiesman <sjwies...@gmail.com> wrote:
> >>
> >> You can achieve this in Flink 1.10 using the StreamingFileSink.
> >>
> >> I'd also like to note that Flink 1.11 (which is currently going through release testing and should be available imminently) has support for exactly this functionality in the table API.
> >>
> >> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/filesystem.html
> >>
> >> On Wed, Jun 24, 2020 at 1:53 PM Alexander Filipchik <afilipc...@gmail.com> wrote:
> >>>
> >>> Hello!
> >>>
> >>> We are working on a Flink Streaming job that reads data from multiple Kafka topics and writes it to DFS. We are using StreamingFileSink with a custom implementation for the GCS FS, and it generates a lot of files as streams are partitioned among multiple JMs. In the ideal case we should have at most 1 file per Kafka topic per interval. We also have some heavy topics and some pretty light ones, so the solution should also be smart enough to utilize resources efficiently.
> >>>
> >>> I was thinking we could partition based on how much data was ingested in the last minute or so, to make sure messages from the same topic are routed to the same file (or a minimal number of files) if there are enough resources to do so. Think bin packing.
> >>>
> >>> Is it a good idea? Is there a built-in way to achieve it? If not, is there a way to push state into the partitioner (or even into the Kafka client, to repartition in the source)? I was thinking that I could run a side stream that calculates data volumes and then broadcast it into the main stream, so the partitioner can make a decision, but it feels a bit complex.
> >>>
> >>> Another way is to modify the Kafka client to track messages per topic and make the decision at that layer.
> >>>
> >>> Am I on the right path?
> >>>
> >>> Thank you
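And a sketch of the bucket-assigner side of Kostas's suggestion: a custom BucketAssigner for the StreamingFileSink that drops the derived sub-key, so that TOPIC-1, TOPIC-2, ... all land in one bucket per topic. Record is the same hypothetical POJO as in the sketch above, and the sink wiring follows the Flink 1.10 row-format builder.

    import org.apache.flink.core.io.SimpleVersionedSerializer;
    import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
    import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;

    public class TopicBucketAssigner implements BucketAssigner<Record, String> {

        @Override
        public String getBucketId(Record element, Context context) {
            // Bucket by the original topic, so TOPIC-1, TOPIC-2, ... all end
            // up in the same directory even though they were keyed apart
            // upstream for load spreading.
            return element.getTopic();
        }

        @Override
        public SimpleVersionedSerializer<String> getSerializer() {
            return SimpleVersionedStringSerializer.INSTANCE;
        }
    }

Used with the sink (the output path is a placeholder):

    import org.apache.flink.api.common.serialization.SimpleStringEncoder;
    import org.apache.flink.core.fs.Path;
    import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
    import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;

    StreamingFileSink<Record> sink = StreamingFileSink
        .forRowFormat(new Path("gs://my-bucket/output"), new SimpleStringEncoder<Record>("UTF-8"))
        .withBucketAssigner(new TopicBucketAssigner())
        .withRollingPolicy(DefaultRollingPolicy.builder().build())
        .build();

How many part files each topic bucket ends up with is then tuned through the rolling policy (part-file size and/or time), which is the knob Kostas mentions.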