Would custom accumulator work for you? It should be do-able for
Map[String,Long] too
https://stackoverflow.com/questions/42293798/how-to-create-custom-set-accumulator-i-e-setstring


‪вс, 17 янв. 2021 г. в 15:16, ‫"Yuri Oleynikov (‫יורי אולייניקוב‬‎)"‬‎ <
yur...@gmail.com>:‬

> Hey Jacek, I’ll clarify myself a bit:
> As bottom line I need following metrics being reported by structured
> streaming:
> Country-USA:7
> Country-Poland: 23
> Country-Brazil: 56
>
> The country names are included in incoming events and unknown at very
> beginning/application startup.
>
> Thus registering accumulator and binding it to metric source at driver
> side on application startup is impossible (unless you register with all
> possible country names - which is waste of Spark memory, polluting metrics
> namespace with 99% of metrics having zero value, and wasting the network
> bandwidth ).
>
>
> Отправлено с iPhone
>
> 17 янв. 2021 г., в 15:56, Jacek Laskowski <ja...@japila.pl> написал(а):
>
> 
> Hey Yurii,
>
> > which is unavailable from executors.
>
> Register it on the driver and use accumulators on executors to update the
> values (on the driver)?
>
> Pozdrawiam,
> Jacek Laskowski
> ----
> https://about.me/JacekLaskowski
> "The Internals Of" Online Books <https://books.japila.pl/>
> Follow me on https://twitter.com/jaceklaskowski
>
> <https://twitter.com/jaceklaskowski>
>
>
> ‪On Sat, Jan 16, 2021 at 2:21 PM ‫Yuri Oleynikov (יורי אולייניקוב‬‎ <
> yur...@gmail.com> wrote:‬
>
>> Hi all,
>> I have a spark application with Arbitrary Stateful Aggregation
>> implemented with FlatMapGroupsWithStateFunction.
>>
>> I want to make some statistics about incoming events inside
>> FlatMapGroupsWithStateFunction.
>> The statistics are made from some event property which on the one hand
>> has dynamic values but on the other hand - small finite set (thought
>> unknown) of values (e.g. country name).
>>
>> So I thought to register dynamic metrics inside
>> FlatMapGroupsWithStateFunction but as far as I understand, this requires
>> accessing MetricsSystem via SparkEnv.get() which is unavailable from
>> executors.
>>
>> Any thoughts/suggestions?
>>
>> With best regards,
>> Yurii
>>
>>

Reply via email to