# Source code for evoproc_procedures.runners

# projects/procedures/src/evoproc_procedures/runners.py
"""Step executors for global-state procedures.

This module executes a Procedure (as JSON/dict) step-by-step by:
  1) Assembling the *visible inputs* for each step from the global state,
  2) Building an execution prompt with :func:`evoproc_procedures.prompts.create_execution_prompt`,
  3) Calling a provided `query_fn` (LLM backend) with a JSON Schema for the step's outputs,
  4) Parsing and merging returned outputs back into the global state.

Design
------
- Backend-agnostic: pass any `query_fn(prompt, model, fmt, seed) -> str`.
- Strict by default: raises if a required output is missing.
- Final step uses your `answer_schema` (e.g., GSM/ARC) instead of a generic schema.

Typical usage
-------------
>>> from evoproc_procedures.runners import run_steps_stateful_minimal
>>> from evoproc_procedures.query_backends.ollama import query
>>> state = run_steps_stateful_minimal(proc, "2+3=?", gsm_schema, "gemma3:latest", query_fn=query)
>>> state["final_answer"]
"""

from __future__ import annotations

import json
from typing import Any, Dict, Iterable, List, Mapping, Optional, Callable

from evoproc_procedures.prompts import create_execution_prompt

JSON = Dict[str, Any]
Fields = List[Mapping[str, Any]]
QueryFn = Callable[[str, str, Optional[Dict[str, Any]], Optional[int]], str]


# ----------------------- small field helpers (no src.* deps) -----------------------

def _field_names(fields: Fields) -> List[str]:
    """Return the list of ``name`` values from a list of input/output field dicts."""
    return [str(f.get("name")) for f in (fields or []) if "name" in f]

def _field_descriptions(fields: Fields) -> Dict[str, str]:
    """Return a mapping of field name → description (missing desc → '')."""
    return {str(f.get("name")): str(f.get("description", "")) for f in (fields or []) if "name" in f}


# ------------------------------- schema helper ------------------------------------

def create_output_schema(step: Mapping[str, Any]) -> JSON:
    """Create a permissive JSON Schema for this step's declared outputs.

    The schema enforces *keys and presence* (``required``), but allows the
    value type to be number/string/boolean. Use a stricter schema if your
    step needs it.

    Parameters
    ----------
    step
        A step object with an ``"output"`` list of field dicts (each dict
        must include ``{"name": ..., "description": ...}``). A missing or
        empty ``"output"`` yields a schema that accepts only ``{}``.

    Returns
    -------
    dict
        JSON Schema enforcing required output keys and simple scalar types.
        Output names are listed once each, in declaration order, and every
        property owns an independent sub-schema so callers may tighten one
        property without affecting the others.
    """
    # JSON Schema requires the entries of ``required`` to be unique, so
    # repeated output names are collapsed while keeping declaration order.
    required_keys = list(dict.fromkeys(_field_names(step.get("output", []))))

    # Build a fresh sub-schema per property; sharing one dict object would
    # make an in-place edit of a single property leak into all of them.
    properties = {
        name: {"oneOf": [{"type": "number"}, {"type": "string"}, {"type": "boolean"}]}
        for name in required_keys
    }

    return {
        "type": "object",
        "properties": properties,
        "required": required_keys,
        "additionalProperties": False,
    }
# ----------------------------------- runner ---------------------------------------
def run_steps_stateful_minimal(
    proc: Mapping[str, Any],
    problem_text: str,
    answer_schema: JSON,
    model: str,
    *,
    query_fn: Optional[QueryFn] = None,
    seed: Optional[int] = 1234,
    print_bool: bool = False,
    strict_missing: bool = True,
) -> JSON:
    """Execute a global-state procedure and return the final state.

    For each step, this:
      • Collects only the needed inputs from the current global state,
      • Builds an execution prompt and a strict JSON Schema for outputs,
      • Calls `query_fn` to obtain a JSON object,
      • Merges declared outputs back into the global state.

    The final step uses ``answer_schema`` (e.g., GSM/ARC) so you can grade
    results. "Final" means the last entry of ``proc["steps"]``; the step's
    ``"id"`` is used only for messages, so string or non-sequential ids work.

    Parameters
    ----------
    proc
        Procedure JSON with keys: ``"steps"`` (list), and step fields including
        ``"id"``, ``"inputs"`` (list of fields), ``"stepDescription"``, and
        ``"output"``. A missing or ``None`` ``"steps"`` is treated as empty.
    problem_text
        The original task text; becomes ``state["problem_text"]`` and must be
        the only input of Step 1 per global-state rules.
    answer_schema
        JSON Schema dict for the final step's output object.
    model
        Backend model name passed to `query_fn`.
    query_fn
        Callable with signature ``(prompt, model, fmt, seed) -> str``. If
        omitted, we lazily import Ollama's default `query` (requires
        evoproc_procedures[llm]).
    seed
        Optional random seed for the backend (if supported).
    print_bool
        If True, prints visible inputs and outputs for each step (debugging).
    strict_missing
        If True, raises when the model omits a required output key; if False,
        leaves it unset.

    Returns
    -------
    dict
        The final global state containing all produced variables (including
        ``final_answer``).

    Raises
    ------
    ImportError
        If ``query_fn`` is omitted and the default Ollama backend cannot be
        imported.
    RuntimeError
        If a step input cannot be resolved from the global state, or a
        required output is missing and ``strict_missing=True``.
    ValueError
        If the backend response is not valid JSON, or is valid JSON but not
        an object.
    """
    # Lazy default backend to avoid hard dependency here.
    if query_fn is None:
        try:
            from evoproc_procedures.query_backends.ollama import query as _default_query  # type: ignore
        except Exception as e:
            raise ImportError(
                "No `query_fn` provided and Ollama backend is unavailable. "
                "Install the LLM extra (`pip install -e projects/evoproc_procedures[llm]`) "
                "or pass a custom `query_fn`."
            ) from e
        query_fn = _default_query

    state: JSON = {"problem_text": problem_text}

    # Materialise once: tolerates ``"steps": None`` and lets us find the final
    # step by position instead of trusting ids to be the integers 1..N.
    steps = list(proc.get("steps") or [])
    last_index = len(steps) - 1

    for index, step in enumerate(steps):
        step_id = step.get("id")

        # Assemble *only* the inputs the step declared.
        visible_inputs: JSON = {}
        for name in _field_names(step.get("inputs", [])):
            if name == "problem_text":
                # Always expose the original task text, even if an earlier
                # step wrote a variable with the same name into the state.
                visible_inputs[name] = problem_text
            elif name in state:
                visible_inputs[name] = state[name]
            else:
                raise RuntimeError(
                    f"Unresolvable input '{name}' for step id={step_id}: "
                    "no prior producer in state."
                )

        is_last = index == last_index
        if is_last:
            # The final step is graded, so it must follow the caller's schema.
            schema = answer_schema
            answer_props = answer_schema.get("properties", {})
            expected = list(answer_props.keys())
            output_desc = {k: answer_props[k].get("description", "") for k in expected}
        else:
            declared_outputs = step.get("output", [])
            expected = _field_names(declared_outputs)
            output_desc = _field_descriptions(declared_outputs)
            schema = create_output_schema(step)

        action = step.get("stepDescription") or step.get("step_description") or ""
        step_prompt = create_execution_prompt(
            visible_inputs, action, schema, expected, output_desc, is_final=is_last
        )

        # Backend call
        raw = query_fn(step_prompt, model, schema, seed)
        if isinstance(raw, (str, bytes, bytearray)):
            try:
                out = json.loads(raw)
            except ValueError as e:  # JSONDecodeError / UnicodeDecodeError
                raise ValueError(f"Non-JSON response for step id={step_id}: {e}") from e
        else:
            # Backends that already return a parsed object are passed through.
            out = raw

        # Membership tests below only make sense on an object; on a JSON
        # string ``name in out`` would be a substring check.
        if not isinstance(out, Mapping):
            raise ValueError(
                f"Non-JSON response for step id={step_id}: "
                f"expected a JSON object, got {type(out).__name__}"
            )

        # Merge declared outputs only
        for name in expected:
            if name in out:
                state[name] = out[name]
            elif strict_missing:
                raise RuntimeError(
                    f"Model omitted required output '{name}' for step id={step_id}"
                )

        if print_bool:
            # Evaluate the dict first: writing the comprehension inside the
            # f-string with doubled braces would print its source text.
            produced = {k: state[k] for k in expected if k in state}
            print(f"[step {step_id}] inputs: {visible_inputs}")
            print(f"[step {step_id}] outputs: {produced}")

    return state