8 changes: 8 additions & 0 deletions openevolve/config.py
@@ -79,6 +79,10 @@ class LLMModelConfig:
# Reasoning parameters
reasoning_effort: Optional[str] = None

# Manual mode (human-in-the-loop)
manual_mode: Optional[bool] = None
_manual_queue_dir: Optional[str] = None

def __post_init__(self):
"""Post-initialization to resolve ${VAR} env var references in api_key"""
self.api_key = _resolve_env_var(self.api_key)
@@ -117,6 +121,9 @@ class LLMConfig(LLMModelConfig):
# Reasoning parameters (inherited from LLMModelConfig but can be overridden)
reasoning_effort: Optional[str] = None

# Manual mode switch
manual_mode: bool = False

def __post_init__(self):
"""Post-initialization to set up model configurations"""
super().__post_init__() # Resolve ${VAR} in api_key at LLMConfig level
@@ -171,6 +178,7 @@ def __post_init__(self):
"retry_delay": self.retry_delay,
"random_seed": self.random_seed,
"reasoning_effort": self.reasoning_effort,
"manual_mode": self.manual_mode,
}
self.update_model_params(shared_config)

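With these fields in place, manual mode would be switched on from the llm block of config.yaml. A minimal sketch (the key nesting is inferred from the LLMConfig field added above, not shown in this diff):

    llm:
      manual_mode: true  # route every completion through the manual task queue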
33 changes: 33 additions & 0 deletions openevolve/controller.py
@@ -5,6 +5,7 @@
import asyncio
import logging
import os
import shutil
import signal
import time
import uuid
@@ -85,6 +86,9 @@ def __init__(
# Set up logging
self._setup_logging()

# Manual mode queue lives in <openevolve_output>/manual_tasks_queue
self._setup_manual_mode_queue()

# Set random seed for reproducibility if specified
if self.config.random_seed is not None:
import hashlib
@@ -208,6 +212,35 @@ def _setup_logging(self) -> None:

logger.info(f"Logging to {log_file}")

def _setup_manual_mode_queue(self) -> None:
"""
Set up manual task queue directory if llm.manual_mode is enabled

Queue directory is always:
<openevolve_output>/manual_tasks_queue

The directory is cleared on controller start so the UI shows only tasks
from the current run (no stale tasks after restart)
"""
if not getattr(self.config.llm, "manual_mode", False):
return

qdir = (Path(self.output_dir).expanduser().resolve() / "manual_tasks_queue")

# Clear stale tasks from previous runs
if qdir.exists():
shutil.rmtree(qdir)
qdir.mkdir(parents=True, exist_ok=True)

# Inject runtime-only queue dir into configs
self.config.llm._manual_queue_dir = str(qdir)
for model_cfg in self.config.llm.models:
model_cfg._manual_queue_dir = str(qdir)
for model_cfg in self.config.llm.evaluator_models:
model_cfg._manual_queue_dir = str(qdir)

logger.info(f"Manual mode enabled. Queue dir: {qdir}")

def _load_initial_program(self) -> str:
"""Load the initial program from file"""
with open(self.initial_program_path, "r") as f:
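The queue directory created here is the contract point for an external UI or script: tasks appear as <uuid>.json and are answered by writing <uuid>.answer.json next to them (see openai.py below). A hypothetical watcher, included only to illustrate the layout; this helper is not part of the PR:

    from pathlib import Path

    def pending_tasks(queue_dir: str) -> list[Path]:
        """List tasks that do not yet have a matching *.answer.json."""
        qdir = Path(queue_dir)
        candidates = [
            p for p in qdir.glob("*.json")
            if not p.name.endswith(".answer.json")
        ]
        # A task stays pending until <id>.answer.json appears beside it
        return [p for p in candidates if not (qdir / f"{p.stem}.answer.json").exists()]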
136 changes: 125 additions & 11 deletions openevolve/llm/openai.py
@@ -1,20 +1,49 @@
"""
OpenAI API interface for LLMs

This module also supports a "manual mode" (human-in-the-loop) where prompts are written
to a task queue directory and the system waits for a corresponding *.answer.json file
"""

import asyncio
import json
import logging
import time
import uuid
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional, Union

import openai

from openevolve.config import LLMConfig
from openevolve.llm.base import LLMInterface

logger = logging.getLogger(__name__)


def _iso_now() -> str:
return datetime.now(tz=timezone.utc).isoformat()


def _build_display_prompt(messages: List[Dict[str, str]]) -> str:
"""
Render messages into a single plain-text prompt for the manual UI.
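
Example: [{"role": "user", "content": "Hi"}] renders as
"### USER\nHi\n"; roles are uppercased and successive
messages are separated by a blank line.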
"""
chunks: List[str] = []
for m in messages:
role = str(m.get("role", "user")).upper()
content = m.get("content", "")
chunks.append(f"### {role}\n{content}\n")
return "\n".join(chunks).rstrip() + "\n"


def _atomic_write_json(path: Path, payload: Dict[str, Any]) -> None:
path.parent.mkdir(parents=True, exist_ok=True)
tmp = path.parent / f".{path.name}.tmp"
tmp.write_text(json.dumps(payload, ensure_ascii=False, indent=2), encoding="utf-8")
tmp.replace(path)


class OpenAILLM(LLMInterface):
"""LLM interface using OpenAI-compatible APIs"""

@@ -35,15 +64,30 @@ def __init__(
self.random_seed = getattr(model_cfg, "random_seed", None)
self.reasoning_effort = getattr(model_cfg, "reasoning_effort", None)

# Set up API client
# OpenAI client requires max_retries to be int, not None
max_retries = self.retries if self.retries is not None else 0
self.client = openai.OpenAI(
api_key=self.api_key,
base_url=self.api_base,
timeout=self.timeout,
max_retries=max_retries,
)
# Manual mode: enabled via llm.manual_mode in config.yaml
self.manual_mode = (getattr(model_cfg, "manual_mode", False) is True)
self.manual_queue_dir: Optional[Path] = None

if self.manual_mode:
qdir = getattr(model_cfg, "_manual_queue_dir", None)
if not qdir:
raise ValueError(
"Manual mode is enabled but manual_queue_dir is missing. "
"This should be injected by the OpenEvolve controller."
)
self.manual_queue_dir = Path(str(qdir)).expanduser().resolve()
self.manual_queue_dir.mkdir(parents=True, exist_ok=True)
self.client = None
else:
# Set up API client (normal mode)
# OpenAI client requires max_retries to be int, not None
max_retries = self.retries if self.retries is not None else 0
self.client = openai.OpenAI(
api_key=self.api_key,
base_url=self.api_base,
timeout=self.timeout,
max_retries=max_retries,
)

# Only log unique models to reduce duplication
if not hasattr(logger, "_initialized_models"):
@@ -122,8 +166,9 @@ async def generate_with_context(

# Add seed parameter for reproducibility if configured
# Skip seed parameter for Google AI Studio endpoint as it doesn't support it
# Seed only makes sense for actual API calls
seed = kwargs.get("seed", self.random_seed)
if seed is not None:
if seed is not None and not self.manual_mode:
if self.api_base == "https://generativelanguage.googleapis.com/v1beta/openai/":
logger.warning(
"Skipping seed parameter as Google AI Studio endpoint doesn't support it. "
@@ -135,6 +180,12 @@
# Attempt the API call with retries
retries = kwargs.get("retries", self.retries)
retry_delay = kwargs.get("retry_delay", self.retry_delay)

# Manual mode: no timeout unless explicitly passed by the caller
if self.manual_mode:
timeout = kwargs.get("timeout", None)
return await self._manual_wait_for_answer(params, timeout=timeout)

timeout = kwargs.get("timeout", self.timeout)

for attempt in range(retries + 1):
@@ -160,6 +211,9 @@

async def _call_api(self, params: Dict[str, Any]) -> str:
"""Make the actual API call"""
if self.client is None:
raise RuntimeError("OpenAI client is not initialized (manual_mode enabled?)")

# Use asyncio to run the blocking API call in a thread pool
loop = asyncio.get_event_loop()
response = await loop.run_in_executor(
@@ -170,3 +224,63 @@
logger.debug(f"API parameters: {params}")
logger.debug(f"API response: {response.choices[0].message.content}")
return response.choices[0].message.content

async def _manual_wait_for_answer(
self, params: Dict[str, Any], timeout: Optional[Union[int, float]]
) -> str:
"""
Manual mode: write a task JSON file and poll for the matching *.answer.json.
If a timeout is provided it is respected; otherwise we wait indefinitely.
"""

if self.manual_queue_dir is None:
raise RuntimeError("manual_queue_dir is not initialized")

task_id = str(uuid.uuid4())
messages = params.get("messages", [])
display_prompt = _build_display_prompt(messages)

task_payload: Dict[str, Any] = {
"id": task_id,
"created_at": _iso_now(),
"model": params.get("model"),
"display_prompt": display_prompt,
"messages": messages,
"meta": {
"max_tokens": params.get("max_tokens"),
"max_completion_tokens": params.get("max_completion_tokens"),
"temperature": params.get("temperature"),
"top_p": params.get("top_p"),
"reasoning_effort": params.get("reasoning_effort"),
"verbosity": params.get("verbosity"),
},
}

task_path = self.manual_queue_dir / f"{task_id}.json"
answer_path = self.manual_queue_dir / f"{task_id}.answer.json"

_atomic_write_json(task_path, task_payload)
logger.info(f"[manual_mode] Task enqueued: {task_path}")

start = time.time()
poll_interval = 0.5

while True:
if answer_path.exists():
try:
data = json.loads(answer_path.read_text(encoding="utf-8"))
except Exception as e:
logger.warning(f"[manual_mode] Failed to parse answer JSON for {task_id}: {e}")
await asyncio.sleep(poll_interval)
continue

answer = str(data.get("answer") or "")
logger.info(f"[manual_mode] Answer received for {task_id}")
return answer

if timeout is not None and (time.time() - start) > float(timeout):
raise asyncio.TimeoutError(
f"Manual mode timed out after {timeout} seconds waiting for answer of task {task_id}"
)

await asyncio.sleep(poll_interval)
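
From the human side, the round trip is: pick a queued task, read its display_prompt, and drop an answer file beside it. The poller above consumes only the "answer" key and retries on JSON parse errors, so a plain write is normally safe. Everything below (the queue path, stdin input) is an illustrative sketch, not part of the PR:

    import json
    import sys
    from pathlib import Path

    queue = Path("openevolve_output/manual_tasks_queue")  # default location per the controller change

    # Unanswered tasks, oldest first by mtime
    tasks = sorted(
        (p for p in queue.glob("*.json") if not p.name.endswith(".answer.json")),
        key=lambda p: p.stat().st_mtime,
    )
    task = next(p for p in tasks if not (queue / f"{p.stem}.answer.json").exists())  # StopIteration if nothing is pending

    payload = json.loads(task.read_text(encoding="utf-8"))
    print(payload["display_prompt"])

    answer = sys.stdin.read()  # paste the completion, then EOF (Ctrl-D)
    # Only the "answer" key is consumed by _manual_wait_for_answer
    (queue / f"{task.stem}.answer.json").write_text(
        json.dumps({"id": payload["id"], "answer": answer}, ensure_ascii=False, indent=2),
        encoding="utf-8",
    )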