diff --git a/docs/docs.json b/docs/docs.json index 2b99e176e..bd32aaed7 100644 --- a/docs/docs.json +++ b/docs/docs.json @@ -75,7 +75,8 @@ "group": "Integrations", "pages": [ "integrations/langgraph-integration", - "integrations/openenv-integration" + "integrations/openenv-integration", + "integrations/verifiers-integration" ] }, { diff --git a/docs/integrations/verifiers-integration-demo.mp4 b/docs/integrations/verifiers-integration-demo.mp4 new file mode 100644 index 000000000..9b21081c2 Binary files /dev/null and b/docs/integrations/verifiers-integration-demo.mp4 differ diff --git a/docs/integrations/verifiers-integration.mdx b/docs/integrations/verifiers-integration.mdx new file mode 100644 index 000000000..0831541f4 --- /dev/null +++ b/docs/integrations/verifiers-integration.mdx @@ -0,0 +1,111 @@ +--- +title: "Verifiers" +description: "Run Prime Intellect verifiers environments from ART training loops" +--- + +# Verifiers Integration + +[verifiers](https://github.com/PrimeIntellect-ai/verifiers) provides reusable RL +environments for model evaluation and training. ART can consume those rollouts +through `art.verifiers`, which keeps verifiers as an optional dependency and +converts rollout outputs into ART trajectories. + +## Install + +```bash +uv pip install -U openpipe-art verifiers +``` + +## Run a verifiers environment in ART + +Use `rollout_with_verifiers_environment` inside your ART rollout function. It +passes ART's managed OpenAI-compatible client to the verifiers environment and +returns an `art.Trajectory`. + +```python +import art +from art.verifiers import rollout_with_verifiers_environment +import verifiers as vf + + +async def rollout( + model: art.TrainableModel, + env: vf.Environment, + input: vf.RolloutInput, +) -> art.Trajectory: + return await rollout_with_verifiers_environment( + env, + model, + input, + sampling_args={"n": 1, "temperature": 1.0}, + ) +``` + +For grouped rollouts, use `trajectory_group_with_verifiers_environment`: + +```python +from art.verifiers import trajectory_group_with_verifiers_environment + +group = await trajectory_group_with_verifiers_environment( + env, + model, + group_inputs, + sampling_args={"n": 1, "temperature": 1.0}, +) +``` + +## Convert saved rollouts + +If you already have verifiers outputs, convert them without rerunning the +environment: + +```python +from art.verifiers import trajectory_from_verifiers_rollout + +trajectory = trajectory_from_verifiers_rollout(output) +``` + +For the richest transcript, include the verifiers trajectory column when +generating outputs: + +```python +output = await env.run_rollout( + input=input, + client=client, + model=model_name, + sampling_args={"n": 1}, + state_columns=["trajectory"], +) +``` + +The reverse conversion is also available for tooling that expects a +verifiers-compatible output shape: + +```python +from art.verifiers import rollout_output_from_trajectory + +output = rollout_output_from_trajectory(trajectory) +``` + +To validate 2-way portability for saved verifiers outputs, round-trip through +ART and back to normalized verifiers-compatible output: + +```python +from art.verifiers import normalize_verifiers_rollout_output + +normalized = normalize_verifiers_rollout_output(output) +``` + +The normalizer preserves reward, numeric metrics, tool definitions, logs, +completion state, answer/stop metadata, and prompt/completion message shape +while adding ART metadata to the optional trajectory column. 
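+
+As a quick check of that preservation, you can round-trip a saved output and
+compare the fields you care about. This is a minimal sketch, not part of the
+API: it assumes `output` is a saved rollout dict with a numeric reward, as in
+the examples above:
+
+```python
+from art.verifiers import normalize_verifiers_rollout_output
+
+normalized = normalize_verifiers_rollout_output(output)
+
+# The numeric reward is carried through the ART round trip.
+assert normalized["reward"] == output["reward"]
+
+# Only numeric metrics survive; string-valued metrics are dropped.
+assert all(
+    isinstance(value, (int, float, bool))
+    for value in normalized["metrics"].values()
+)
+```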
+ +## Notes + +- `art.verifiers` does not import verifiers until you call a function that + runs an environment. +- Multi-turn verifiers trajectories are reconstructed by appending only the + new prompt suffix for each step, then that step's completion. +- ART trajectories created from serialized verifiers outputs can train with + `allow_training_without_logprobs=True` because serialized assistant messages + do not carry OpenAI logprobs. diff --git a/src/art/test/test_verifiers_bridge.py b/src/art/test/test_verifiers_bridge.py new file mode 100644 index 000000000..a70f6a20b --- /dev/null +++ b/src/art/test/test_verifiers_bridge.py @@ -0,0 +1,368 @@ +import inspect +import sys +import types + +import pytest + +import art +from art.verifiers import ( + normalize_verifiers_rollout_output, + normalize_verifiers_rollout_outputs, + rollout_output_from_trajectory, + rollout_outputs_from_trajectory_group, + rollout_with_verifiers_environment, + trajectory_from_verifiers_rollout, + trajectory_group_from_verifiers_outputs, + trajectory_group_with_verifiers_environment, +) + + +def test_trajectory_from_verifiers_rollout_reconstructs_multiturn_steps(): + output = { + "example_id": 7, + "prompt": [{"role": "user", "content": "Find the invoice"}], + "reward": 1.0, + "metrics": {"accuracy": 1.0, "notes": "ignored"}, + "is_completed": True, + "is_truncated": False, + "stop_condition": "answer_ready", + "tool_defs": [ + { + "name": "search", + "description": "Search mail", + "parameters": {"type": "object", "properties": {}}, + } + ], + "trajectory": [ + { + "prompt": [{"role": "user", "content": "Find the invoice"}], + "completion": [{"role": "assistant", "content": "Searching"}], + }, + { + "prompt": [ + {"role": "user", "content": "Find the invoice"}, + {"role": "assistant", "content": "Searching"}, + {"role": "tool", "tool_call_id": "t1", "content": "Invoice #42"}, + ], + "completion": [{"role": "assistant", "content": "Invoice #42"}], + }, + ], + } + + trajectory = trajectory_from_verifiers_rollout(output) + + assert trajectory.reward == 1.0 + assert trajectory.metrics["accuracy"] == 1.0 + assert trajectory.metadata["verifiers_example_id"] == 7 + assert trajectory.metadata["verifiers_stop_condition"] == "answer_ready" + assert trajectory.messages_and_choices == [ + {"role": "user", "content": "Find the invoice"}, + {"role": "assistant", "content": "Searching"}, + {"role": "tool", "tool_call_id": "t1", "content": "Invoice #42"}, + {"role": "assistant", "content": "Invoice #42"}, + ] + assert trajectory.tools == [ + { + "type": "function", + "function": { + "name": "search", + "description": "Search mail", + "parameters": {"type": "object", "properties": {}}, + }, + } + ] + + +def test_trajectory_from_verifiers_rollout_falls_back_to_prompt_completion(): + output = { + "prompt": [{"role": "user", "content": "2 + 2?"}], + "completion": [{"role": "assistant", "content": "4"}], + "reward": 1, + } + + trajectory = trajectory_from_verifiers_rollout(output) + + assert trajectory.messages_and_choices == [ + {"role": "user", "content": "2 + 2?"}, + {"role": "assistant", "content": "4"}, + ] + + +def test_rollout_output_from_trajectory_splits_prompt_and_completion(): + trajectory = art.Trajectory( + messages_and_choices=[ + {"role": "system", "content": "Be concise"}, + {"role": "user", "content": "2 + 2?"}, + {"role": "assistant", "content": "4"}, + ], + reward=0.75, + metrics={"accuracy": 1.0}, + metadata={"trajectory_id": "abc"}, + tools=[ + { + "type": "function", + "function": { + "name": "calculator", + 
"description": "Calculate", + "parameters": {"type": "object", "properties": {}}, + }, + } + ], + ) + + output = rollout_output_from_trajectory(trajectory, example_id=3) + + assert output["example_id"] == 3 + assert output["prompt"] == [ + {"role": "system", "content": "Be concise"}, + {"role": "user", "content": "2 + 2?"}, + ] + assert output["completion"] == [{"role": "assistant", "content": "4"}] + assert output["reward"] == 0.75 + assert output["metrics"] == {"accuracy": 1.0} + assert output["tool_defs"] == [ + { + "name": "calculator", + "description": "Calculate", + "parameters": {"type": "object", "properties": {}}, + } + ] + assert output["trajectory"][0]["trajectory_id"] == "abc" + + +def test_trajectory_group_from_verifiers_outputs(): + group = trajectory_group_from_verifiers_outputs( + [ + {"prompt": "first", "completion": [{"role": "assistant", "content": "a"}]}, + {"prompt": "second", "completion": [{"role": "assistant", "content": "b"}]}, + ] + ) + + assert len(group) == 2 + assert group.trajectories[0].messages_and_choices[0] == { + "role": "user", + "content": "first", + } + + +def test_rollout_outputs_from_trajectory_group_assigns_example_ids(): + group = art.TrajectoryGroup( + [ + art.Trajectory( + messages_and_choices=[ + {"role": "user", "content": "first"}, + {"role": "assistant", "content": "a"}, + ], + reward=1.0, + ), + art.Trajectory( + messages_and_choices=[ + {"role": "user", "content": "second"}, + {"role": "assistant", "content": "b"}, + ], + reward=0.5, + ), + ] + ) + + outputs = rollout_outputs_from_trajectory_group(group, first_example_id=10) + + assert [output["example_id"] for output in outputs] == [10, 11] + assert [output["reward"] for output in outputs] == [1.0, 0.5] + + +def test_normalize_verifiers_rollout_output_round_trips_through_art(): + output = { + "example_id": 42, + "prompt": [{"role": "user", "content": "Use a tool"}], + "completion": [{"role": "assistant", "content": "done"}], + "reward": 0.25, + "metrics": {"score": 0.25, "label": "ignored"}, + "logs": ["started", "scored"], + "answer": "done", + "stop_condition": "final_answer", + "is_completed": True, + "is_truncated": True, + "timing": {"total": 1.5}, + "tool_defs": [ + { + "name": "lookup", + "description": "Lookup records", + "parameters": {"type": "object", "properties": {}}, + } + ], + } + + normalized = normalize_verifiers_rollout_output(output) + + assert normalized["example_id"] == 42 + assert normalized["prompt"] == [{"role": "user", "content": "Use a tool"}] + assert normalized["completion"] == [{"role": "assistant", "content": "done"}] + assert normalized["reward"] == 0.25 + assert normalized["metrics"]["score"] == 0.25 + assert "label" not in normalized["metrics"] + assert normalized["logs"] == ["started", "scored"] + assert normalized["answer"] == "done" + assert normalized["stop_condition"] == "final_answer" + assert normalized["is_completed"] is True + assert normalized["is_truncated"] is True + assert normalized["timing"] == {"total": 1.5} + assert normalized["tool_defs"] == output["tool_defs"] + assert normalized["trajectory"][0]["extras"]["art_metadata"]["verifiers_example_id"] == 42 + + +def test_normalize_verifiers_rollout_outputs_handles_iterables(): + outputs = normalize_verifiers_rollout_outputs( + [ + {"example_id": 1, "prompt": "first", "completion": [{"role": "assistant", "content": "a"}], "reward": 1}, + {"example_id": 2, "prompt": "second", "completion": [{"role": "assistant", "content": "b"}], "reward": 0}, + ], + include_trajectory=False, + ) + + assert 
[output["example_id"] for output in outputs] == [1, 2] + assert "trajectory" not in outputs[0] + assert outputs[0]["completion"] == [{"role": "assistant", "content": "a"}] + + +def test_real_verifiers_package_contract_if_installed(): + verifiers = pytest.importorskip("verifiers") + client_module = pytest.importorskip( + "verifiers.clients.openai_chat_completions_client" + ) + env_module = pytest.importorskip("verifiers.envs.environment") + + assert hasattr(client_module, "OpenAIChatCompletionsClient") + + rollout_params = inspect.signature(env_module.Environment.run_rollout).parameters + assert {"input", "client", "model", "sampling_args"} <= set(rollout_params) + assert {"max_retries", "state_columns"} <= set(rollout_params) + + group_params = inspect.signature(env_module.Environment.run_group).parameters + assert {"group_inputs", "client", "model", "sampling_args"} <= set(group_params) + assert {"max_retries", "state_columns"} <= set(group_params) + + output = verifiers.RolloutOutput( + example_id=7, + prompt=[{"role": "user", "content": "hi"}], + completion=[{"role": "assistant", "content": "hello"}], + reward=1.0, + timing={"total": 0.1}, + is_completed=True, + is_truncated=False, + metrics={"score": 1.0}, + answer="hello", + info={}, + error=None, + stop_condition="final_answer", + trajectory=[], + tool_defs=[], + token_usage={}, + ) + + trajectory = trajectory_from_verifiers_rollout(output) + + assert trajectory.reward == 1.0 + assert trajectory.metrics["score"] == 1.0 + assert trajectory.metadata["verifiers_example_id"] == 7 + assert trajectory.messages_and_choices == [ + {"role": "user", "content": "hi"}, + {"role": "assistant", "content": "hello"}, + ] + + +async def test_rollout_with_verifiers_environment_uses_art_model_client(monkeypatch): + _install_fake_verifiers_client(monkeypatch) + env = _FakeVerifiersEnv() + model = _FakeArtModel() + + trajectory = await rollout_with_verifiers_environment( + env, + model, + {"prompt": [{"role": "user", "content": "hi"}], "example_id": 1}, + sampling_args={"temperature": 0.2}, + state_columns=("trajectory", "custom"), + ) + + assert trajectory.reward == 1.0 + assert trajectory.messages_and_choices[-1] == { + "role": "assistant", + "content": "done", + } + assert env.last_rollout_call["client"].raw_client == "art-openai-client" + assert env.last_rollout_call["model"] == "art-model" + assert env.last_rollout_call["sampling_args"] == {"temperature": 0.2} + assert env.last_rollout_call["state_columns"] == ["trajectory", "custom"] + + +async def test_trajectory_group_with_verifiers_environment(monkeypatch): + _install_fake_verifiers_client(monkeypatch) + env = _FakeVerifiersEnv() + model = _FakeArtModel() + + group = await trajectory_group_with_verifiers_environment( + env, + model, + [{"prompt": "a"}, {"prompt": "b"}], + ) + + assert len(group) == 2 + assert env.last_group_call["client"].raw_client == "art-openai-client" + assert env.last_group_call["model"] == "art-model" + assert group.trajectories[0].messages_and_choices[-1] == { + "role": "assistant", + "content": "group done", + } + + +class _FakeArtModel: + def openai_client(self): + return "art-openai-client" + + def get_inference_name(self): + return "art-model" + + +class _FakeOpenAIChatCompletionsClient: + def __init__(self, raw_client): + self.raw_client = raw_client + + +class _FakeVerifiersEnv: + def __init__(self): + self.last_rollout_call = None + self.last_group_call = None + + async def run_rollout(self, **kwargs): + self.last_rollout_call = kwargs + return { + "prompt": 
kwargs["input"]["prompt"], + "completion": [{"role": "assistant", "content": "done"}], + "reward": 1.0, + } + + async def run_group(self, **kwargs): + self.last_group_call = kwargs + return [ + { + "prompt": input_value["prompt"], + "completion": [{"role": "assistant", "content": "group done"}], + "reward": 1.0, + } + for input_value in kwargs["group_inputs"] + ] + + +def _install_fake_verifiers_client(monkeypatch): + verifiers_module = types.ModuleType("verifiers") + clients_module = types.ModuleType("verifiers.clients") + client_module = types.ModuleType("verifiers.clients.openai_chat_completions_client") + client_module.OpenAIChatCompletionsClient = _FakeOpenAIChatCompletionsClient + + monkeypatch.setitem(sys.modules, "verifiers", verifiers_module) + monkeypatch.setitem(sys.modules, "verifiers.clients", clients_module) + monkeypatch.setitem( + sys.modules, + "verifiers.clients.openai_chat_completions_client", + client_module, + ) diff --git a/src/art/verifiers.py b/src/art/verifiers.py new file mode 100644 index 000000000..697b07dd5 --- /dev/null +++ b/src/art/verifiers.py @@ -0,0 +1,422 @@ +from __future__ import annotations + +from collections.abc import Iterable, Mapping, Sequence +from copy import deepcopy +import time +from typing import Any, cast + +from openai.types.chat.chat_completion import Choice + +from .trajectories import Trajectory, TrajectoryGroup +from .types import Messages, Tools + + +def trajectory_from_verifiers_rollout(output: Mapping[str, Any]) -> Trajectory: + """Convert a verifiers RolloutOutput or serialized State into an ART Trajectory. + + When the output includes `trajectory` (for example from + `env.run_rollout(..., state_columns=["trajectory"])`), the full multi-turn + transcript is reconstructed from the verifiers steps. Otherwise the + conversion falls back to `prompt + completion`. + """ + + messages = _messages_from_verifiers_output(output) + metrics = dict(cast(Mapping[str, Any], output.get("metrics") or {})) + metadata = _verifiers_metadata(output) + tools = _openai_tools_from_verifiers_tools(output.get("tool_defs")) + return Trajectory( + messages_and_choices=messages, + tools=tools, + reward=float(output.get("reward") or 0.0), + metrics=_numeric_metrics(metrics), + metadata=metadata, + logs=_string_list(output.get("logs")), + ).finish() + + +def trajectory_group_from_verifiers_outputs( + outputs: Iterable[Mapping[str, Any]], +) -> TrajectoryGroup: + """Convert a group of verifiers rollout outputs into an ART TrajectoryGroup.""" + + return TrajectoryGroup( + [trajectory_from_verifiers_rollout(output) for output in outputs] + ) + + +def rollout_output_from_trajectory( + trajectory: Trajectory, + *, + example_id: int = 0, + prompt_length: int | None = None, + include_trajectory: bool = True, +) -> dict[str, Any]: + """Convert an ART Trajectory into a verifiers-compatible RolloutOutput dict. + + `prompt_length` controls the split between the initial prompt and the + generated completion. If omitted, the split occurs before the first + assistant message. 
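+
+    For example, a trajectory with `system`, `user`, and `assistant` messages
+    (in that order) yields a two-message prompt and a single-message completion
+    when `prompt_length` is omitted.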
+ """ + + messages = _messages_from_art_items(trajectory.messages_and_choices) + split_at = _prompt_length(messages, prompt_length) + prompt = messages[:split_at] + completion = messages[split_at:] + output: dict[str, Any] = { + "example_id": example_id, + "prompt": prompt, + "completion": completion, + "reward": trajectory.reward, + "timing": { + "start_time": time.time(), + "setup": {"start": 0.0, "end": 0.0, "duration": 0.0}, + "generation": {"start": 0.0, "end": 0.0, "duration": 0.0}, + "scoring": {"start": 0.0, "end": 0.0, "duration": 0.0}, + "model": {"spans": [], "duration": 0.0}, + "env": {"spans": [], "duration": 0.0}, + "total": 0.0, + "overhead": 0.0, + }, + "is_completed": True, + "is_truncated": bool(trajectory.metadata.get("is_truncated", False)), + "metrics": dict(trajectory.metrics), + "tool_defs": _verifiers_tools_from_openai_tools(trajectory.tools), + } + if include_trajectory: + output["trajectory"] = [ + { + "prompt": prompt, + "completion": completion, + "response": None, + "tokens": None, + "reward": trajectory.reward, + "advantage": None, + "is_truncated": output["is_truncated"], + "trajectory_id": str(trajectory.metadata.get("trajectory_id", "")), + "extras": {"art_metadata": dict(trajectory.metadata)}, + } + ] + return output + + +def rollout_outputs_from_trajectory_group( + group: TrajectoryGroup, + *, + first_example_id: int = 0, + prompt_length: int | None = None, + include_trajectory: bool = True, +) -> list[dict[str, Any]]: + """Convert an ART TrajectoryGroup into verifiers-compatible outputs.""" + + return [ + rollout_output_from_trajectory( + trajectory, + example_id=first_example_id + index, + prompt_length=prompt_length, + include_trajectory=include_trajectory, + ) + for index, trajectory in enumerate(group.trajectories) + ] + + +def normalize_verifiers_rollout_output( + output: Mapping[str, Any], + *, + prompt_length: int | None = None, + include_trajectory: bool = True, +) -> dict[str, Any]: + """Round-trip a verifiers output through ART and back to verifiers shape. + + This is useful for portability checks and for tools that need a normalized + RolloutOutput-compatible payload after ART has inspected or transformed the + trajectory. 
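+
+    Fields such as `timing`, `logs`, `answer`, and `stop_condition` are copied
+    from the original output when present, while non-numeric metrics are
+    dropped by the intermediate ART conversion.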
+ """ + + trajectory = trajectory_from_verifiers_rollout(output) + example_id_value = output.get("example_id") + example_id = example_id_value if isinstance(example_id_value, int) else 0 + normalized = rollout_output_from_trajectory( + trajectory, + example_id=example_id, + prompt_length=prompt_length, + include_trajectory=include_trajectory, + ) + if "timing" in output: + normalized["timing"] = deepcopy(output["timing"]) + if "logs" in output: + normalized["logs"] = _string_list(output.get("logs")) + if "answer" in output: + normalized["answer"] = output.get("answer") + if "stop_condition" in output: + normalized["stop_condition"] = output.get("stop_condition") + normalized["is_completed"] = bool(output.get("is_completed", True)) + normalized["is_truncated"] = bool(output.get("is_truncated", normalized["is_truncated"])) + return normalized + + +def normalize_verifiers_rollout_outputs( + outputs: Iterable[Mapping[str, Any]], + *, + prompt_length: int | None = None, + include_trajectory: bool = True, +) -> list[dict[str, Any]]: + """Normalize a collection of verifiers outputs through ART trajectories.""" + + return [ + normalize_verifiers_rollout_output( + output, + prompt_length=prompt_length, + include_trajectory=include_trajectory, + ) + for output in outputs + ] + + +async def rollout_with_verifiers_environment( + env: Any, + model: Any, + input: Mapping[str, Any], + *, + sampling_args: Mapping[str, Any] | None = None, + max_retries: int = 0, + state_columns: Sequence[str] = ("trajectory",), +) -> Trajectory: + """Run a verifiers Environment with an ART model and return an ART Trajectory.""" + + output = await _run_verifiers_rollout( + env, + model, + input, + sampling_args=sampling_args, + max_retries=max_retries, + state_columns=state_columns, + ) + return trajectory_from_verifiers_rollout(output) + + +async def trajectory_group_with_verifiers_environment( + env: Any, + model: Any, + group_inputs: Sequence[Mapping[str, Any]], + *, + sampling_args: Mapping[str, Any] | None = None, + max_retries: int = 0, + state_columns: Sequence[str] = ("trajectory",), +) -> TrajectoryGroup: + """Run a verifiers Environment group with an ART model.""" + + try: + from verifiers.clients.openai_chat_completions_client import ( + OpenAIChatCompletionsClient, + ) + except ImportError as exc: + raise ImportError( + "art.verifiers requires the optional `verifiers` package. " + "Install it with `pip install verifiers`." + ) from exc + + client = OpenAIChatCompletionsClient(model.openai_client()) + outputs = await env.run_group( + group_inputs=list(group_inputs), + client=client, + model=model.get_inference_name(), + sampling_args=dict(sampling_args or {"n": 1}), + max_retries=max_retries, + state_columns=list(state_columns), + ) + return trajectory_group_from_verifiers_outputs(outputs) + + +async def _run_verifiers_rollout( + env: Any, + model: Any, + input: Mapping[str, Any], + *, + sampling_args: Mapping[str, Any] | None, + max_retries: int, + state_columns: Sequence[str], +) -> Mapping[str, Any]: + try: + from verifiers.clients.openai_chat_completions_client import ( + OpenAIChatCompletionsClient, + ) + except ImportError as exc: + raise ImportError( + "art.verifiers requires the optional `verifiers` package. " + "Install it with `pip install verifiers`." 
+ ) from exc + + client = OpenAIChatCompletionsClient(model.openai_client()) + return await env.run_rollout( + input=dict(input), + client=client, + model=model.get_inference_name(), + sampling_args=dict(sampling_args or {"n": 1}), + max_retries=max_retries, + state_columns=list(state_columns), + ) + + +def _messages_from_verifiers_output(output: Mapping[str, Any]) -> Messages: + trajectory = output.get("trajectory") + if isinstance(trajectory, Sequence) and not isinstance(trajectory, (str, bytes)): + messages = _messages_from_verifiers_steps(trajectory) + if messages: + return messages + + prompt = _coerce_messages(output.get("prompt")) + completion = _coerce_messages(output.get("completion")) + return [*prompt, *completion] + + +def _messages_from_verifiers_steps(steps: Sequence[Any]) -> Messages: + messages: list[dict[str, Any]] = [] + for raw_step in steps: + if not isinstance(raw_step, Mapping): + continue + prompt = _coerce_messages(raw_step.get("prompt")) + completion = _coerce_messages(raw_step.get("completion")) + _append_with_prefix_dedupe(messages, prompt) + messages.extend(completion) + return cast(Messages, messages) + + +def _append_with_prefix_dedupe( + messages: list[dict[str, Any]], incoming: list[dict[str, Any]] +) -> None: + if not incoming: + return + if not messages: + messages.extend(incoming) + return + if incoming[: len(messages)] == messages: + messages.extend(incoming[len(messages) :]) + return + messages.extend(incoming) + + +def _coerce_messages(value: Any) -> list[dict[str, Any]]: + if value is None: + return [] + if isinstance(value, str): + return [{"role": "user", "content": value}] + if not isinstance(value, Sequence) or isinstance(value, (bytes, bytearray)): + raise TypeError(f"Expected a message list or string, got {type(value).__name__}") + + messages: list[dict[str, Any]] = [] + for message in value: + messages.append(_message_to_dict(message)) + return messages + + +def _messages_from_art_items(items: Iterable[Any]) -> list[dict[str, Any]]: + return [_message_to_dict(item) for item in items] + + +def _message_to_dict(message: Any) -> dict[str, Any]: + if isinstance(message, Choice): + data = message.message.model_dump(mode="json", exclude_none=True) + data["role"] = "assistant" + return data + if hasattr(message, "model_dump"): + return cast(dict[str, Any], message.model_dump(mode="json", exclude_none=True)) + if isinstance(message, Mapping): + return deepcopy(dict(message)) + raise TypeError(f"Unsupported message type: {type(message).__name__}") + + +def _prompt_length(messages: Sequence[Mapping[str, Any]], prompt_length: int | None) -> int: + if prompt_length is not None: + if prompt_length < 0 or prompt_length > len(messages): + raise ValueError("prompt_length must be between 0 and the number of messages") + return prompt_length + for index, message in enumerate(messages): + if message.get("role") == "assistant": + return index + return len(messages) + + +def _numeric_metrics(metrics: Mapping[str, Any]) -> dict[str, float | int | bool]: + numeric: dict[str, float | int | bool] = {} + for key, value in metrics.items(): + if isinstance(value, (float, int, bool)): + numeric[key] = value + return numeric + + +def _verifiers_metadata(output: Mapping[str, Any]) -> dict[str, float | int | str | bool | None]: + metadata: dict[str, float | int | str | bool | None] = {} + fields = ( + "example_id", + "is_completed", + "is_truncated", + "stop_condition", + "answer", + ) + for field in fields: + value = output.get(field) + if isinstance(value, (float, int, str, 
bool)) or value is None: + metadata[f"verifiers_{field}"] = value + return metadata + + +def _string_list(value: Any) -> list[str]: + if not isinstance(value, Sequence) or isinstance(value, (str, bytes, bytearray)): + return [] + return [item for item in value if isinstance(item, str)] + + +def _openai_tools_from_verifiers_tools(value: Any) -> Tools | None: + if value is None: + return None + if not isinstance(value, Sequence) or isinstance(value, (str, bytes, bytearray)): + return None + tools: list[dict[str, Any]] = [] + for tool in value: + tool_dict = _model_or_mapping_to_dict(tool) + if tool_dict.get("type") == "function": + tools.append(tool_dict) + continue + name = tool_dict.get("name") + parameters = tool_dict.get("parameters") + if isinstance(name, str) and isinstance(parameters, Mapping): + tools.append( + { + "type": "function", + "function": { + "name": name, + "description": str(tool_dict.get("description", "")), + "parameters": dict(parameters), + }, + } + ) + return cast(Tools, tools) or None + + +def _verifiers_tools_from_openai_tools(value: Tools | None) -> list[dict[str, Any]]: + if value is None: + return [] + tools: list[dict[str, Any]] = [] + for tool in value: + tool_dict = _model_or_mapping_to_dict(tool) + function = tool_dict.get("function") + if tool_dict.get("type") == "function" and isinstance(function, Mapping): + tools.append( + { + "name": function.get("name", ""), + "description": function.get("description", ""), + "parameters": function.get("parameters", {}), + } + ) + continue + if {"name", "parameters"} <= set(tool_dict): + tools.append(tool_dict) + return tools + + +def _model_or_mapping_to_dict(value: Any) -> dict[str, Any]: + if hasattr(value, "model_dump"): + return cast(dict[str, Any], value.model_dump(mode="json", exclude_none=True)) + if isinstance(value, Mapping): + return deepcopy(dict(value)) + return {}