Hi Kostas,

Thanks for the input!

BTW, I guess you are assuming that the broadcast happens just once, for bootstrapping? My job needs not only bootstrapping but also periodic fetching of a new version of the data from external storage.

Thanks,
Dongwon

> On Sep 23, 2020, at 4:59 AM, Kostas Kloudas <kklou...@gmail.com> wrote:
>
> Hi Dongwon,
>
> If you know the data in advance, you can always use the Yarn options
> in [1] (e.g. "yarn.ship-directories") to ship the directories with
> the data you want only once to each Yarn container (i.e. TM) and then
> write a UDF which reads them in the open() method. This will allow the
> data to be shipped only once per TM, but each of the tasks will
> still have its own copy in memory, of course. By default the visibility of
> the files that you ship is set to APPLICATION [2], if I am not
> mistaken, so if more than one TM goes to the same node, you will
> have even fewer copies shipped.
>
> Does this help with your use case?
>
> Cheers,
> Kostas
>
> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/config.html
> [2] https://hadoop.apache.org/docs/stable/api/org/apache/hadoop/yarn/api/records/LocalResourceVisibility.html
>
>> On Sun, Sep 20, 2020 at 6:05 PM Dongwon Kim <eastcirc...@gmail.com> wrote:
>>
>> Hi,
>>
>> I'm using Flink broadcast state similar to what Fabian explained in [1]. One
>> difference might be the size of the broadcast data; it is around 150MB.
>>
>> I've launched 32 TMs by setting
>> - taskmanager.numberOfTaskSlots : 6
>> - parallelism of the non-broadcast side : 192
>>
>> Here are some questions:
>> 1) AFAIK, the broadcast data (150MB) is sent to all 192 tasks. Is that right?
>> 2) Is there a recommended way to broadcast data only to the 32 TMs so that
>> the 6 tasks in each TM can read the broadcast data? I'm considering
>> implementing a static class for the non-broadcast side to directly load the
>> data only once on each TaskManager instead of using broadcast state (FYI,
>> I'm using per-job clusters on YARN, so each TM is only for a single job).
>> However, I'd like to use Flink native facilities if possible.
>>
>> The type of the broadcast data is Map<Long, Int> with around 600K entries, so
>> every time the data is broadcast a lot of GC is inevitable on each TM due
>> to the (de)serialization cost.
>>
>> Any advice would be much appreciated.
>>
>> Best,
>> Dongwon
>>
>> [1] https://flink.apache.org/2019/06/26/broadcast-state.html
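
For reference, here is a minimal sketch of the static-class idea mentioned in the original question, extended with the periodic refresh. It is plain Java with no Flink dependency and is only an illustration, not a Flink facility. The class name SharedLookup, the loader argument, and the refresh period are all hypothetical. It assumes the class is loaded once per TaskManager JVM (which I would expect to depend on how the job classes are loaded), and that each task calls init() from its open() method.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

/** Hypothetical per-JVM holder: one shared copy of the lookup table per process. */
final class SharedLookup {
    // Readers see either the old or the new map, never a half-built one.
    private static volatile Map<Long, Integer> data;
    private static ScheduledExecutorService refresher;

    private SharedLookup() {}

    /**
     * Intended to be called from every task's open(). Only the first call in
     * the JVM loads the data and schedules the refresh; later calls return
     * immediately.
     */
    static synchronized void init(Supplier<Map<Long, Integer>> loader, long periodSeconds) {
        if (data != null) {
            return;
        }
        data = snapshot(loader.get());
        // Daemon thread so the refresher never keeps the JVM alive.
        refresher = Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "shared-lookup-refresher");
            t.setDaemon(true);
            return t;
        });
        refresher.scheduleAtFixedRate(() -> {
            try {
                data = snapshot(loader.get());
            } catch (RuntimeException e) {
                // Assumption: on a failed fetch, keep serving the previous version.
            }
        }, periodSeconds, periodSeconds, TimeUnit.SECONDS);
    }

    /** Returns the value for key, or null if absent or not yet initialized. */
    static Integer get(long key) {
        Map<Long, Integer> current = data;
        return current == null ? null : current.get(key);
    }

    // Defensive copy so later changes to the loader's map do not leak in.
    private static Map<Long, Integer> snapshot(Map<Long, Integer> source) {
        return Collections.unmodifiableMap(new HashMap<>(source));
    }
}
```

With this, the 6 tasks in a TM would share one in-memory map instead of holding one copy each, and nothing is (de)serialized through Flink on refresh. The trade-off is that the data lives outside Flink state, so it is not checkpointed and each TM refreshes on its own schedule.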