Thanks Moritz!

We are using JMX to ship the metrics out of Spark, and most of Spark's
built-in driver and executor metrics are going out fine.
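
For reference, our current metrics config looks roughly like this (a sketch;
we ship via Spark's stock JmxSink):

"spark.metrics.conf.*.sink.jmx.class"="org.apache.spark.metrics.sink.JmxSink"
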
Does this require us to make another sink?

-Yushu


On Thu, Jul 14, 2022 at 1:05 PM Moritz Mack <mm...@talend.com> wrote:

> Hi Yushu,
>
>
>
> Wondering, how did you configure your Spark metrics sink? And what version
> of Spark are you using?
>
>
>
> Key is to configure Spark to use one of the sinks provided by Beam, e.g.:
>
>
> "spark.metrics.conf.*.sink.csv.class"="org.apache.beam.runners.spark.metrics.sink.CsvSink"
>
>
>
> Currently there’s support for CSV and Graphite, but it’s simple to support
> others. The sinks typically wrap the corresponding Spark sinks and
> integrate with the Metrics registry of the Spark metric system.
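>
> For instance, a custom sink wrapping Spark's ConsoleSink might look roughly
> like this (a sketch following the pattern of Beam's CsvSink; the
> (Properties, MetricRegistry, SecurityManager) constructor follows the Spark
> sink convention, which has changed across Spark versions, so verify it
> against yours):
>
> import com.codahale.metrics.MetricRegistry;
> import java.util.Properties;
> import org.apache.beam.runners.spark.metrics.WithMetricsSupport;
>
> public class BeamConsoleSink extends org.apache.spark.metrics.sink.ConsoleSink {
>     public BeamConsoleSink(
>         Properties properties,
>         MetricRegistry metricRegistry,
>         org.apache.spark.SecurityManager securityMgr) {
>       // Wrap the registry so Beam metrics are reported alongside Spark's own.
>       super(properties, WithMetricsSupport.forRegistry(metricRegistry), securityMgr);
>     }
> }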
>
>
>
>
> https://beam.apache.org/releases/javadoc/2.40.0/org/apache/beam/runners/spark/metrics/sink/package-summary.html
>
>
>
> When dealing with custom sinks, it’s fairly common to run into classpath
> issues :/ The metric system is loaded when an executor starts up, and at
> that point the application classpath isn’t available yet. Typically, that
> means distributing such code by other means.
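>
> For example, something along these lines (a sketch, assuming a jar path;
> the exact flags depend on your deployment mode):
>
> spark-submit \
>   --conf spark.metrics.conf=/path/to/metrics.properties \
>   --conf spark.driver.extraClassPath=/opt/jars/custom-sink.jar \
>   --conf spark.executor.extraClassPath=/opt/jars/custom-sink.jar \
>   ...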
>
>
>
> /Moritz
>
>
>
>
>
> On 14.07.22, 21:26, "Yushu Yao" <yao.yu...@gmail.com> wrote:
>
>
> Hi Team,
>
> Does anyone have a working example of a beam job running on top of spark?
> So that I can use the beam metric syntax and the metrics will be shipped
> out via spark's infra?
>
>
>
> The only thing I achieved is to be able to queryMetrics() every half
> second and copy all the metrics into the spark metrics.
>
> Wondering if there is a better way?
>
>
>
> Thanks!
>
> -Yushu
>
>
>
> MetricQueryResults metrics =
>     pipelineResult
>         .metrics()
>         .queryMetrics(
>             MetricsFilter.builder()
>                 .addNameFilter(MetricNameFilter.inNamespace(namespace))
>                 .build());
>
> // Copy each Beam gauge into the Spark (Codahale) metric registry,
> // replacing any previously registered value with the same name.
> for (MetricResult<GaugeResult> cc : metrics.getGauges()) {
>     LOGGER.info("Adding Gauge: {} : {}", cc.getName(), cc.getAttempted());
>     com.codahale.metrics.Gauge gauge =
>         new SimpleBeamGauge(cc.getAttempted().getValue());
>     try {
>         String name = metricName(cc.getKey(), addStepName, addNamespace);
>         if (registry.getNames().contains(name)) {
>             LOGGER.info("Removing metric {}", name);
>             registry.remove(name);
>         }
>         registry.register(name, gauge);
>     } catch (IllegalArgumentException e) {
>         LOGGER.warn("Duplicated metrics found. Try turning on addStepName=true.", e);
>     }
> }
>