Hi Dongwon,

Yes, you are right: I assumed that the broadcasting occurs only once. That is what I meant by "If you know the data in advance". Sorry for not being clear. If you need to periodically broadcast new versions of the data, then I cannot think of a better solution than the one you propose with the static variable.
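
To make the static-variable idea concrete, here is a minimal sketch in plain Java, not a tested Flink implementation. The class name SharedLookup, the loader argument, and the refresh period are all hypothetical. It assumes that every task in a TM would call init() from its open() method, and that user code is loaded once per TM JVM, which should hold for the per-job clusters discussed in this thread.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

/** Hypothetical per-JVM holder: all tasks in one TaskManager share a single copy. */
final class SharedLookup {
    // volatile so that a refreshed snapshot becomes visible to every task thread
    private static volatile Map<Long, Integer> data;
    private static ScheduledExecutorService refresher;

    private SharedLookup() {}

    /** Idempotent: only the first caller loads the data and starts the refresher. */
    static synchronized void init(Supplier<Map<Long, Integer>> loader, long periodSeconds) {
        if (data != null) {
            return; // another task in this JVM has already initialized the holder
        }
        data = snapshot(loader);
        refresher = Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "shared-lookup-refresher");
            t.setDaemon(true); // do not keep the JVM alive because of this thread
            return t;
        });
        refresher.scheduleAtFixedRate(() -> {
            try {
                data = snapshot(loader); // swap in the new version atomically
            } catch (RuntimeException e) {
                // keep serving the previous snapshot if a refresh fails
            }
        }, periodSeconds, periodSeconds, TimeUnit.SECONDS);
    }

    /** Returns the value for the key, or null if the key is absent or init() was never called. */
    static Integer get(long key) {
        Map<Long, Integer> current = data;
        return current == null ? null : current.get(key);
    }

    private static Map<Long, Integer> snapshot(Supplier<Map<Long, Integer>> loader) {
        return Collections.unmodifiableMap(new HashMap<>(loader.get()));
    }
}
```

With this shape, each TM holds one copy instead of one per task, and a refresh replaces the whole map in one step. While a refresh is in progress, both the old and the new map are in memory.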
Cheers,
Kostas

On Wed, Sep 23, 2020 at 11:49 AM Dongwon Kim <eastcirc...@gmail.com> wrote:
>
> Hi Kostas,
>
> Thanks for the input!
>
> BTW, I guess you assume that the broadcasting occurs just once for
> bootstrapping, huh?
> My job needs not only bootstrapping but also periodically fetching a
> new version of the data from some external storage.
>
> Thanks,
>
> Dongwon
>
> > On Sep 23, 2020, at 4:59 AM, Kostas Kloudas <kklou...@gmail.com> wrote:
> >
> > Hi Dongwon,
> >
> > If you know the data in advance, you can always use the Yarn options
> > in [1] (e.g. "yarn.ship-directories") to ship the directories with
> > the data you want only once to each Yarn container (i.e. TM) and then
> > write a UDF which reads them in the open() method. This will allow the
> > data to be shipped only once per TM, but then each of the tasks will
> > have its own copy in memory, of course. By default the visibility of
> > the files that you ship is set to APPLICATION [2], if I am not
> > mistaken, so if more than one TM lands on the same node, then you will
> > have even fewer copies shipped.
> >
> > Does this help with your use case?
> >
> > Cheers,
> > Kostas
> >
> > [1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/config.html
> > [2] https://hadoop.apache.org/docs/stable/api/org/apache/hadoop/yarn/api/records/LocalResourceVisibility.html
> >
> >> On Sun, Sep 20, 2020 at 6:05 PM Dongwon Kim <eastcirc...@gmail.com> wrote:
> >> Hi,
> >> I'm using Flink broadcast state similar to what Fabian explained in [1].
> >> One difference might be the size of the broadcasted data; the size is
> >> around 150MB.
> >> I've launched 32 TMs by setting
> >> - taskmanager.numberOfTaskSlots : 6
> >> - parallelism of the non-broadcast side : 192
> >> Here are some questions:
> >> 1) AFAIK, the broadcasted data (150MB) is sent to all 192 tasks. Is that
> >> right?
> >> 2) Is there a recommended way to broadcast data only to the 32 TMs so that
> >> the 6 tasks in each TM can read the broadcasted data? I'm considering
> >> implementing a static class for the non-broadcast side to directly load
> >> the data only once on each TaskManager instead of using the broadcast
> >> state (FYI, I'm using per-job clusters on YARN, so each TM is only for a
> >> single job). However, I'd like to use Flink native facilities if possible.
> >> The type of the broadcasted data is Map<Long, Int> with around 600K
> >> entries, so every time the data is broadcasted a lot of GC is inevitable
> >> on each TM due to the (de)serialization cost.
> >> Any advice would be much appreciated.
> >> Best,
> >> Dongwon
> >> [1] https://flink.apache.org/2019/06/26/broadcast-state.html
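
For the suggestion earlier in the thread of shipping the data with "yarn.ship-directories" and reading it in open(), the reading step could look roughly like the sketch below. It is plain Java with no Flink dependency. The class name ShippedDataLoader and the "key,value" line format are assumptions made for illustration, since the thread does not say how the 600K-entry map is stored. The path at which a shipped file appears inside the container is also not covered here and would have to be checked against the actual deployment.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper that a UDF could call from its open() method. */
final class ShippedDataLoader {
    private ShippedDataLoader() {}

    /** Parses lines of the assumed form "key,value" into a map; blank lines are skipped. */
    static Map<Long, Integer> load(Path file) throws IOException {
        Map<Long, Integer> result = new HashMap<>();
        try (BufferedReader reader = Files.newBufferedReader(file, StandardCharsets.UTF_8)) {
            String line;
            while ((line = reader.readLine()) != null) {
                line = line.trim();
                if (line.isEmpty()) {
                    continue;
                }
                int comma = line.indexOf(',');
                if (comma < 0) {
                    throw new IOException("Malformed line: " + line);
                }
                long key = Long.parseLong(line.substring(0, comma).trim());
                int value = Integer.parseInt(line.substring(comma + 1).trim());
                result.put(key, value);
            }
        }
        return result;
    }
}
```

If each task calls load() on its own, every task ends up with its own in-memory copy, as noted in the thread; routing the call through a static holder shared by the TM would avoid that.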