Would custom accumulator work for you? It should be do-able for Map[String,Long] too https://stackoverflow.com/questions/42293798/how-to-create-custom-set-accumulator-i-e-setstring
вс, 17 янв. 2021 г. в 15:16, "Yuri Oleynikov (יורי אולייניקוב)" < yur...@gmail.com>: > Hey Jacek, I’ll clarify myself a bit: > As bottom line I need following metrics being reported by structured > streaming: > Country-USA:7 > Country-Poland: 23 > Country-Brazil: 56 > > The country names are included in incoming events and unknown at very > beginning/application startup. > > Thus registering accumulator and binding it to metric source at driver > side on application startup is impossible (unless you register with all > possible country names - which is waste of Spark memory, polluting metrics > namespace with 99% of metrics having zero value, and wasting the network > bandwidth ). > > > Отправлено с iPhone > > 17 янв. 2021 г., в 15:56, Jacek Laskowski <ja...@japila.pl> написал(а): > > > Hey Yurii, > > > which is unavailable from executors. > > Register it on the driver and use accumulators on executors to update the > values (on the driver)? > > Pozdrawiam, > Jacek Laskowski > ---- > https://about.me/JacekLaskowski > "The Internals Of" Online Books <https://books.japila.pl/> > Follow me on https://twitter.com/jaceklaskowski > > <https://twitter.com/jaceklaskowski> > > > On Sat, Jan 16, 2021 at 2:21 PM Yuri Oleynikov (יורי אולייניקוב < > yur...@gmail.com> wrote: > >> Hi all, >> I have a spark application with Arbitrary Stateful Aggregation >> implemented with FlatMapGroupsWithStateFunction. >> >> I want to make some statistics about incoming events inside >> FlatMapGroupsWithStateFunction. >> The statistics are made from some event property which on the one hand >> has dynamic values but on the other hand - small finite set (thought >> unknown) of values (e.g. country name). >> >> So I thought to register dynamic metrics inside >> FlatMapGroupsWithStateFunction but as far as I understand, this requires >> accessing MetricsSystem via SparkEnv.get() which is unavailable from >> executors. >> >> Any thoughts/suggestions? >> >> With best regards, >> Yurii >> >>