This would mean 2 shuffles, and one node might become a bottleneck if one
topic has too much data. Is there a way to avoid shuffling entirely (or do
only one shuffle) and avoid a situation where one node becomes a hotspot?

Alex

On Thu, Jun 25, 2020 at 8:05 AM Kostas Kloudas <kklou...@gmail.com> wrote:

> Hi Alexander,
>
> Routing of input data in Flink can be done through keying, and this
> can guarantee collocation constraints. This means that you can send
> two records to the same node by giving them the same key, e.g. the
> topic name. Keep in mind that elements with different keys do not
> necessarily go to different nodes, as keys are assigned to nodes by
> hashing, so two distinct keys can land on the same one.
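>
> To see this concretely, Flink's (internal) KeyGroupRangeAssignment
> utility computes the subtask for a key; a small sketch, for
> illustration only, with made-up topic names and parallelism:
>
>     import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
>
>     public class KeyPlacement {
>         public static void main(String[] args) {
>             int maxParallelism = 128, parallelism = 4;
>             for (String topic : new String[] {"orders", "clicks", "logs"}) {
>                 int subtask = KeyGroupRangeAssignment
>                     .assignKeyToParallelOperator(topic, maxParallelism, parallelism);
>                 // Two distinct topics may well print the same subtask index.
>                 System.out.println(topic + " -> subtask " + subtask);
>             }
>         }
>     }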
>
> Given this, you could initially key by topic, so that all records of
> a topic go to the same node. This node will compute statistics about
> the topic, e.g. elem/sec (t), and based on thresholds assign a new
> key to each record, e.g. TOPIC-1 if t < 1000, TOPIC-2 if t >= 1000 &&
> t < 2000, etc., and re-key. This will not guarantee that TOPIC-1 and
> TOPIC-2 will go to different machines, but the probability of this
> happening increases with the parallelism of your job. Finally, based
> on your bucket assigner and the rolling policy, you can redirect the
> elements to the same bucket, e.g. TOPIC, and tune how many part files
> you will have based on part-file size and/or time.
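>
> A rough sketch of the two-stage keying I have in mind (Record,
> getTopic(), and the thresholds are placeholders, and a real job would
> compute elem/sec over a window rather than keeping a raw counter):
>
>     import org.apache.flink.api.common.state.ValueState;
>     import org.apache.flink.api.common.state.ValueStateDescriptor;
>     import org.apache.flink.api.java.tuple.Tuple2;
>     import org.apache.flink.configuration.Configuration;
>     import org.apache.flink.streaming.api.datastream.DataStream;
>     import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
>     import org.apache.flink.util.Collector;
>
>     DataStream<Record> input = ...; // records carrying their Kafka topic
>
>     DataStream<Tuple2<String, Record>> rekeyed = input
>         .keyBy(r -> r.getTopic()) // first shuffle: collocate each topic
>         .process(new KeyedProcessFunction<String, Record, Tuple2<String, Record>>() {
>             private transient ValueState<Long> seen;
>
>             @Override
>             public void open(Configuration parameters) {
>                 seen = getRuntimeContext().getState(
>                     new ValueStateDescriptor<>("seen", Long.class));
>             }
>
>             @Override
>             public void processElement(Record r, Context ctx,
>                     Collector<Tuple2<String, Record>> out) throws Exception {
>                 long t = seen.value() == null ? 0L : seen.value() + 1;
>                 seen.update(t);
>                 // Derive a sub-key from the observed volume, per the
>                 // thresholds above.
>                 String subKey = t < 1000 ? ctx.getCurrentKey() + "-1"
>                               : t < 2000 ? ctx.getCurrentKey() + "-2"
>                               : ctx.getCurrentKey() + "-3";
>                 out.collect(Tuple2.of(subKey, r));
>             }
>         })
>         .keyBy(pair -> pair.f0); // second shuffle: spread heavy topics out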
>
> Will this help you with your use-case?
>
> Cheers,
> Kostas
>
>
>
>
> On Thu, Jun 25, 2020 at 3:23 AM Alexander Filipchik
> <afilipc...@gmail.com> wrote:
> >
> > Maybe I'm misreading the documentation, but:
> > "Data within the partition directories are split into part files. Each
> partition will contain at least one part file for each subtask of the sink
> that has received data for that partition."
> >
> > So, it is at least one part file per subtask for each partition. I'm
> > trying to figure out how to dynamically adjust which subtask gets
> > the data, to minimize the number of subtasks writing into a specific
> > partition.
> >
> > Alex
> >
> > On Wed, Jun 24, 2020 at 3:55 PM Seth Wiesman <sjwies...@gmail.com>
> wrote:
> >>
> >> You can achieve this in Flink 1.10 using the StreamingFileSink.
> >>
> >> I'd also like to note that Flink 1.11 (which is currently going
> >> through release testing and should be available imminently) has
> >> support for exactly this functionality in the Table API.
> >>
> >>
> >> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/filesystem.html
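> >>
> >> As a hypothetical sketch against that connector (the table name,
> >> columns, path, and rolling thresholds below are made up):
> >>
> >>     import org.apache.flink.table.api.EnvironmentSettings;
> >>     import org.apache.flink.table.api.TableEnvironment;
> >>
> >>     TableEnvironment tEnv = TableEnvironment.create(
> >>         EnvironmentSettings.newInstance().inStreamingMode().build());
> >>
> >>     tEnv.executeSql(
> >>         "CREATE TABLE dfs_sink (" +
> >>         "  topic STRING," +
> >>         "  payload STRING" +
> >>         ") PARTITIONED BY (topic) WITH (" +
> >>         "  'connector' = 'filesystem'," +
> >>         "  'path' = 'gs://bucket/output'," +
> >>         "  'format' = 'json'," +
> >>         // Roll part files on size or time to bound the file count.
> >>         "  'sink.rolling-policy.file-size' = '128MB'," +
> >>         "  'sink.rolling-policy.rollover-interval' = '15 min'" +
> >>         ")");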
> >>
> >>
> >> On Wed, Jun 24, 2020 at 1:53 PM Alexander Filipchik <
> afilipc...@gmail.com> wrote:
> >>>
> >>> Hello!
> >>>
> >>> We are working on a Flink streaming job that reads data from
> >>> multiple Kafka topics and writes them to DFS. We are using
> >>> StreamingFileSink with a custom implementation for the GCS FS, and
> >>> it generates a lot of files, as streams are partitioned among
> >>> multiple task managers. In the ideal case we should have at most
> >>> one file per Kafka topic per interval. We also have some heavy
> >>> topics and some pretty light ones, so the solution should also be
> >>> smart enough to utilize resources efficiently.
> >>>
> >>> I was thinking we could partition based on how much data was
> >>> ingested in the last minute or so, to make sure messages from the
> >>> same topic are routed to the same file (or a minimal number of
> >>> files) if there are enough resources to do so. Think bin packing.
> >>>
> >>> Is it a good idea? Is there a built-in way to achieve it? If not,
> >>> is there a way to push state into the partitioner (or even into
> >>> the Kafka client, to repartition at the source)? I was thinking
> >>> that I could run a side stream that calculates data volumes and
> >>> then broadcast it into the main stream so the partitioner can make
> >>> a decision, but it feels a bit complex.
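> >>>
> >>> Roughly, I imagine something like this (TopicRate, Record, the
> >>> stats stream, and the bucket math are all placeholders):
> >>>
> >>>     import org.apache.flink.api.common.state.MapStateDescriptor;
> >>>     import org.apache.flink.api.java.tuple.Tuple2;
> >>>     import org.apache.flink.streaming.api.datastream.BroadcastStream;
> >>>     import org.apache.flink.streaming.api.datastream.DataStream;
> >>>     import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
> >>>     import org.apache.flink.util.Collector;
> >>>
> >>>     MapStateDescriptor<String, Long> ratesDesc =
> >>>         new MapStateDescriptor<>("rates", String.class, Long.class);
> >>>
> >>>     // Side stream with per-topic volumes, broadcast to every subtask.
> >>>     BroadcastStream<TopicRate> rates = statsStream.broadcast(ratesDesc);
> >>>
> >>>     DataStream<Tuple2<String, Record>> routed = mainStream
> >>>         .connect(rates)
> >>>         .process(new BroadcastProcessFunction<Record, TopicRate, Tuple2<String, Record>>() {
> >>>             @Override
> >>>             public void processElement(Record r, ReadOnlyContext ctx,
> >>>                     Collector<Tuple2<String, Record>> out) throws Exception {
> >>>                 Long rate = ctx.getBroadcastState(ratesDesc).get(r.getTopic());
> >>>                 // Spread hot topics over more routing keys,
> >>>                 // e.g. one extra key per 1000 elem/sec.
> >>>                 int buckets = rate == null ? 1 : (int) Math.max(1, rate / 1000);
> >>>                 String key = r.getTopic() + "-" + Math.floorMod(r.hashCode(), buckets);
> >>>                 out.collect(Tuple2.of(key, r));
> >>>             }
> >>>
> >>>             @Override
> >>>             public void processBroadcastElement(TopicRate tr, Context ctx,
> >>>                     Collector<Tuple2<String, Record>> out) throws Exception {
> >>>                 ctx.getBroadcastState(ratesDesc).put(tr.topic, tr.perSecond);
> >>>             }
> >>>         });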
> >>>
> >>> Another way is to modify the Kafka client to track messages per
> >>> topic and make the decision at that layer.
> >>>
> >>> Am I on the right path?
> >>>
> >>> Thank you
>
