Hi Kostas,

Thanks for the input!

BTW, I guess you assume that the broadcasting occurs just once for
bootstrapping, huh?
My job needs not only bootstrapping but also periodically fetching a
new version of data from some external storage.

Thanks,

Dongwon
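
For illustration only, the two ideas in this thread (loading the data once per TaskManager JVM rather than once per task, and periodically fetching a new version) could be combined roughly as below. This is a minimal sketch, not a Flink facility: the class name SharedLookup, its init/get methods, and the loader Supplier are all hypothetical, and the sketch assumes the class is loaded once per TM JVM (a static field is per classloader), with init() called from each task's open() method.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

/** Hypothetical JVM-wide holder: one copy of the lookup map per TaskManager process. */
final class SharedLookup {
    // volatile so that every task thread sees the most recently swapped-in snapshot
    private static volatile Map<Long, Integer> snapshot;
    private static ScheduledExecutorService refresher;

    private SharedLookup() {}

    /**
     * Loads the data once per JVM and schedules periodic reloads.
     * Later calls (e.g. from the other tasks in the same TM) return immediately.
     */
    static synchronized void init(Supplier<Map<Long, Integer>> loader, long period, TimeUnit unit) {
        if (refresher != null) {
            return; // already initialized by another task in this JVM
        }
        // Initial (bootstrap) load; blocks the first caller until the data is there.
        snapshot = Collections.unmodifiableMap(new HashMap<>(loader.get()));
        refresher = Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "shared-lookup-refresher");
            t.setDaemon(true); // do not keep the JVM alive on shutdown
            return t;
        });
        refresher.scheduleWithFixedDelay(() -> {
            try {
                // Build the new map off to the side, then swap the reference.
                snapshot = Collections.unmodifiableMap(new HashMap<>(loader.get()));
            } catch (RuntimeException e) {
                // Reload failed: keep serving the previous snapshot.
            }
        }, period, period, unit);
    }

    /** Returns the current value for the key, or null if the key is absent. */
    static Integer get(long key) {
        return snapshot.get(key);
    }
}
```

The loader would be whatever reads the data from the external storage. Readers never lock: they only dereference the current snapshot, so a reload does not block record processing, at the cost of briefly holding two copies of the map in memory during the swap.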

> On Sep 23, 2020, at 4:59 AM, Kostas Kloudas <kklou...@gmail.com> wrote:
>
> Hi Dongwon,
>
> If you know the data in advance, you can always use the Yarn options
> in [1] (e.g. "yarn.ship-directories") to ship the directories with
> the data you want only once to each Yarn container (i.e. TM) and then
> write a UDF which reads them in its open() method. This way the data
> is shipped only once per TM, but each task will of course still have
> its own copy in memory. By default the visibility of the files that
> you ship is set to APPLICATION [2], if I am not mistaken, so if more
> than one TM lands on the same node, even fewer copies will be
> shipped.
>
> Does this help with your use case?
>
> Cheers,
> Kostas
>
> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/config.html
> [2] https://hadoop.apache.org/docs/stable/api/org/apache/hadoop/yarn/api/records/LocalResourceVisibility.html
>
>> On Sun, Sep 20, 2020 at 6:05 PM Dongwon Kim <eastcirc...@gmail.com> wrote:
>> Hi,
>> I'm using Flink broadcast state similar to what Fabian explained in [1]. One 
>> difference might be the size of the broadcasted data; the size is around 
>> 150MB.
>> I've launched 32 TMs by setting
>> - taskmanager.numberOfTaskSlots : 6
>> - parallelism of the non-broadcast side : 192
>> Here are some questions:
>> 1) AFAIK, the broadcasted data (150MB) is sent to all 192 tasks. Is that right?
>> 2) Any recommended way to broadcast data only to 32 TMs so that 6 tasks in 
>> each TM can read the broadcasted data? I'm considering implementing a static 
>> class for the non-broadcast side to directly load data only once on each 
>> TaskManager instead of the broadcast state (FYI, I'm using per-job clusters 
>> on YARN, so each TM is only for a single job). However, I'd like to use 
>> Flink native facilities if possible.
>> The type of broadcasted data is Map<Long, Int> with around 600K entries, so 
>> every time the data is broadcasted a lot of GC is inevitable on each TM due 
>> to the (de)serialization cost.
>> Any advice would be much appreciated.
>> Best,
>> Dongwon
>> [1] https://flink.apache.org/2019/06/26/broadcast-state.html
