Thanks Moritz! We are using JMX to ship the metrics out of Spark. Most of the built-in Spark driver and executor metrics are going out fine. Does this require us to make another sink?
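For reference, the sink wiring Moritz describes below might look like this in Spark's metrics configuration (a sketch: the CsvSink class name comes from the Beam javadoc linked below, while the directory, period, and unit values are placeholders):

```properties
# metrics.properties (or equivalent spark.metrics.conf.* entries). A sketch:
# the sink class is real, the directory/period/unit values are placeholders.
# Beam's CsvSink wraps Spark's CsvSink and bridges Beam's metric registry
# into the Spark metric system.
*.sink.csv.class=org.apache.beam.runners.spark.metrics.sink.CsvSink
*.sink.csv.directory=/tmp/beam-metrics
*.sink.csv.period=10
*.sink.csv.unit=seconds
```

If I read Moritz's reply correctly, shipping Beam metrics through the stock JMX sink would likewise need a Beam-aware wrapper, since it is the Beam-provided sinks that integrate Beam's metrics into Spark's registry.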
-Yushu

On Thu, Jul 14, 2022 at 1:05 PM Moritz Mack <mm...@talend.com> wrote:

> Hi Yushu,
>
> Wondering, how did you configure your Spark metrics sink? And what version
> of Spark are you using?
>
> Key is to configure Spark to use one of the sinks provided by Beam, e.g.:
>
> "spark.metrics.conf.*.sink.csv.class"="org.apache.beam.runners.spark.metrics.sink.CsvSink"
>
> Currently there's support for CSV and Graphite, but it's simple to support
> others. The sinks typically wrap the corresponding Spark sinks and
> integrate with the metrics registry of the Spark metric system.
>
> https://beam.apache.org/releases/javadoc/2.40.0/org/apache/beam/runners/spark/metrics/sink/package-summary.html
>
> When dealing with custom sinks, it's fairly common to run into classpath
> issues :/ The metric system is loaded when an executor starts up; by then
> the application classpath isn't available yet. Typically, that means
> distributing such code by other means.
>
> /Moritz
>
> On 14.07.22, 21:26, "Yushu Yao" <yao.yu...@gmail.com> wrote:
>
> > Hi Team,
> >
> > Does anyone have a working example of a Beam job running on top of Spark,
> > so that I can use the Beam metric syntax and the metrics will be shipped
> > out via Spark's infra?
> >
> > The only thing I achieved is to be able to queryMetrics() every half
> > second and copy all the metrics into the Spark metrics.
> >
> > Wondering if there is a better way?
> >
> > Thanks!
> > -Yushu
> >
> >     MetricQueryResults metrics =
> >         pipelineResult
> >             .metrics()
> >             .queryMetrics(
> >                 MetricsFilter.builder()
> >                     .addNameFilter(MetricNameFilter.inNamespace(namespace))
> >                     .build());
> >
> >     for (MetricResult<GaugeResult> cc : metrics.getGauges()) {
> >       LOGGER.info("Adding Gauge: {} : {}", cc.getName(), cc.getAttempted());
> >       com.codahale.metrics.Gauge gauge =
> >           new SimpleBeamGauge(cc.getAttempted().getValue());
> >       try {
> >         String name = metricName(cc.getKey(), addStepName, addNamespace);
> >         if (registry.getNames().contains(name)) {
> >           LOGGER.info("Removing metric {}", name);
> >           registry.remove(name);
> >         }
> >         registry.register(name, gauge);
> >       } catch (IllegalArgumentException e) {
> >         LOGGER.warn("Duplicated metrics found. Try turning on addStepName=true.", e);
> >       }
> >     }
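The catch block in the snippet above suggests the duplicates come from different pipeline steps reporting a metric with the same name. A minimal, self-contained illustration of why including the step name disambiguates (the metricName helper and the step/namespace values here are hypothetical, not Beam's actual implementation):

```java
import java.util.ArrayList;
import java.util.List;

public class MetricNames {
  // Hypothetical stand-in for the metricName(...) helper used above:
  // joins the optional step name and namespace with the metric name.
  static String metricName(String step, String namespace, String name,
                           boolean addStepName, boolean addNamespace) {
    List<String> parts = new ArrayList<>();
    if (addStepName) {
      parts.add(step);
    }
    if (addNamespace) {
      parts.add(namespace);
    }
    parts.add(name);
    return String.join(".", parts);
  }

  public static void main(String[] args) {
    // Without the step name, two transforms emitting the same metric collide,
    // which is what triggers the IllegalArgumentException on register().
    String a = metricName("ParDo1", "myNamespace", "elements", false, true);
    String b = metricName("ParDo2", "myNamespace", "elements", false, true);
    System.out.println(a.equals(b)); // true: same flattened name

    // With addStepName=true the names stay unique.
    System.out.println(metricName("ParDo1", "myNamespace", "elements", true, true));
    System.out.println(metricName("ParDo2", "myNamespace", "elements", true, true));
  }
}
```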