Source code for evoproc.ga_scaffold_structured

# src/ga_scaffold_structured.py
"""
GA scaffold for LLM‑generated *global‑state* procedures.

This module wires up a lightweight genetic algorithm (GA) over your Procedure
JSON schema. It uses LLM‑driven **crossover** and **mutation** operators, a
validator‑driven structural hygiene scorer, optional task‑evaluation scoring,
and diversity via random immigrants.

Global‑state semantics
----------------------
- Step 1 must take exactly one input: ["problem_text"].
- Later steps may read any variable produced by earlier steps (no strict pass‑through).
- The final step must output exactly ["final_answer"] (a *description*, not a computed value).

Typical usage
-------------
.. code-block:: python

    from evoproc.ga_scaffold_structured import *
    from evoproc.scorers import (
        StructuralHygieneScorer,
        ProcScorerAdapter,
        TaskEvalScorer,
    )
    from evoproc.validators import validate_procedure_structured


    ga = ProcedureGA(
        model="gemma3:latest",
        create_proc_fn=create_procedure_prompt,
        query_fn=query,
        schema_json_fn=lambda: Procedure.model_json_schema(),
        validate_fn=validate_procedure_structured,
        repair_fn=query_repair_structured,
        scorer=ProcScorerAdapter(
        StructuralHygieneScorer(validate_fn=validate_procedure_structured)
        ),
        cfg=GAConfig(population_size=8, max_generations=5, seed=42),
    )


    best, history = ga.run(
        task_description="Solve: Natalia sold clips to 48 friends in April...",
        # Supply these three for TaskEval scoring; otherwise structural scoring is used:
        final_answer_schema=None,
        eval_fn=None, # (state, proc) -> float
        run_steps_fn=None, # executes a procedure end-to-end and returns `state`
        print_progress=True,
    )
"""

from __future__ import annotations

from dataclasses import dataclass
from typing import Any, Callable, Dict, List, Optional, Tuple, Protocol
import copy
import json
import random

from evoproc.scorers import StructuralHygieneScorer, ProcScorerAdapter  # default structural scorer

JSONDict = Dict[str, Any]


# ======================
# Config / Individuals
# ======================

[docs] @dataclass class GAConfig: """Genetic algorithm configuration. Attributes: population_size: Number of individuals per generation. elitism: Number of top individuals copied unchanged to the next generation. crossover_rate: Probability of producing a child via crossover in reproduction. mutation_rate: Probability of producing a child via mutation (fallback is also mutation). max_generations: Total number of evolutionary iterations. tournament_k: Tournament size for parent selection. seed: Random seed for reproducibility (also forwarded to LLM ops where applicable). random_immigrant_rate: Fraction of remaining slots each generation filled with freshly generated procedures (diversity injection). """ population_size: int = 5 elitism: int = 2 crossover_rate: float = 0.7 mutation_rate: float = 0.3 max_generations: int = 10 tournament_k: int = 3 seed: Optional[int] = None random_immigrant_rate: float = 0.10
[docs] @dataclass class Individual: """Population member wrapper. :ivar dict proc: Procedure JSON that validates your Pydantic‑derived schema. :ivar Optional[float] fitness: Last computed scalar fitness (``None`` until evaluated). :ivar str notes: Optional debugging/instrumentation notes. """ # """ # Population member wrapper. # Attributes # ---------- # proc # Procedure JSON (dict) that validates your Pydantic-derived schema. # fitness # Last computed scalar fitness (None until evaluated). # notes # Optional debugging/instrumentation notes. # """ proc: JSONDict fitness: Optional[float] = None notes: str = ""
# ====================== # GA-facing Scorer Protocol # ======================
[docs] class Scorer(Protocol): """GA-facing scorer protocol. Implementations must accept an object with a `.proc` JSON field and return a scalar fitness. """
[docs] def score(self, ind: Individual, **kwargs: Any) -> float: ...
# ====================== # Utilities # ====================== def _deepcopy(p: JSONDict) -> JSONDict: """Return a deep copy of a procedure JSON using JSON round-tripping. Args: p: Procedure JSON. Returns: A deep copy of ``p``. """ return json.loads(json.dumps(p)) def _renumber_steps(p: JSONDict) -> JSONDict: """Renumber step IDs to be contiguous ``1..n`` in list order. Args: p: Procedure JSON. Returns: A new procedure JSON with normalized step IDs. """ q = _deepcopy(p) for i, s in enumerate(q.get("steps", []), start=1): s["id"] = i return q # ====================== # Operators # ======================
[docs] class CrossoverOperator: """LLM‑based crossover for *global‑state* procedures. Combines two parent procedures (A, B) into a single coherent child by prompting the LLM to synthesize an integrated plan that: - Preserves the Step 1 rule (inputs == ["problem_text"]). - Adheres to global‑state semantics (later steps can read any earlier variables). - Ends with exactly one output ``"final_answer"``. - Validates against the provided schema. Notes: This operator does **not** splice JSON directly; it asks the LLM to synthesize a crossover child, which tends to yield more coherent procedures than mechanical concatenation. """
[docs] def __init__( self, model: str, query_fn: Callable[[str, str, Optional[Dict[str, Any]], Optional[int]], str], schema_json_fn: Callable[[], Dict[str, Any]], validate_fn: Callable[[JSONDict], List[Dict[str, Any]]], repair_fn: Callable[[JSONDict, str], JSONDict], seed: int = 1234, ): """Initialize the crossover operator. Args: model: LLM name. query_fn: Callable ``query(prompt, model, fmt, seed) -> str`` returning JSON text. schema_json_fn: Callable returning the Procedure JSON schema (dict). validate_fn: Returns a list of validator diagnostics for a procedure JSON. repair_fn: Minimally repairs a procedure JSON using the LLM. seed: Random seed forwarded to ``query_fn``. """ self.model = model self.query_fn = query_fn self.schema_json_fn = schema_json_fn self.validate_fn = validate_fn self.repair_fn = repair_fn self.seed = seed
def _build_prompt( self, task_description: str, parent_a_json: str, parent_b_json: str, extra_constraints: Optional[str] = None, style_hint: Optional[str] = None, ) -> str: """Construct the crossover prompt with hard constraints and both parents. Returns: A single prompt string instructing the LLM to emit **one** JSON object that validates the schema and respects global‑state constraints. """ schema_json = json.dumps(self.schema_json_fn(), ensure_ascii=False) constraints = extra_constraints or """ REQUIREMENTS (hard): - Output exactly ONE JSON object that validates against the schema. - GLOBAL STATE: Each step may READ any variable previously produced by earlier steps. - Declared inputs must be resolvable from variables produced earlier (by name). - Outputs must be unique across the procedure (no duplicate names). - Variable names are snake_case and stable across steps. - Step 1 must include only 'problem_text' as its input. - Final step outputs exactly 'final_answer' (description only; do not compute). - Keep steps single-action; avoid redundant variables; remove unreachable steps. - Prefer early extraction of primitive facts. """ style = style_hint or "Prefer A's strong extraction and B's clean reasoning; reconcile variable names." return f"""You are a rigorous planner that ONLY outputs a JSON object that validates the schema. # TASK Synthesize a SINGLE crossover child procedure for the task below using GLOBAL STATE. ## Task Description {task_description} ## Procedure JSON Schema (verbatim) {schema_json} ## Parent A (JSON) ```json {parent_a_json}``` ## Parent B (JSON) ```json {parent_b_json}``` ## Crossover Objective - Reuse the best sub-steps, remove duplicates, align variable names. - {style} {constraints} Return the JSON object only. No markdown or commentary. """ def __call__( self, task_description: str, parent_a: Dict[str, Any], parent_b: Dict[str, Any], n_offspring: int = 1, ) -> Dict[str, Any]: """Perform crossover on two parents and return one child (or the best of `n_offspring`). Args: task_description: The natural‑language problem the procedure will solve. parent_a: First input procedure (JSON dict). parent_b: Second input procedure (JSON dict). n_offspring: If > 1, generate multiple children and return the one with the fewest validator penalties. Returns: Child procedure JSON. """ schema = self.schema_json_fn() pa = json.dumps(parent_a, ensure_ascii=False) pb = json.dumps(parent_b, ensure_ascii=False) children: List[Dict[str, Any]] = [] for _ in range(max(1, n_offspring)): prompt = self._build_prompt(task_description, pa, pb) raw = self.query_fn(prompt, self.model, fmt=schema, seed=self.seed) child = json.loads(raw) if isinstance(raw, str) else raw try: child = self.repair_fn(child, self.model) except Exception: pass children.append(child) if len(children) == 1: return children[0] def penalty(proc: Dict[str, Any]) -> tuple[int, int]: diags = self.validate_fn(proc) fatal = sum(1 for d in diags if d.get("severity") == "fatal") repair = sum(1 for d in diags if d.get("severity") == "repairable") return (fatal, repair) return min(children, key=penalty)
[docs] class MutationOperator: """LLM-driven mutation for *global-state* procedures. Applies exactly **one** small edit per call (rewrite / split / insert / remove / rename / verify), returning a full, schema‑valid procedure JSON. Post‑processes with ``repair_fn`` and rejects candidates with fatal validator diagnostics. If a procedure‑level scorer is supplied, only not‑worse mutations are accepted. """
[docs] def __init__( self, model: str, query_fn: Callable[[str, str, Optional[Dict[str, Any]], Optional[int]], str], schema_json_fn: Callable[[], Dict[str, Any]], validate_fn: Callable[[JSONDict], List[Dict[str, Any]]], repair_fn: Callable[[JSONDict, str], JSONDict], proc_scorer: Optional[Any], rng: Optional[random.Random], seed: int, *, accept_if_not_worse: bool = True, max_llm_tries: int = 2, ) -> None: """Initialize the mutation operator. Args: model: LLM name to use for mutation prompts. query_fn: Callable ``query(prompt, model, fmt, seed) -> str`` returning JSON text. schema_json_fn: Callable returning the Procedure JSON schema (dict). validate_fn: Returns a list of diagnostics for a procedure JSON. repair_fn: Minimally repairs a procedure JSON using the LLM. proc_scorer: Optional object exposing ``score_proc(proc_json) -> float``. rng: Optional PRNG for sampling mutation intents. seed: Forwarded to ``query_fn`` for deterministic results. accept_if_not_worse: If True, reject candidates that score worse than the original. max_llm_tries: Number of mutation attempts before falling back to the original. """ self.model = model self.query_fn = query_fn self.schema_json_fn = schema_json_fn self.validate_fn = validate_fn self.repair_fn = repair_fn self.proc_scorer = proc_scorer self.accept_if_not_worse = accept_if_not_worse self.rng = rng or random.Random() self.seed = seed self.max_llm_tries = max_llm_tries
def __call__(self, proc: JSONDict, task_description: str) -> JSONDict: """Mutate a single procedure. Args: proc: The original procedure JSON. task_description: Natural‑language task the procedure addresses. Returns: Mutated (and repaired/validated) procedure JSON. If mutation fails validation or acceptance, returns the original. """ orig = _deepcopy(proc) target_score = self._score(orig) if self.proc_scorer else None schema = self.schema_json_fn() proc_json = json.dumps(orig, ensure_ascii=False) intent = self._sample_intent() prompt = self._build_prompt(task_description, proc_json, schema, intent=intent) candidate = None for _ in range(max(1, self.max_llm_tries)): try: raw = self.query_fn(prompt, self.model, fmt=schema, seed=self.seed) cand = json.loads(raw) if isinstance(raw, str) else raw except Exception: continue try: cand = self.repair_fn(cand, self.model) except Exception: pass cand = _renumber_steps(cand) diags = self.validate_fn(cand) if any(d.get("severity") == "fatal" for d in diags): continue candidate = cand break if candidate is None: return orig if self.proc_scorer and self.accept_if_not_worse: new_score = self._score(candidate) if new_score < target_score: return orig return candidate # ---- helpers ---- def _build_prompt(self, task: str, proc_json: str, schema: Dict[str, Any], intent: str) -> str: """Build an LLM prompt to apply exactly one small mutation under hard constraints.""" schema_json = json.dumps(schema, ensure_ascii=False) return f""" You will perform a SINGLE, SMALL mutation to the Procedure for the task below. Return ONLY ONE JSON object that validates against the schema. # Task {task} # Procedure JSON Schema (verbatim) {schema_json} # Current Procedure (JSON) ```json {proc_json}``` # Mutation Goal - Apply exactly ONE mutation that improves clarity, correctness likelihood, or structural hygiene. - Mutation intent (hint): {intent} # Hard Constraints (global-state semantics) - Step 1 inputs == ["problem_text"]. - Later steps may read any variable produced by earlier steps (global state). - Final step outputs exactly ["final_answer"] (description only; do not compute numeric value). - Variable names must be snake_case; avoid redefining an existing variable name. - Remove dead outputs if they become unused; keep each step single-action, imperative. - Prefer early extraction: move primitive fact extraction earlier if applicable. # Output Return the FULL mutated procedure as a SINGLE JSON object valid under the schema. Do NOT include markdown, fences, or commentary. """.strip() def _sample_intent(self) -> str: """Sample a lightweight mutation intent to diversify edits without hard-coding types.""" intents = [ "rewrite one step to be more concrete/single-action", "split one too-broad step into two small steps", "insert a missing extraction step for a needed variable", "remove one unused output or trivial no-op step", "rename an inconsistent variable to a consistent snake_case name", "consolidate two adjacent trivial steps without losing information", "add one verification/check step to ensure extracted facts are consistent", ] return self.rng.choice(intents) def _score(self, p: JSONDict) -> float: """Score a procedure with the provided `proc_scorer` if available.""" try: return float(self.proc_scorer.score_proc(p)) # type: ignore[attr-defined] except Exception: return float("-inf")
# ====================== # GA Core # ======================
[docs] class ProcedureGA: """GA driver that orchestrates initialize → evaluate → select → reproduce → next gen. You provide your model + callable hooks (``query_fn``, ``create_proc_fn``, validators, repair, and optionally a task‑eval runner). By default, the GA uses a structural hygiene scorer; you can swap in task‑eval scoring by supplying ``final_answer_schema``, ``eval_fn``, and ``run_steps_fn`` to :meth:`run`. """
[docs] def __init__( self, model: str, create_proc_fn: Callable[[str], str], query_fn: Callable[[str, str, Optional[Dict[str, Any]], Optional[int]], str], schema_json_fn: Callable[[], Dict[str, Any]], validate_fn: Callable[[JSONDict], List[Any]], repair_fn: Callable[[JSONDict, str], JSONDict], scorer: Optional[Scorer] = None, cfg: GAConfig = GAConfig(), rng: Optional[random.Random] = None, ) -> None: """Initialize the GA with model/context functions and configuration.""" self.model = model self.create_proc_fn = create_proc_fn self.query_fn = query_fn self.schema_json_fn = schema_json_fn self.validate_fn = validate_fn self.repair_fn = repair_fn self.cfg = cfg self.rng = rng or random.Random(cfg.seed) # Default: structural hygiene scorer (adapter because GA calls .score(ind)) self.scorer: Scorer = scorer or ProcScorerAdapter( StructuralHygieneScorer(validate_fn=self.validate_fn) ) # Operators (LLM-based) self.crossover = CrossoverOperator( model=self.model, query_fn=self.query_fn, schema_json_fn=self.schema_json_fn, validate_fn=self.validate_fn, repair_fn=self.repair_fn, seed=(self.cfg.seed or 1234), ) self.mutate = MutationOperator( model=self.model, query_fn=self.query_fn, schema_json_fn=self.schema_json_fn, validate_fn=self.validate_fn, repair_fn=self.repair_fn, proc_scorer=getattr(self.scorer, "_proc_scorer", None), # pass underlying proc scorer if using adapter rng=self.rng, seed=(self.cfg.seed or 1234), )
# ---- Initialization ---- def _generate_one(self, task_description: str) -> JSONDict: """Create a single, valid procedure by prompting, repairing, and normalizing. Steps: 1. Prompt the LLM with ``create_proc_fn(task)``. 2. Parse JSON (best‑effort fallback extraction). 3. Run a repair pass. 4. Renumber step IDs. Returns: A schema‑conforming procedure JSON (best‑effort). """ prompt = self.create_proc_fn(task_description) raw = self.query_fn(prompt, self.model, fmt=self.schema_json_fn(), seed=1234) try: proc = json.loads(raw) if isinstance(raw, str) else raw except Exception: # fallback: extract best-effort JSON substring l = raw.find("{"); r = raw.rfind("}") if l != -1 and r != -1 and r > l: proc = json.loads(raw[l:r+1]) else: raise try: proc = self.repair_fn(proc, self.model) except Exception: pass return _renumber_steps(proc)
[docs] def initialize_population(self, task_description: str) -> List[Individual]: """Generate the initial population by repeatedly calling ``_generate_one``. Returns: A list of :class:`Individual` with ``proc`` populated. """ return [Individual(self._generate_one(task_description)) for _ in range(self.cfg.population_size)]
# ---- Evaluation ----
[docs] def evaluate(self, pop: List[Individual], scorer: Optional[Scorer] = None, **kwargs: Any) -> None: """Compute fitness for every individual in‑place using a scorer. Args: pop: Population to evaluate. scorer: Optional override; must implement ``score(individual) -> float``. """ scorer = scorer or self.scorer for ind in pop: try: ind.fitness = scorer.score(ind, **kwargs) except Exception: ind.fitness = -1e9
# ---- Selection ---- def _tournament(self, pop: List[Individual]) -> Individual: """Tournament selection: pick ``k`` random individuals and return the best. Returns: Selected parent candidate. """ k = min(self.cfg.tournament_k, len(pop)) group = self.rng.sample(pop, k=k) return max(group, key=lambda i: i.fitness if i.fitness is not None else -1e9) def _select_parents(self, pop: List[Individual]) -> Tuple[Individual, Individual]: """Select two parents independently via tournament selection.""" return self._tournament(pop), self._tournament(pop) # ---- Reproduction ---- def _reproduce(self, task_description: str, p1: Individual, p2: Individual) -> JSONDict: """Create a child from two parents using crossover or mutation: - With probability ``crossover_rate``: LLM crossover on ``(p1, p2)``. - Else: LLM mutation on one randomly chosen parent. Returns: Child procedure JSON, repaired and renumbered. """ r = self.rng.random() if r < self.cfg.crossover_rate: child = self.crossover(task_description, p1.proc, p2.proc) else: base = p1 if self.rng.random() < 0.5 else p2 child = self.mutate(base.proc, task_description) try: child = self.repair_fn(child, self.model) except Exception: pass return _renumber_steps(child) # ---- Run ----
[docs] def run( self, task_description: str, final_answer_schema: Optional[Dict[str, Any]] = None, eval_fn: Optional[Callable[[Dict[str, Any], Dict[str, Any]], float]] = None, run_steps_fn: Optional[Callable[..., Dict[str, Any]]] = None, print_progress: bool = False, ) -> Tuple[Individual, List[Individual]]: """Execute the full GA loop and return the best individual plus history of elites. If ``final_answer_schema``, ``eval_fn``, and ``run_steps_fn`` are all provided, the GA uses task‑eval scoring for that generation; otherwise it uses the structural hygiene scorer. Args: task_description: Natural‑language problem the procedures should solve. final_answer_schema: JSON schema for the final step (required for TaskEval scoring). eval_fn: Callable ``(state, proc) -> float`` that grades an executed procedure. run_steps_fn: Callable that executes a procedure and returns the final ``state`` dict. print_progress: If True, prints generation‑level fitness summaries. Returns: A tuple of ``(best_individual, elites_history)``. """ pop = self.initialize_population(task_description) history: List[Individual] = [] for gen in range(self.cfg.max_generations): # Choose scorer (task-eval if fully provided; else structural) if eval_fn and run_steps_fn and final_answer_schema is not None: from evoproc.scorers import TaskEvalScorer scorer: Scorer = TaskEvalScorer( run_steps_fn=run_steps_fn, eval_fn=eval_fn, question=task_description, final_answer_schema=final_answer_schema, model=self.model, strict_require_key="final_answer", ) else: scorer = self.scorer self.evaluate(pop, scorer) pop.sort(key=lambda i: i.fitness if i.fitness is not None else -1e9, reverse=True) best = copy.deepcopy(pop[0]) history.append(best) if print_progress: print(f"[gen {gen+1}] best={best.fitness:.3f} steps={len(best.proc.get('steps', []))}") # Next generation scaffold next_pop: List[Individual] = [copy.deepcopy(e) for e in pop[: self.cfg.elitism]] # Diversity: random immigrants n_slots = self.cfg.population_size - len(next_pop) n_imm = int(max(0, n_slots) * self.cfg.random_immigrant_rate) for _ in range(n_imm): next_pop.append(Individual(self._generate_one(task_description))) # Fill remaining slots via reproduction while len(next_pop) < self.cfg.population_size: p1, p2 = self._select_parents(pop) child = self._reproduce(task_description, p1, p2) next_pop.append(Individual(proc=child)) pop = next_pop # Final evaluate & return best self.evaluate(pop, scorer) pop.sort(key=lambda i: i.fitness if i.fitness is not None else -1e9, reverse=True) return pop[0], history
if __name__ == "__main__": print("GA scaffold ready. Import into your environment that defines Procedure, query, etc.")