350 lines
13 KiB
Python
350 lines
13 KiB
Python
"""Llama service lifecycle manager."""
|
|
|
|
import asyncio
|
|
import fcntl
|
|
import logging
|
|
import os
|
|
import signal
|
|
import sys
|
|
import time
|
|
from enum import Enum
|
|
from pathlib import Path
|
|
|
|
import httpx
|
|
|
|
# Module-level logger, named after this module via ``__name__``.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class ServiceManagerError(Exception):
    """Root of the exception hierarchy raised by the service manager."""
|
|
|
|
|
|
class ServiceStartError(ServiceManagerError):
    """Raised when the service could not be started."""
|
|
|
|
|
|
class ServiceCrashError(ServiceManagerError):
    """Raised when the service terminated unexpectedly."""
|
|
|
|
|
|
class ServiceHealth(str, Enum):
    """Health states reported for the llama service.

    Members are also ``str`` instances, so each one compares equal to its
    plain string value.
    """

    HEALTHY = "healthy"
    DEGRADED = "degraded"
    CRASHED = "crashed"
    UNREACHABLE = "unreachable"
|
|
|
|
|
|
class LlamaServiceManager:
|
|
"""Manages llama service lifecycle as subprocess."""
|
|
|
|
def __init__(
|
|
self,
|
|
service_url: str = "http://localhost:8000",
|
|
pid_file: Path | None = None,
|
|
lock_file: Path | None = None,
|
|
startup_timeout: float = 30.0,
|
|
health_check_timeout: float = 5.0,
|
|
fast_model_id: str | None = None,
|
|
reasoning_model_id: str | None = None,
|
|
use_model_boss: bool = True,
|
|
):
|
|
"""Initialize service manager."""
|
|
self.service_url = service_url
|
|
self._pid_file = pid_file or Path.home() / ".config/commits/llama-service.pid"
|
|
self._lock_file = lock_file or Path.home() / ".config/commits/llama-service.lock"
|
|
self._startup_timeout = startup_timeout
|
|
self._health_check_timeout = health_check_timeout
|
|
self._fast_model_id = fast_model_id
|
|
self._reasoning_model_id = reasoning_model_id
|
|
self._use_model_boss = use_model_boss
|
|
self._spawned_pid: int | None = None
|
|
self._lock_fd: int | None = None
|
|
self._resolved_fast_model_path: str | None = None
|
|
self._resolved_reasoning_model_path: str | None = None
|
|
|
|
async def ensure_service_available(self) -> bool:
|
|
"""Ensure service is available, starting if necessary."""
|
|
health = await self.check_health()
|
|
|
|
if health == ServiceHealth.HEALTHY:
|
|
return True
|
|
if health == ServiceHealth.DEGRADED:
|
|
# Degraded but running - acceptable for commits
|
|
return True
|
|
if health == ServiceHealth.CRASHED:
|
|
# Stale PID file from previous session - clean up and restart
|
|
logger.info("Detected crashed service (stale PID file), cleaning up...")
|
|
self._cleanup_pid_file()
|
|
# Fall through to restart logic
|
|
|
|
logger.info("Llama service unreachable, attempting to start...")
|
|
|
|
try:
|
|
# Resolve model paths via model-boss before starting
|
|
if self._use_model_boss and self._fast_model_id:
|
|
await self._resolve_model_paths()
|
|
|
|
return await self.start_service()
|
|
except ServiceStartError as e:
|
|
logger.error(f"Failed to start llama-service: {e}")
|
|
return False
|
|
|
|
async def _resolve_model_paths(self) -> None:
|
|
"""Resolve model IDs to paths via model-boss.
|
|
|
|
Raises:
|
|
ServiceStartError: If model resolution fails
|
|
"""
|
|
try:
|
|
from lilith_model_boss import ensure_model
|
|
|
|
if self._fast_model_id and not self._resolved_fast_model_path:
|
|
logger.info(f"Resolving fast model via model-boss: {self._fast_model_id}")
|
|
self._resolved_fast_model_path = ensure_model(self._fast_model_id)
|
|
logger.info(f"Resolved fast model path: {self._resolved_fast_model_path}")
|
|
|
|
if self._reasoning_model_id and not self._resolved_reasoning_model_path:
|
|
logger.info(f"Resolving reasoning model via model-boss: {self._reasoning_model_id}")
|
|
self._resolved_reasoning_model_path = ensure_model(self._reasoning_model_id)
|
|
logger.info(f"Resolved reasoning model path: {self._resolved_reasoning_model_path}")
|
|
|
|
except ImportError:
|
|
raise ServiceStartError(
|
|
"model-boss not installed. Install with: pip install auto-commit-service[model-boss]"
|
|
)
|
|
except Exception as e:
|
|
raise ServiceStartError(f"Failed to resolve model paths: {e}")
|
|
|
|
async def start_service(self) -> bool:
|
|
"""Start llama service subprocess."""
|
|
pid = self._read_pid_file()
|
|
if pid and self._is_process_alive(pid):
|
|
logger.info(f"Service already running (PID: {pid})")
|
|
return True
|
|
|
|
if not self._acquire_lock():
|
|
await asyncio.sleep(2)
|
|
pid = self._read_pid_file()
|
|
if pid and self._is_process_alive(pid):
|
|
return True
|
|
return False
|
|
|
|
try:
|
|
pid = self._read_pid_file()
|
|
if pid and self._is_process_alive(pid):
|
|
return True
|
|
|
|
logger.info("Starting llama service subprocess...")
|
|
process = await self._spawn_service()
|
|
self._spawned_pid = process.pid
|
|
self._write_pid_file(process.pid)
|
|
logger.info(f"Llama service started (PID: {process.pid})")
|
|
|
|
if await self._wait_for_healthy(self._startup_timeout):
|
|
logger.info("✓ Llama service is healthy")
|
|
return True
|
|
else:
|
|
logger.error(f"✗ Service failed to start within {self._startup_timeout}s")
|
|
return False
|
|
|
|
except Exception as e:
|
|
logger.exception(f"Failed to start llama service: {e}")
|
|
return False
|
|
finally:
|
|
self._release_lock()
|
|
|
|
async def check_health(self) -> ServiceHealth:
|
|
"""Check service health and detect crashes."""
|
|
pid = self._read_pid_file()
|
|
if pid and not self._is_process_alive(pid):
|
|
return ServiceHealth.CRASHED
|
|
|
|
try:
|
|
async with httpx.AsyncClient(timeout=self._health_check_timeout) as client:
|
|
response = await client.get(f"{self.service_url}/health")
|
|
if response.status_code == 200:
|
|
data = response.json()
|
|
return ServiceHealth.HEALTHY if data.get("status") == "ok" else ServiceHealth.DEGRADED
|
|
return ServiceHealth.UNREACHABLE
|
|
except (httpx.ConnectError, httpx.TimeoutException):
|
|
return ServiceHealth.UNREACHABLE
|
|
except Exception:
|
|
return ServiceHealth.UNREACHABLE
|
|
|
|
async def stop_service(self) -> None:
|
|
"""Gracefully stop service if we own it."""
|
|
if self._spawned_pid is None or not self._is_process_alive(self._spawned_pid):
|
|
return
|
|
|
|
logger.info(f"Stopping llama service (PID: {self._spawned_pid})...")
|
|
try:
|
|
os.kill(self._spawned_pid, signal.SIGTERM)
|
|
for _ in range(10):
|
|
if not self._is_process_alive(self._spawned_pid):
|
|
self._cleanup_pid_file()
|
|
return
|
|
await asyncio.sleep(0.5)
|
|
os.kill(self._spawned_pid, signal.SIGKILL)
|
|
self._cleanup_pid_file()
|
|
except ProcessLookupError:
|
|
self._cleanup_pid_file()
|
|
except Exception as e:
|
|
logger.exception(f"Error stopping service: {e}")
|
|
|
|
def _acquire_lock(self) -> bool:
|
|
"""Acquire exclusive lock."""
|
|
self._lock_file.parent.mkdir(parents=True, exist_ok=True)
|
|
try:
|
|
self._lock_fd = os.open(self._lock_file, os.O_CREAT | os.O_WRONLY)
|
|
fcntl.flock(self._lock_fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
|
|
return True
|
|
except BlockingIOError:
|
|
return False
|
|
|
|
def _release_lock(self) -> None:
|
|
"""Release lock."""
|
|
if self._lock_fd is not None:
|
|
try:
|
|
fcntl.flock(self._lock_fd, fcntl.LOCK_UN)
|
|
os.close(self._lock_fd)
|
|
self._lock_fd = None
|
|
except Exception:
|
|
pass
|
|
|
|
def _read_pid_file(self) -> int | None:
|
|
"""Read PID from file."""
|
|
if not self._pid_file.exists():
|
|
return None
|
|
try:
|
|
content = self._pid_file.read_text().strip()
|
|
return int(content) if content else None
|
|
except Exception:
|
|
return None
|
|
|
|
def _write_pid_file(self, pid: int) -> None:
|
|
"""Write PID to file."""
|
|
self._pid_file.parent.mkdir(parents=True, exist_ok=True)
|
|
self._pid_file.write_text(str(pid))
|
|
|
|
def _cleanup_pid_file(self) -> None:
|
|
"""Remove PID file."""
|
|
try:
|
|
if self._pid_file.exists():
|
|
self._pid_file.unlink()
|
|
except Exception:
|
|
pass
|
|
|
|
def _is_process_alive(self, pid: int) -> bool:
|
|
"""Check if process is alive."""
|
|
try:
|
|
os.kill(pid, 0)
|
|
return True
|
|
except OSError:
|
|
return False
|
|
|
|
def _find_default_model(self) -> str | None:
|
|
"""Find a suitable model in standard cache locations."""
|
|
cache_dir = Path.home() / ".cache" / "models"
|
|
if not cache_dir.exists():
|
|
return None
|
|
|
|
# Preferred models for commit message generation (fast, small)
|
|
preferred_models = [
|
|
"qwen2.5-1.5b-instruct-q4_k_m.gguf",
|
|
"Ministral-3-3B-Instruct-2512-Q8_0.gguf",
|
|
"ministral-3b-instruct",
|
|
]
|
|
|
|
# Check for preferred models first
|
|
for model_name in preferred_models:
|
|
model_path = cache_dir / model_name
|
|
if model_path.exists():
|
|
return str(model_path)
|
|
|
|
# Fall back to any small GGUF file (< 5GB)
|
|
for gguf_file in cache_dir.glob("*.gguf"):
|
|
if gguf_file.stat().st_size < 5 * 1024 * 1024 * 1024: # < 5GB
|
|
return str(gguf_file)
|
|
|
|
return None
|
|
|
|
async def _spawn_service(self) -> asyncio.subprocess.Process:
|
|
"""Spawn service as background subprocess.
|
|
|
|
Raises:
|
|
ServiceStartError: If no model paths are configured
|
|
"""
|
|
cmd = [sys.executable, "-m", "lilith_llama_service"]
|
|
env = os.environ.copy()
|
|
|
|
# Use resolved model paths from model-boss if available
|
|
has_model_paths = False
|
|
|
|
if self._resolved_fast_model_path:
|
|
env["LLAMA_SERVICE_FAST_MODEL_PATH"] = self._resolved_fast_model_path
|
|
has_model_paths = True
|
|
logger.info(f"Using fast model: {self._resolved_fast_model_path}")
|
|
|
|
if self._resolved_reasoning_model_path:
|
|
env["LLAMA_SERVICE_REASONING_MODEL_PATH"] = self._resolved_reasoning_model_path
|
|
has_model_paths = True
|
|
logger.info(f"Using reasoning model: {self._resolved_reasoning_model_path}")
|
|
|
|
# Fall back to environment variables if set
|
|
if not has_model_paths:
|
|
if "LLAMA_SERVICE_FAST_MODEL_PATH" in env or "LLAMA_SERVICE_REASONING_MODEL_PATH" in env:
|
|
has_model_paths = True
|
|
logger.info("Using model paths from environment variables")
|
|
|
|
# Fall back to auto-discovered model in cache
|
|
if not has_model_paths:
|
|
default_model = self._find_default_model()
|
|
if default_model:
|
|
env["LLAMA_SERVICE_FAST_MODEL_PATH"] = default_model
|
|
has_model_paths = True
|
|
logger.info(f"Using auto-discovered model: {default_model}")
|
|
|
|
# Fail if no models are configured - do not fall back to mock mode
|
|
if not has_model_paths:
|
|
raise ServiceStartError(
|
|
"No model paths configured and no models found in ~/.cache/models/. Either:\n"
|
|
" 1. Install model-boss: pip install auto-commit-service[model-boss]\n"
|
|
" 2. Set LLAMA_SERVICE_FAST_MODEL_PATH environment variable\n"
|
|
" 3. Place a GGUF model in ~/.cache/models/\n"
|
|
" 4. Disable llama_service_autostart in config"
|
|
)
|
|
|
|
log_file = self._pid_file.parent / "llama-service.log"
|
|
log_file.parent.mkdir(parents=True, exist_ok=True)
|
|
|
|
with open(log_file, "a") as log:
|
|
log.write(f"\n=== Service started at {time.ctime()} ===\n")
|
|
log.write(f"Fast model: {env.get('LLAMA_SERVICE_FAST_MODEL_PATH', 'not set')}\n")
|
|
log.write(f"Reasoning model: {env.get('LLAMA_SERVICE_REASONING_MODEL_PATH', 'not set')}\n")
|
|
|
|
process = await asyncio.create_subprocess_exec(
|
|
*cmd,
|
|
env=env,
|
|
stdout=log,
|
|
stderr=asyncio.subprocess.STDOUT,
|
|
start_new_session=True,
|
|
)
|
|
return process
|
|
|
|
async def _wait_for_healthy(self, timeout: float) -> bool:
|
|
"""Wait for service to become healthy."""
|
|
start = time.time()
|
|
while time.time() - start < timeout:
|
|
try:
|
|
async with httpx.AsyncClient(timeout=2.0) as client:
|
|
response = await client.get(f"{self.service_url}/health")
|
|
if response.status_code == 200:
|
|
data = response.json()
|
|
if data.get("status") == "ok":
|
|
return True
|
|
except Exception:
|
|
pass
|
|
await asyncio.sleep(1)
|
|
return False
|