Hi,

I'm interested in this question.
Do you have any experience to share?

I'm currently using aioprometheus, but the metric exposure is blocking:
https://github.com/claws/aioprometheus/issues/98

Looking at the official client, it seems that they added async ASGI metric 
exposure. But I don't know if their metrics update will then be blocking.
I didn't try it yet.
See https://github.com/prometheus/client_python/pull/512 

I discover this package but didn't try it yet.
https://github.com/Llandy3d/pytheus/issues/55

And there is also this prometheus-async, I asked a question there too:
https://github.com/hynek/prometheus-async/issues/59


On Sunday 11 October 2020 at 23:14:34 UTC+1 Alexander wrote:

> Is there any way to make custom collector work withing aiohttp application 
> (or any other way to convert json object to prometheus metrics in aiohttp 
> application)?
>
> I'm trying to implement custom collector according to 
> https://github.com/prometheus/client_python#custom-collectors and 
> fetching data via *`await session.get(...)`*. The only solution I found 
> is to change *`def collect()`* method to *`async def collect()`* in 
> custom collector class and override registry methods with sync loops to 
> async one and also re-implement *`generate_latest`* function.
>
> *the overrides:*
> ```
> class CustomRegistry(CollectorRegistry):
>     async def collect(self):
>         """Yields metrics from the collectors in the registry."""
>         collectors = None
>         ti = None
>         with self._lock:
>             collectors = copy.copy(self._collector_to_names)
>             if self._target_info:
>                 ti = self._target_info_metric()
>         if ti:
>             yield ti
>         for collector in collectors:
>             async for metric in collector.collect():
>                 yield metric
>
>     async def register(self, collector):
>         """Add a collector to the registry."""
>         with self._lock:
>             names = await self._get_names(collector)
>             duplicates = set(self._names_to_collectors).intersection(names)
>             if duplicates:
>                 raise ValueError(
>                     'Duplicated timeseries in CollectorRegistry: 
> {0}'.format(
>                         duplicates))
>             for name in names:
>                 self._names_to_collectors[name] = collector
>             self._collector_to_names[collector] = names
>
>     async def _get_names(self, collector):
>         """Get names of timeseries the collector produces."""
>         desc_func = None
>         # If there's a describe function, use it.
>         try:
>             desc_func = collector.describe
>         except AttributeError:
>             pass
>         # Otherwise, if auto describe is enabled use the collect function.
>         if not desc_func and self._auto_describe:
>             desc_func = collector.collect
>
>         if not desc_func:
>             return []
>
>         result = []
>         type_suffixes = {
>             'counter': ['_total', '_created'],
>             'summary': ['', '_sum', '_count', '_created'],
>             'histogram': ['_bucket', '_sum', '_count', '_created'],
>             'gaugehistogram': ['_bucket', '_gsum', '_gcount'],
>             'info': ['_info'],
>         }
>         async for metric in desc_func():
>             for suffix in type_suffixes.get(metric.type, ['']):
>                 result.append(metric.name + suffix)
>         return result
>
>
> def sample_line(line):
>     if line.labels:
>         labelstr = '{{{0}}}'.format(','.join(
>             ['{0}="{1}"'.format(
>                 k, v.replace('\\', r'\\').replace('\n', 
> r'\n').replace('"', r'\"'))
>                 for k, v in sorted(line.labels.items())]))
>     else:
>         labelstr = ''
>     timestamp = ''
>     if line.timestamp is not None:
>         # Convert to milliseconds.
>         timestamp = ' {0:d}'.format(int(float(line.timestamp) * 1000))
>     return '{0}{1} {2}{3}\n'.format(
>         line.name, labelstr, floatToGoString(line.value), timestamp)
>
>
> async def generate_latest(registry):
>     """Returns the metrics from the registry in latest text format as a 
> string."""
>     output = []
>     async for metric in registry.collect():
>         try:
>             mname = metric.name
>             mtype = metric.type
>             # Munging from OpenMetrics into Prometheus format.
>             if mtype == 'counter':
>                 mname = mname + '_total'
>             elif mtype == 'info':
>                 mname = mname + '_info'
>                 mtype = 'gauge'
>             elif mtype == 'stateset':
>                 mtype = 'gauge'
>             elif mtype == 'gaugehistogram':
>                 # A gauge histogram is really a gauge,
>                 # but this captures the structure better.
>                 mtype = 'histogram'
>             elif mtype == 'unknown':
>                 mtype = 'untyped'
>
>             output.append('# HELP {0} {1}\n'.format(
>                 mname, metric.documentation.replace('\\', 
> r'\\').replace('\n', r'\n')))
>             output.append('# TYPE {0} {1}\n'.format(mname, mtype))
>
>             om_samples = {}
>             for s in metric.samples:
>                 for suffix in ['_created', '_gsum', '_gcount']:
>                     if s.name == metric.name + suffix:
>                         # OpenMetrics specific sample, put in a gauge at 
> the end.
>                         om_samples.setdefault(suffix, 
> []).append(sample_line(s))
>                         break
>                 else:
>                     output.append(sample_line(s))
>         except Exception as exception:
>             exception.args = (exception.args or ('',)) + (metric,)
>             raise
>
>         for suffix, lines in sorted(om_samples.items()):
>             output.append('# HELP {0}{1} {2}\n'.format(metric.name, 
> suffix, metric.documentation.replace('\\', r'\\').replace('\n', r'\n')))
>             output.append('# TYPE {0}{1} gauge\n'.format(metric.name, 
> suffix))
>             output.extend(lines)
>     return ''.join(output).encode('utf-8')
> ```
>
> *the custom collector class:*
> ```
> class CustomCollector(object):
>     def __init__(self, app):
>         self.session = app['session']
>         self.metrics_url = app['metrics_url']
>
>     async def collect(self):
>         resp = await self.session.get(self.metrics_url)
>         payload = await resp.json()
>         for name, v in payload['metrics'].items():
>             name = name.replace('.', '_')
>             name = name.replace('-', '_')
>             name = re.sub('(?!^)([A-Z]+)', r'_\1', name).lower()
>             kind = v['kind']
>             if kind == 'Gauge':
>                 m = GaugeMetricFamily(name, name, labels=[])
>             if kind == 'Timer':
>                 m = SummaryMetricFamily(name, name, labels=[])
>             if kind == 'Counter':
>                 m = CounterMetricFamily(name, name, labels=[])
>             for i in v['values']:
>                 tags = i['tags']
>                 labels = {tag['key']: tag['value'] for tag in tags}
>                 value = i['values'][0]['v']
>                 timestamp = i['values'][0]['t']
>                 m.add_sample(name, labels, value, timestamp=timestamp)
>             yield m
> ```

-- 
You received this message because you are subscribed to the Google Groups 
"Prometheus Users" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to [email protected].
To view this discussion on the web visit 
https://groups.google.com/d/msgid/prometheus-users/2c36fc4e-6b53-4ff0-a147-006d4423f8a2n%40googlegroups.com.

Reply via email to