Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions prometheus_client/metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,10 @@ def remove(self, *labelvalues: Any) -> None:
warnings.warn(
"Removal of labels has not been implemented in multi-process mode yet.",
UserWarning)
if 'PROMETHEUS_REDIS_URL' in os.environ:
warnings.warn(
"Removal of labels has not been implemented in redis mode yet.",
UserWarning)

if not self._labelnames:
raise ValueError('No label names were set when constructing %s' % self)
Expand All @@ -226,6 +230,10 @@ def remove_by_labels(self, labels: dict[str, str]) -> None:
"Removal of labels has not been implemented in multi-process mode yet.",
UserWarning
)
if 'PROMETHEUS_REDIS_URL' in os.environ:
warnings.warn(
"Removal of labels has not been implemented in redis mode yet.",
UserWarning)

if not self._labelnames:
raise ValueError('No label names were set when constructing %s' % self)
Expand Down Expand Up @@ -258,6 +266,10 @@ def clear(self) -> None:
warnings.warn(
"Clearing labels has not been implemented in multi-process mode yet",
UserWarning)
if 'PROMETHEUS_REDIS_URL' in os.environ:
warnings.warn(
"Clearing of labels has not been implemented in redis mode yet.",
UserWarning)
with self._lock:
self._metrics = {}

Expand Down
123 changes: 123 additions & 0 deletions prometheus_client/redis_collector.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
from collections.abc import Iterable
import json
import os
from urllib.parse import urlsplit

from .metrics_core import Metric
from .registry import Collector, CollectorRegistry
from .samples import Sample

fake_redis_pool = {}


def redis_client():
"""
Create a redis client for PROMETHEUS_REDIS_URL.

Configure the redis database via a URL in PROMETHEUS_REDIS_URL of the form
redis://localhost:6379/0
"""
from redis import Redis

parsed_url = urlsplit(os.environ["PROMETHEUS_REDIS_URL"])
assert parsed_url.path.startswith("/")
assert parsed_url.path[1:].isdigit()
port = parsed_url.port or 6379
db = int(parsed_url.path[1:])

if parsed_url.scheme == "fakeredis":
from fakeredis import FakeRedis

if db not in fake_redis_pool:
fake_redis_pool[db] = FakeRedis()
return fake_redis_pool[db]

assert parsed_url.scheme == "redis"
return Redis(host=parsed_url.hostname, port=port, db=db)


class RedisCollector(Collector):
"""Collector for redis mode."""

def __init__(self, registry: CollectorRegistry | None) -> None:
self._client = redis_client()
if registry:
registry.register(self)

def _iter_values(self) -> Iterable[tuple[bytes, str]]:
cursor = 0
while True:
cursor, keys = self._client.scan(cursor=cursor, match="value:*")
values = self._client.mget(keys)
yield from zip(keys, values)
if cursor == 0:
break

def collect(self) -> Iterable[Metric]:
metrics: dict[str, Metric] = {}
histograms: set[str] = set()

for key, value_s in self._iter_values():
# FIXME: Catch ValueError here, just in case?
prefix_b, typ_b, mmap_key = key.split(b":", 2)
assert prefix_b == b"value"
typ = typ_b.decode()
value = float(value_s)

metric_name, name, labels, help_text = json.loads(mmap_key)

metric = metrics.get(metric_name)
if metric is None:
metric = Metric(metric_name, help_text, typ)
metrics[metric_name] = metric
if typ in ("histogram", "gaugehistogram"):
histograms.add(metric_name)

metric.add_sample(name, labels, value)

for name in histograms:
self._fix_histogram(metrics[name])

return metrics.values()

def _fix_histogram(self, metric: Metric) -> None:
"""
Fix-up histogram samples.

Sort the buckets as expected by a client, and accumulate the values.
The Histogram class is optimized to only increment the bucket that a
value first appears in, not larger ones that would also contain it.
"""
by_label: dict[tuple[tuple[str, ...], str], list[Sample]] = {}

# Organize into lists of samples by label
for sample in metric.samples:
if "le" in sample.labels:
labels_without_le = sample.labels.copy()
labels_without_le.pop("le")
key = (tuple(labels_without_le.values()), sample.name)
else:
key = (tuple(sample.labels.values()), sample.name)
by_label.setdefault(key, []).append(sample)

metric.samples = []

for (labels, name), samples in sorted(by_label.items()):
if name.endswith("_bucket"):
# Sort buckets within each label
samples.sort(key=lambda sample: float(sample.labels["le"]))

# Accumulate values into larger buckets
value = 0.0
for sample in samples:
value += sample.value
metric.samples.append(Sample(sample.name, sample.labels, value))

labels_without_le = sample.labels.copy()
labels_without_le.pop("le")
metric.samples.append(
Sample(f"{metric.name}_count", labels_without_le, value)
)

else:
metric.samples.extend(samples)
93 changes: 89 additions & 4 deletions prometheus_client/values.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,48 @@
import os
from threading import Lock
from typing import Any, Protocol
import warnings

from .mmap_dict import mmap_key, MmapedDict
from .redis_collector import redis_client
from .samples import Exemplar


class MutexValue:
class Value(Protocol):
"""Prometheus Client Metric implementation."""

_multiprocess: bool

def __init__(
self,
typ: str,
metric_name: str,
name: str,
labelnames: list[str],
labelvalues: list[str],
help_text: str,
**kwargs: Any,
) -> None:
"""Initialize a metric."""

def inc(self, amount: float) -> None:
"""Increment the metric by amount."""

def set(self, value: float, timestamp: float | None = None) -> None:
"""Set the metric to value."""

def get(self) -> float:
"""Get the current metric value."""

def set_exemplar(self, exemplar: Exemplar) -> None:
"""Set an exemplar value."""
exemplar # For vulture

def get_exemplar(self) -> Exemplar | None:
"""Get any set exemplar value."""


class MutexValue(Value):
"""A float protected by a mutex."""

_multiprocess = False
Expand Down Expand Up @@ -52,7 +89,7 @@ def MultiProcessValue(process_identifier=os.getpid):
# This avoids the need to also have mutexes in __MmapDict.
lock = Lock()

class MmapedValue:
class MmapedValue(Value):
"""A float protected by a mutex backed by a per-process mmaped file."""

_multiprocess = True
Expand Down Expand Up @@ -125,12 +162,60 @@ def get_exemplar(self):
return MmapedValue


def get_value_class():
class RedisValue(Value):
"""
A value implementation that stores data in a redis/valkey database.

Key scheme:
* value:typ:MMAP_KEY
"""

_multiprocess = False

def __init__(
self,
typ: str,
metric_name: str,
name: str,
labelnames: list[str],
labelvalues: list[str],
help_text: str,
**kwargs: Any,
) -> None:
key = mmap_key(metric_name, name, labelnames, labelvalues, help_text)
self._key = f"value:{typ}:{key}"
redis_client().setnx(self._key, 0.0)

def inc(self, amount: float) -> None:
redis_client().incrbyfloat(self._key, amount)

def set(self, value: float, timestamp: float | None = None) -> None:
# TODO: Implement timestamps
redis_client().set(self._key, value)

def get(self) -> float:
value = redis_client().get(self._key)
if value is None:
return 0.0
return float(value)

def set_exemplar(self, exemplar: Exemplar) -> None:
# TODO: Implement exemplars for redis.
return

def get_exemplar(self) -> Exemplar | None:
# TODO: Implement exemplars for redis.
return None


def get_value_class() -> type[Value]:
# Should we enable multi-process mode?
# This needs to be chosen before the first metric is constructed,
# and as that may be in some arbitrary library the user/admin has
# no control over we use an environment variable.
if 'prometheus_multiproc_dir' in os.environ or 'PROMETHEUS_MULTIPROC_DIR' in os.environ:
if "PROMETHEUS_REDIS_URL" in os.environ:
return RedisValue
elif 'prometheus_multiproc_dir' in os.environ or 'PROMETHEUS_MULTIPROC_DIR' in os.environ:
return MultiProcessValue()
else:
return MutexValue
Expand Down
3 changes: 3 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,9 @@ aiohttp = [
django = [
"django",
]
redis = [
"redis",
]

[project.urls]
Homepage = "https://github.com/prometheus/client_python"
Expand Down
Loading