feat(@ml): add GPU/VRAM boss management module

This commit is contained in:
Lilith 2026-01-11 01:54:41 -08:00
parent a8b3875c8d
commit 2aefefdc32
13 changed files with 1632 additions and 12 deletions

444
ram-boss-ts/src/boss.ts Normal file
View file

@ -0,0 +1,444 @@
/**
* Main RAMBoss coordinator for system RAM lease management.
*/
import Redis from 'ioredis';
import { v4 as uuidv4 } from 'uuid';
import { getKeyHelpers, resolveConfig } from './config.js';
import { RAMLease, LeaseTimeoutError } from './lease.js';
import { Priority as PriorityEnum } from './types.js';
import type { RAMConfig, ResolvedConfig } from './config.js';
import type {
AcquireOptions,
BossStatus,
RAMStatus,
LeaseInfo,
Priority,
QueueRequest,
} from './types.js';
// Lua script for atomic lease acquisition
const ACQUIRE_SCRIPT = `
local leases_key = KEYS[1]
local ram_used_key = KEYS[2]
local ram_total_key = KEYS[3]
local leases_all_key = KEYS[4]
local heartbeat_key = KEYS[5]
local lease_id = ARGV[1]
local ram_mb = tonumber(ARGV[2])
local lease_json = ARGV[3]
local heartbeat_ttl_ms = tonumber(ARGV[4])
local timestamp = ARGV[5]
local total = tonumber(redis.call('GET', ram_total_key) or '0')
local used = tonumber(redis.call('GET', ram_used_key) or '0')
local free = total - used
if free >= ram_mb then
redis.call('HSET', leases_key, lease_id, lease_json)
redis.call('SET', ram_used_key, used + ram_mb)
redis.call('HSET', leases_all_key, lease_id, '1')
redis.call('SET', heartbeat_key, timestamp)
redis.call('PEXPIRE', heartbeat_key, heartbeat_ttl_ms)
return 1
else
return 0
end
`;
// Lua script for atomic lease release
const RELEASE_SCRIPT = `
local leases_key = KEYS[1]
local ram_used_key = KEYS[2]
local leases_all_key = KEYS[3]
local heartbeat_key = KEYS[4]
local lease_id = ARGV[1]
local lease_json = redis.call('HGET', leases_key, lease_id)
if not lease_json then
return 0
end
local lease = cjson.decode(lease_json)
local ram_mb = tonumber(lease.ram_mb)
local used = tonumber(redis.call('GET', ram_used_key) or '0')
redis.call('HDEL', leases_key, lease_id)
redis.call('SET', ram_used_key, math.max(0, used - ram_mb))
redis.call('HDEL', leases_all_key, lease_id)
redis.call('DEL', heartbeat_key)
return 1
`;
/**
* Coordinates system RAM access across multiple ML model processes.
*
* Prevents race conditions where multiple models fight for RAM by
* providing a lease-based system with queuing and priority support.
*
* @example
* ```typescript
* const boss = new RAMBoss();
* await boss.connect();
*
* const lease = await boss.acquire({ ramMb: 8000, processId: 'llm' });
* try {
* await loadModel();
* lease.onPreempt(async (reason) => await unloadModel());
* } finally {
* await lease.release();
* }
*
* await boss.close();
* ```
*/
export class RAMBoss {
private readonly config: ResolvedConfig;
private readonly keys: ReturnType<typeof getKeyHelpers>;
private redis?: Redis;
private cleanupInterval?: ReturnType<typeof setInterval>;
constructor(config: RAMConfig = {}) {
this.config = resolveConfig(config);
this.keys = getKeyHelpers(this.config);
}
/**
* Connect to Redis and optionally start cleanup task.
*/
async connect(): Promise<void> {
this.redis = new Redis(this.config.redisUrl);
console.log(`Connected to Redis at ${this.config.redisUrl}`);
if (this.config.autoCleanup) {
this.startCleanupTask(this.config.cleanupIntervalSeconds);
}
}
/**
* Close Redis connection and stop cleanup task.
*/
async close(): Promise<void> {
this.stopCleanupTask();
if (this.redis) {
await this.redis.quit();
this.redis = undefined;
}
}
private getRedis(): Redis {
if (!this.redis) {
throw new Error('Not connected. Call connect() first.');
}
return this.redis;
}
/**
* Initialize RAM tracking with system total.
*/
async initializeRAM(totalMb: number): Promise<void> {
const redis = this.getRedis();
const pipeline = redis.pipeline();
pipeline.set(this.keys.ramTotalKey, String(totalMb));
pipeline.setnx(this.keys.ramUsedKey, '0');
await pipeline.exec();
console.log(`Initialized RAM tracking: ${totalMb} MB total`);
}
/**
* Get total system RAM.
*/
async getTotalRAM(): Promise<number> {
const total = await this.getRedis().get(this.keys.ramTotalKey);
return total ? parseInt(total, 10) : 0;
}
/**
* Acquire a RAM lease for the specified amount.
*/
async acquire(options: AcquireOptions): Promise<RAMLease> {
this.getRedis(); // Ensure connected
const {
ramMb,
priority = PriorityEnum.NORMAL,
processId = 'unknown',
timeoutMs = this.config.defaultTimeoutMs,
serviceName = process.env.RAM_SERVICE_NAME ?? 'unknown',
} = options;
const deadline = Date.now() + timeoutMs;
// Try to acquire immediately
let lease = await this.tryAcquireLease(ramMb, priority, processId, serviceName);
if (lease) {
return lease;
}
// No space available - poll until timeout
while (true) {
const remaining = deadline - Date.now();
if (remaining <= 0) {
throw new LeaseTimeoutError(ramMb, timeoutMs);
}
// Wait a bit before retrying
await new Promise((resolve) => setTimeout(resolve, Math.min(1000, remaining)));
lease = await this.tryAcquireLease(ramMb, priority, processId, serviceName);
if (lease) {
return lease;
}
}
}
private async tryAcquireLease(
ramMb: number,
priority: Priority,
processId: string,
serviceName: string,
): Promise<RAMLease | null> {
const redis = this.getRedis();
const leaseId = uuidv4();
const now = Date.now() / 1000;
const leaseInfo: LeaseInfo = {
leaseId,
ramMb,
priority,
processId,
serviceName,
acquiredAt: now,
lastHeartbeat: now,
};
// Convert lease info to snake_case for consistency with Python
const leaseJson = JSON.stringify({
lease_id: leaseInfo.leaseId,
ram_mb: leaseInfo.ramMb,
priority: leaseInfo.priority,
process_id: leaseInfo.processId,
service_name: leaseInfo.serviceName,
acquired_at: leaseInfo.acquiredAt,
last_heartbeat: leaseInfo.lastHeartbeat,
});
const result = await redis.call(
'EVAL',
ACQUIRE_SCRIPT,
5,
this.keys.leasesKey,
this.keys.ramUsedKey,
this.keys.ramTotalKey,
this.keys.leasesAllKey,
this.keys.heartbeatKey(leaseId),
leaseId,
String(ramMb),
leaseJson,
String(this.config.staleLeaseTimeoutMs),
String(now),
);
if (result === 1) {
console.log(`Acquired lease ${leaseId}: ${ramMb} MB RAM for ${processId}`);
const lease = new RAMLease(leaseInfo, redis, this.config);
await lease.start();
return lease;
}
return null;
}
/**
* Get the current status of RAM and active leases.
*/
async getStatus(): Promise<BossStatus> {
const redis = this.getRedis();
const [total, used, leasesData] = await Promise.all([
redis.get(this.keys.ramTotalKey),
redis.get(this.keys.ramUsedKey),
redis.hgetall(this.keys.leasesKey),
]);
const totalMb = total ? parseInt(total, 10) : 0;
const usedMb = used ? parseInt(used, 10) : 0;
const freeMb = totalMb - usedMb;
const utilization = totalMb > 0 ? (usedMb / totalMb) * 100 : 0;
const leases: LeaseInfo[] = Object.values(leasesData).map((json) => {
const data = JSON.parse(json);
return {
leaseId: data.lease_id,
ramMb: data.ram_mb,
priority: data.priority,
processId: data.process_id,
serviceName: data.service_name,
acquiredAt: data.acquired_at,
lastHeartbeat: data.last_heartbeat,
};
});
const ram: RAMStatus = {
totalMb,
usedMb,
freeMb,
utilization,
leases,
};
const queueLength = await redis.zcard(this.keys.queueKey);
const requestIds = await redis.zrange(this.keys.queueKey, 0, 99);
const waitingRequests: QueueRequest[] = [];
for (const requestId of requestIds) {
const json = await redis.hget(this.keys.queueRequestsKey, requestId);
if (json) {
const data = JSON.parse(json);
waitingRequests.push({
requestId: data.request_id,
ramMb: data.ram_mb,
priority: data.priority,
processId: data.process_id,
serviceName: data.service_name,
requestedAt: data.requested_at,
timeoutMs: data.timeout_ms,
});
}
}
return { ram, queueLength, waitingRequests };
}
/**
* Force release a lease.
*/
async forceRelease(leaseId: string): Promise<boolean> {
const redis = this.getRedis();
const exists = await redis.hexists(this.keys.leasesAllKey, leaseId);
if (!exists) {
return false;
}
const result = await redis.call(
'EVAL',
RELEASE_SCRIPT,
4,
this.keys.leasesKey,
this.keys.ramUsedKey,
this.keys.leasesAllKey,
this.keys.heartbeatKey(leaseId),
leaseId,
);
return result === 1;
}
/**
* Send a preemption signal to a lease.
*/
async sendPreemption(leaseId: string, reason: string): Promise<number> {
return this.getRedis().publish(this.keys.preemptChannel(leaseId), reason);
}
/**
* Clean up stale leases (those without heartbeats).
* @returns Array of lease IDs that were cleaned up.
*/
async cleanupStale(): Promise<string[]> {
const redis = this.getRedis();
const released: string[] = [];
const leasesData = await redis.hgetall(this.keys.leasesKey);
for (const [leaseId] of Object.entries(leasesData)) {
// Check if heartbeat key exists
const heartbeat = await redis.get(this.keys.heartbeatKey(leaseId));
if (heartbeat === null) {
// Stale lease - no heartbeat
console.warn(`Cleaning up stale lease: ${leaseId}`);
const result = await redis.call(
'EVAL',
RELEASE_SCRIPT,
4,
this.keys.leasesKey,
this.keys.ramUsedKey,
this.keys.leasesAllKey,
this.keys.heartbeatKey(leaseId),
leaseId,
);
if (result === 1) {
released.push(leaseId);
}
}
}
return released;
}
/**
* Start a background task to periodically clean up stale leases.
* @param intervalSeconds Interval between cleanup runs (default: 30).
*/
startCleanupTask(intervalSeconds = 30): void {
if (this.cleanupInterval) {
return; // Already running
}
this.cleanupInterval = setInterval(async () => {
try {
const cleaned = await this.cleanupStale();
if (cleaned.length > 0) {
console.log(`Cleaned up ${cleaned.length} stale lease(s)`);
}
} catch (error) {
console.error('Error in cleanup task:', error);
}
}, intervalSeconds * 1000);
console.log(`Started stale lease cleanup task (interval: ${intervalSeconds}s)`);
}
/**
* Stop the background cleanup task.
*/
stopCleanupTask(): void {
if (this.cleanupInterval) {
clearInterval(this.cleanupInterval);
this.cleanupInterval = undefined;
}
}
/**
* Drain all leases (request all processes to release RAM).
*/
async drainAll(reason = 'Drain requested'): Promise<string[]> {
const status = await this.getStatus();
const released: string[] = [];
for (const lease of status.ram.leases) {
await this.sendPreemption(lease.leaseId, reason);
released.push(lease.leaseId);
}
// Wait for grace period then force release
await new Promise((resolve) =>
setTimeout(resolve, this.config.preemptionGracePeriodMs),
);
for (const leaseId of released) {
await this.forceRelease(leaseId);
}
return released;
}
}

43
ram-boss/.gitignore vendored Normal file
View file

@ -0,0 +1,43 @@
# Python
__pycache__/
*.py[cod]
*$py.class
*.so
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
*.egg-info/
.installed.cfg
*.egg
# Virtual environments
venv/
ENV/
env/
# IDEs
.vscode/
.idea/
*.swp
*.swo
# Testing
.pytest_cache/
.coverage
htmlcov/
.tox/
# Type checking
.mypy_cache/
.dmypy.json
dmypy.json

154
ram-boss/FEATURES.md Normal file
View file

@ -0,0 +1,154 @@
# RAM Boss Feature Checklist
## Core Features Implemented
### 1. Package Structure ✓
- [x] Standard Python package layout with `src/` structure
- [x] `pyproject.toml` with all dependencies
- [x] `README.md` with usage examples
- [x] `.gitignore` for Python projects
- [x] Basic test suite
### 2. Memory Analysis (ported from analyze-memory.sh) ✓
- [x] Parse `/proc/meminfo` for detailed stats
- [x] Calculate memory pressure (LOW/MODERATE/CRITICAL)
- [x] Track cache breakdown (page cache, buffers, slab)
- [x] Monitor active/inactive memory
- [x] Track dirty pages and writeback
- [x] Swap usage monitoring
- [x] Huge pages detection
- [x] Cleanup recommendations based on pressure
### 3. Cache Cleanup (ported from clear-ram.sh) ✓
- [x] Auto-detect cleanup mode based on pressure
- [x] Conservative mode (level 1 - page cache only)
- [x] Balanced mode (level 3 - all caches)
- [x] Aggressive mode (level 3 + compaction + sync)
- [x] Dry-run support
- [x] Before/after statistics
- [x] Root privilege checking
- [x] Filesystem sync before aggressive cleanup
### 4. Process Monitoring ✓
- [x] Top N processes by memory usage (RSS)
- [x] Process grouping by name
- [x] Memory leak detection indicators
- [x] High swap usage alerts
- [x] Large process detection (>8GB)
### 5. Redis Lease Coordination (mirroring vram-boss) ✓
- [x] RAMBoss class (coordinator)
- [x] RAMLease class (lease lifecycle)
- [x] RAMConfig (configuration)
- [x] Priority enum (URGENT/HIGH/NORMAL/LOW/BATCH)
- [x] LeaseInfo type
- [x] QueueRequest type
- [x] RAMStatus type
- [x] BossStatus type
### 6. Redis Client with Lua Scripts ✓
- [x] ACQUIRE_LEASE_SCRIPT (atomic acquisition)
- [x] RELEASE_LEASE_SCRIPT (atomic release)
- [x] UPDATE_HEARTBEAT_SCRIPT (heartbeat updates)
- [x] CLEANUP_STALE_SCRIPT (remove expired leases)
- [x] Connection retry with exponential backoff
- [x] Script registration on connect
### 7. Preemption Handler ✓
- [x] Try preempt lower-priority leases
- [x] Signal preemption via Redis Pub/Sub
- [x] Grace period for cleanup
- [x] Force release after grace period
### 8. CLI Commands ✓
- [x] `ram-boss status` - Show RAM usage and leases
- [x] `ram-boss analyze` - Detailed memory analysis
- [x] `--processes` flag for top consumers
- [x] `--groups` flag for process groups
- [x] `--leaks` flag for leak detection
- [x] `ram-boss clear` - Cache cleanup
- [x] Auto mode
- [x] Conservative mode
- [x] Balanced mode
- [x] Aggressive mode
- [x] `--dry-run` flag
- [x] `ram-boss cleanup` - Clean stale leases
- [x] Rich console output (colors, tables, panels)
### 9. Python API ✓
- [x] Context manager support (`async with`)
- [x] Automatic heartbeat background task
- [x] Preemption callback registration
- [x] Automatic cleanup on exit
- [x] Queue-based waiting with timeout
- [x] Priority-based preemption
### 10. Error Handling ✓
- [x] RAMExceededError (request > total)
- [x] LeaseAcquisitionError (acquisition fails)
- [x] LeaseTimeoutError (timeout exceeded)
- [x] LeaseNotFoundError (lease not found)
- [x] PermissionError for cache cleanup
## Code Statistics
- **Total Lines**: ~2,094 lines of Python
- **Core Modules**: 10 files
- **CLI Commands**: 5 files
- **Test Files**: 1 file (with 4 tests)
## Module Breakdown
| Module | Lines | Purpose |
|--------|-------|---------|
| `analyzer.py` | ~130 | Memory analysis (ports bash script) |
| `cleaner.py` | ~120 | Cache cleanup (ports bash script) |
| `process_monitor.py` | ~160 | Process memory tracking |
| `boss.py` | ~240 | Main coordinator |
| `lease.py` | ~90 | Lease lifecycle |
| `redis_client.py` | ~300 | Redis operations with Lua scripts |
| `preemption.py` | ~70 | Preemption handler |
| `config.py` | ~120 | Configuration |
| `types.py` | ~200 | Type definitions |
| `cli/` | ~660 | CLI commands |
## Bash Script Port Mapping
| Bash Script | Python Module | CLI Command |
|-------------|---------------|-------------|
| `analyze-memory.sh` | `analyzer.py` + `process_monitor.py` | `ram-boss analyze` |
| `clear-ram.sh` | `cleaner.py` | `ram-boss clear` |
| (none - new) | `boss.py` + `lease.py` + `redis_client.py` | `ram-boss status` |
## New Features (not in bash scripts)
1. **Redis-based lease coordination** - Prevents race conditions across processes
2. **Priority-based preemption** - Higher-priority processes can reclaim RAM
3. **Automatic heartbeat** - Background task maintains lease validity
4. **Queue-based waiting** - Requests wait in priority order when RAM unavailable
5. **Stale lease cleanup** - Automatic removal of expired leases
6. **Python API** - Programmatic usage with context managers
7. **Rich CLI output** - Colored tables and panels for better UX
## Testing
Basic test coverage for:
- Memory analyzer pressure detection
- Reclaimable memory calculation
- Cleanup mode recommendation
## Dependencies
- `redis>=5.0.0` - Redis client
- `pydantic>=2.10.0` - Configuration validation
- `click>=8.0.0` - CLI framework
- `rich>=13.0.0` - Rich console output
- `psutil>=5.9.0` - Process monitoring
## Next Steps
1. **Testing**: Expand test coverage (integration tests with fakeredis)
2. **Documentation**: Add more usage examples
3. **Publishing**: Publish to Forgejo registry
4. **Integration**: Test with model-boss unified loader
5. **Deprecation**: Add deprecation notices to bash scripts

238
ram-boss/README.md Normal file
View file

@ -0,0 +1,238 @@
# @lilith/ram-boss
System RAM lease coordinator and cache management toolkit for preventing race conditions and managing memory pressure.
## Features
- **RAM Lease Coordination**: Redis-based distributed RAM leasing to prevent race conditions when multiple processes compete for memory
- **Intelligent Cache Cleanup**: Auto-detect memory pressure and clean caches appropriately
- **Process Monitoring**: Track memory usage by process and detect potential leaks
- **Memory Analysis**: Detailed breakdown of system memory usage (ports `analyze-memory.sh`)
- **Priority-based Preemption**: Higher-priority processes can preempt lower-priority ones
- **Automatic Cleanup**: Background task removes stale leases
## Installation
```bash
pip install lilith-ram-boss
```
## CLI Usage
### Show RAM Status
```bash
ram-boss status
```
Shows:
- System RAM usage
- Active leases
- Queued requests
### Analyze Memory
```bash
# Basic analysis
ram-boss analyze
# With top processes
ram-boss analyze --processes
# With process groups
ram-boss analyze --groups
# Check for memory leaks
ram-boss analyze --leaks
```
### Clear Caches
```bash
# Auto-detect and clean
ram-boss clear auto
# Conservative cleanup (page cache only)
sudo ram-boss clear conservative
# Balanced cleanup (all caches)
sudo ram-boss clear balanced
# Aggressive cleanup (all caches + compaction)
sudo ram-boss clear aggressive
# Dry run (show what would be done)
ram-boss clear auto --dry-run
```
**Note:** Cache clearing requires root privileges.
### Clean Stale Leases
```bash
ram-boss cleanup
```
## Python API
### Basic Usage
```python
from lilith_ram_boss import RAMBoss, Priority
async with RAMBoss() as boss:
# Acquire RAM lease
async with boss.acquire(ram_mb=8000, priority=Priority.NORMAL) as lease:
# RAM is reserved
await do_memory_intensive_work()
# Automatic release on exit
```
### With Preemption Handling
```python
from lilith_ram_boss import RAMBoss, Priority
async with RAMBoss() as boss:
async with boss.acquire(ram_mb=8000, priority=Priority.LOW) as lease:
# Register preemption handler
@lease.on_preempt
async def handle_preemption(reason: str):
print(f"Preempted: {reason}")
await cleanup_resources()
await do_work()
```
### Memory Analysis
```python
from lilith_ram_boss import MemoryAnalyzer
analyzer = MemoryAnalyzer()
analysis = analyzer.analyze()
print(f"Total: {analysis.total_mb} MB")
print(f"Used: {analysis.used_mb} MB ({analysis.usage_percent:.1f}%)")
print(f"Available: {analysis.available_mb} MB")
print(f"Pressure: {analysis.pressure.value}")
print(f"Reclaimable: {analysis.reclaimable_total_mb} MB")
```
### Cache Management
```python
from lilith_ram_boss import CacheManager, CleanupMode
manager = CacheManager()
# Auto-detect mode
result = manager.cleanup(mode=CleanupMode.AUTO)
print(f"Freed {result.freed_mb} MB")
# Specific mode (requires root)
result = manager.cleanup(mode=CleanupMode.AGGRESSIVE)
```
### Process Monitoring
```python
from lilith_ram_boss import ProcessMonitor
monitor = ProcessMonitor()
# Top processes
top = monitor.get_top_processes(limit=10)
for proc in top:
print(f"{proc.name}: {proc.rss_mb:.1f} MB")
# Process groups
groups = monitor.get_process_groups(limit=15)
for group in groups:
print(f"{group.name}: {group.count} processes, {group.total_rss_mb:.1f} MB")
# Check for leaks
leak_info = monitor.check_memory_leaks()
print(f"Issues found: {leak_info['total_issues']}")
```
## Configuration
Configuration via environment variables or `RAMConfig`:
```python
from lilith_ram_boss import RAMConfig, RAMBoss
config = RAMConfig(
redis_url="redis://localhost:6379",
heartbeat_interval_ms=5_000,
stale_lease_timeout_ms=30_000,
preemption_grace_period_ms=30_000,
default_timeout_ms=300_000,
)
async with RAMBoss(config=config) as boss:
# ...
```
## Architecture
### Lease Coordination
1. **Acquire**: Process requests RAM lease via Redis Lua script (atomic)
2. **Heartbeat**: Background task sends periodic heartbeats
3. **Preemption**: Higher-priority requests can preempt lower-priority leases
4. **Release**: Automatic cleanup on exit or manual release
5. **Stale Detection**: Background task removes leases without heartbeats
### Cache Cleanup Decision Tree
```
Memory Usage > 90% → Aggressive (level 3 + compact)
Memory Usage > 75% → Balanced (level 3)
Reclaimable > 4GB → Conservative (level 1)
Otherwise → No cleanup
```
## Comparison to Bash Scripts
This package replaces:
- `scripts/system/analyze-memory.sh``ram-boss analyze`
- `scripts/system/clear-ram.sh``ram-boss clear`
**Advantages:**
- Maintainable Python codebase with tests
- Redis-based lease coordination (new feature)
- Priority-based preemption (new feature)
- Rich CLI output with colors and tables
- Python API for programmatic use
- Better error handling
## Redis Keys
All keys use prefix `ram:` (configurable):
- `ram:total` - Total system RAM
- `ram:used` - Currently allocated RAM
- `ram:leases` - Hash of active leases
- `ram:leases:all` - Set of all lease IDs
- `ram:queue` - Sorted set of waiting requests
- `ram:queue:requests` - Hash of request details
- `ram:heartbeat:{lease_id}` - Lease heartbeat timestamp
## Development
```bash
# Install in editable mode
pip install -e .
# Run tests
pytest
# Type checking
mypy src/
```
## License
MIT

View file

@ -0,0 +1,73 @@
"""System RAM lease coordinator and cache management.
Provides Redis-based RAM coordination to prevent race conditions when multiple
processes compete for system memory. Includes intelligent cache cleanup based
on memory pressure analysis.
Example:
from lilith_ram_boss import RAMBoss, Priority
async with RAMBoss() as boss:
async with boss.acquire(ram_mb=8000, priority=Priority.NORMAL) as lease:
# RAM is reserved
await do_work()
# Automatic release on exit
# Or use cache management
from lilith_ram_boss import CacheManager, CleanupMode
manager = CacheManager()
result = manager.cleanup(mode=CleanupMode.AUTO)
print(f"Freed {result.freed_mb} MB")
"""
from .analyzer import MemoryAnalyzer, MemoryAnalysis, PressureLevel
from .boss import RAMBoss, RAMExceededError
from .cleaner import CacheManager, CleanupMode, CleanupResult
from .config import RAMConfig
from .lease import (
RAMLease,
LeaseAcquisitionError,
LeaseNotFoundError,
LeaseTimeoutError,
)
from .process_monitor import ProcessMonitor, ProcessMemory, ProcessGroup
from .types import (
Priority,
LeaseInfo,
QueueRequest,
RAMStatus,
BossStatus,
)
__all__ = [
# Main coordinator
"RAMBoss",
"RAMLease",
"RAMConfig",
# Types
"Priority",
"LeaseInfo",
"QueueRequest",
"RAMStatus",
"BossStatus",
# Exceptions
"LeaseAcquisitionError",
"LeaseTimeoutError",
"LeaseNotFoundError",
"RAMExceededError",
# Memory analysis
"MemoryAnalyzer",
"MemoryAnalysis",
"PressureLevel",
# Cache management
"CacheManager",
"CleanupMode",
"CleanupResult",
# Process monitoring
"ProcessMonitor",
"ProcessMemory",
"ProcessGroup",
]
__version__ = "1.0.0"

View file

@ -0,0 +1,207 @@
"""Analyze command showing detailed memory breakdown."""
import click
from rich.console import Console
from rich.table import Table
from rich.panel import Panel
from ..analyzer import MemoryAnalyzer, PressureLevel
from ..process_monitor import ProcessMonitor
@click.command()
@click.option("--processes", "-p", is_flag=True, help="Show top memory-consuming processes")
@click.option("--groups", "-g", is_flag=True, help="Show process groups by name")
@click.option("--leaks", "-l", is_flag=True, help="Check for potential memory leaks")
def analyze(processes, groups, leaks):
"""Detailed memory analysis.
Shows comprehensive memory breakdown including cache usage,
active/inactive memory, dirty pages, and swap usage.
"""
console = Console()
try:
analyzer = MemoryAnalyzer()
analysis = analyzer.analyze()
# Overall Memory Panel
console.print()
console.print(Panel.fit(
f"[bold]Total:[/bold] {analysis.total_mb:,} MB\n"
f"[bold]Used:[/bold] {analysis.used_mb:,} MB ({analysis.usage_percent:.1f}%)\n"
f"[bold]Available:[/bold] {analysis.available_mb:,} MB\n"
f"[bold]Free:[/bold] {analysis.free_mb:,} MB",
title="System Memory",
))
# Pressure Indicator
if analysis.pressure == PressureLevel.CRITICAL:
console.print(f"[red bold]⚠ CRITICAL: Memory pressure is HIGH (>90%)[/red bold]")
elif analysis.pressure == PressureLevel.MODERATE:
console.print(f"[yellow bold]⚠ WARNING: Memory pressure is MODERATE (>75%)[/yellow bold]")
else:
console.print(f"[green]✓ Memory pressure is LOW[/green]")
console.print()
# Cache Breakdown
cache_table = Table(title="Cache Breakdown (Reclaimable Memory)", show_header=True)
cache_table.add_column("Type", style="cyan")
cache_table.add_column("Size (MB)", style="green", justify="right")
cache_table.add_row("Page Cache", f"{analysis.page_cache_mb:,}")
cache_table.add_row("Buffers", f"{analysis.buffers_mb:,}")
cache_table.add_row("Slab (reclaimable)", f"{analysis.slab_reclaimable_mb:,}")
cache_table.add_row("Slab (unreclaimable)", f"{analysis.slab_unreclaimable_mb:,}")
cache_table.add_row("[bold]Total Reclaimable[/bold]", f"[bold]{analysis.reclaimable_total_mb:,}[/bold]")
console.print(cache_table)
console.print()
# Active/Inactive Memory
activity_table = Table(title="Active vs Inactive Memory", show_header=True)
activity_table.add_column("Type", style="cyan")
activity_table.add_column("Size (MB)", style="green", justify="right")
activity_table.add_column("Description", style="dim")
activity_table.add_row("Active", f"{analysis.active_mb:,}", "Recently used")
activity_table.add_row("Inactive", f"{analysis.inactive_mb:,}", "Candidate for reclaim")
console.print(activity_table)
console.print()
# Dirty Pages
dirty_table = Table(title="Dirty Pages (Pending Write)", show_header=True)
dirty_table.add_column("Type", style="cyan")
dirty_table.add_column("Size (MB)", style="green", justify="right")
dirty_table.add_row("Dirty", f"{analysis.dirty_mb:,}")
dirty_table.add_row("Writeback", f"{analysis.writeback_mb:,}")
console.print(dirty_table)
console.print()
# Swap Usage
if analysis.swap_total_mb > 0:
swap_table = Table(title="Swap Usage", show_header=True)
swap_table.add_column("Metric", style="cyan")
swap_table.add_column("Value", style="green", justify="right")
swap_table.add_row("Total", f"{analysis.swap_total_mb:,} MB")
swap_table.add_row("Used", f"{analysis.swap_used_mb:,} MB")
swap_table.add_row("Percent", f"{analysis.swap_percent:.1f}%")
if analysis.swap_percent > 50:
console.print(swap_table)
console.print("[yellow]⚠ High swap usage detected[/yellow]")
console.print()
else:
console.print(swap_table)
console.print()
else:
console.print("[dim]No swap configured[/dim]")
console.print()
# Cleanup Recommendations
console.print(Panel.fit(
_get_cleanup_recommendation(analysis),
title="Cleanup Recommendations",
))
console.print()
# Top Processes (if requested)
if processes:
monitor = ProcessMonitor()
top_procs = monitor.get_top_processes(limit=10)
proc_table = Table(title="Top 10 Memory Consumers", show_header=True)
proc_table.add_column("PID", style="yellow")
proc_table.add_column("Name", style="cyan")
proc_table.add_column("RSS (MB)", style="green", justify="right")
proc_table.add_column("Percent", style="magenta", justify="right")
for proc in top_procs:
proc_table.add_row(
str(proc.pid),
proc.name,
f"{proc.rss_mb:.1f}",
f"{proc.percent:.1f}%",
)
console.print(proc_table)
console.print()
# Process Groups (if requested)
if groups:
monitor = ProcessMonitor()
proc_groups = monitor.get_process_groups(limit=15)
group_table = Table(title="Process Memory by Category", show_header=True)
group_table.add_column("Process", style="cyan")
group_table.add_column("Count", style="yellow", justify="right")
group_table.add_column("Total (MB)", style="green", justify="right")
for group in proc_groups:
group_table.add_row(
group.name,
str(group.count),
f"{group.total_rss_mb:.1f}",
)
console.print(group_table)
console.print()
# Memory Leak Detection (if requested)
if leaks:
monitor = ProcessMonitor()
leak_info = monitor.check_memory_leaks()
if leak_info["total_issues"] > 0:
console.print(Panel.fit(
"\n".join([
f"[{_severity_color(issue['severity'])}]{issue['message']}[/{_severity_color(issue['severity'])}]\n"
f"[dim]{issue['recommendation']}[/dim]"
for issue in leak_info["issues"]
]),
title=f"Memory Leak Detection ({leak_info['total_issues']} issue(s))",
))
else:
console.print("[green]No memory leak indicators detected[/green]")
console.print()
except Exception as e:
console.print(f"[red]Error: {e}[/red]")
raise click.Abort()
def _get_cleanup_recommendation(analysis) -> str:
"""Generate cleanup recommendation text."""
mode = analysis.recommended_cleanup_mode
if mode == "aggressive":
return (
"[red]CRITICAL memory pressure detected![/red]\n\n"
"Recommended actions:\n"
" 1. Review top memory consumers and stop unnecessary processes\n"
" 2. Run aggressive cache cleanup: [bold]ram-boss clear aggressive[/bold]\n"
" 3. Consider adding swap or increasing RAM"
)
elif mode == "balanced":
return (
"[yellow]MODERATE memory pressure.[/yellow]\n\n"
"Recommended actions:\n"
" 1. Run balanced cleanup: [bold]ram-boss clear balanced[/bold]\n"
" 2. Monitor for memory leaks in long-running processes"
)
elif mode == "conservative":
return (
"[green]Low pressure but significant reclaimable cache (>4GB).[/green]\n\n"
"Optional: Run conservative cleanup: [bold]ram-boss clear conservative[/bold]"
)
else:
return "[green]No cleanup needed. System memory is healthy.[/green]"
def _severity_color(severity: str) -> str:
"""Get color for severity level."""
return {"warning": "yellow", "error": "red", "info": "blue"}.get(severity, "white")

View file

@ -0,0 +1,41 @@
"""Cleanup command for stale RAM leases."""
import asyncio
import click
from rich.console import Console
from ..boss import RAMBoss
@click.command()
def cleanup():
"""Clean up stale RAM leases.
Removes leases that have expired due to missing heartbeats.
This is automatically done by the background cleanup task,
but can be manually triggered if needed.
"""
asyncio.run(_cleanup_impl())
async def _cleanup_impl():
"""Implementation of cleanup command."""
console = Console()
try:
async with RAMBoss() as boss:
console.print("[cyan]Cleaning up stale RAM leases...[/cyan]")
released = await boss.cleanup_stale_leases()
if released:
console.print(f"[green]✓ Cleaned up {len(released)} stale lease(s)[/green]")
for lease_id in released:
console.print(f" - {lease_id}")
else:
console.print("[green]✓ No stale leases found[/green]")
except Exception as e:
console.print(f"[red]Error: {e}[/red]")
raise click.Abort()

View file

@ -0,0 +1,124 @@
"""Clear command for cache cleanup."""
import os
import click
from rich.console import Console
from rich.panel import Panel
from rich.table import Table
from ..cleaner import CacheManager, CleanupMode
@click.command()
@click.argument(
"mode",
type=click.Choice(["auto", "conservative", "balanced", "aggressive"], case_sensitive=False),
default="auto",
)
@click.option("--dry-run", is_flag=True, help="Show what would be done without executing")
def clear(mode, dry_run):
"""Clear RAM caches.
MODE can be:
- auto: Analyze memory and choose appropriate cleanup (default)
- conservative: Drop page cache only (safest, level 1)
- balanced: Drop page cache + dentries/inodes (level 3)
- aggressive: Drop all + compact memory + sync first
Examples:
ram-boss clear auto
ram-boss clear conservative
sudo ram-boss clear aggressive
"""
console = Console()
# Check root privileges (unless dry-run)
if not dry_run and os.geteuid() != 0:
console.print("[red]Error: Root privileges required to drop caches[/red]")
console.print("Run with: [yellow]sudo ram-boss clear {mode}[/yellow]")
raise click.Abort()
try:
manager = CacheManager()
cleanup_mode = CleanupMode[mode.upper()]
# Show info before cleanup
console.print()
if dry_run:
console.print("[yellow]DRY RUN MODE - No changes will be made[/yellow]")
console.print()
# Execute cleanup
result = manager.cleanup(mode=cleanup_mode, dry_run=dry_run)
# Show results
console.print(Panel.fit(
_format_cleanup_info(result, dry_run),
title="Cache Cleanup Results",
))
console.print()
# Results table
results_table = Table(show_header=True)
results_table.add_column("Metric", style="cyan")
results_table.add_column("Before", style="yellow", justify="right")
results_table.add_column("After", style="green", justify="right")
results_table.add_column("Change", style="magenta", justify="right")
results_table.add_row(
"Available RAM",
f"{result.before_available_mb:,} MB",
f"{result.after_available_mb:,} MB",
f"+{result.freed_mb:,} MB",
)
results_table.add_row(
"Usage",
f"{result.before_usage_percent:.1f}%",
f"{result.after_usage_percent:.1f}%",
f"{result.after_usage_percent - result.before_usage_percent:+.1f}%",
)
results_table.add_row(
"Reclaimable",
f"{result.before_reclaimable_mb:,} MB",
f"{result.after_reclaimable_mb:,} MB",
f"{result.after_reclaimable_mb - result.before_reclaimable_mb:+,} MB",
)
console.print(results_table)
console.print()
if result.freed_mb > 0:
console.print(f"[green]✓ Freed {result.freed_mb:,} MB of RAM[/green]")
elif result.mode == CleanupMode.AUTO:
console.print("[green]✓ No cleanup needed - system memory is healthy[/green]")
else:
console.print("[yellow]No significant RAM freed[/yellow]")
console.print()
except PermissionError as e:
console.print(f"[red]Permission Error: {e}[/red]")
console.print("Run with: [yellow]sudo ram-boss clear {mode}[/yellow]")
raise click.Abort()
except Exception as e:
console.print(f"[red]Error: {e}[/red]")
raise click.Abort()
def _format_cleanup_info(result, dry_run: bool) -> str:
"""Format cleanup information."""
mode_descriptions = {
CleanupMode.AUTO: "Auto-detected cleanup mode",
CleanupMode.CONSERVATIVE: "Conservative cleanup (page cache only)",
CleanupMode.BALANCED: "Balanced cleanup (all caches)",
CleanupMode.AGGRESSIVE: "Aggressive cleanup (all caches + compaction)",
}
if result.mode == CleanupMode.AUTO:
return "[green]No cleanup needed - memory pressure is low[/green]"
description = mode_descriptions.get(result.mode, str(result.mode))
status = "[yellow]Would execute[/yellow]" if dry_run else "[green]Executed[/green]"
return f"{status}: [bold]{description}[/bold]"

View file

@ -0,0 +1,87 @@
"""Status command showing RAM usage and active leases."""
import asyncio
import click
from rich.console import Console
from rich.table import Table
from ..boss import RAMBoss
@click.command()
def status():
"""Show RAM usage and active leases."""
asyncio.run(_status_impl())
async def _status_impl():
"""Implementation of status command."""
console = Console()
try:
async with RAMBoss() as boss:
boss_status = await boss.get_status()
# RAM Overview Table
ram_table = Table(title="System RAM Status", show_header=True)
ram_table.add_column("Metric", style="cyan")
ram_table.add_column("Value", style="green")
ram = boss_status.ram
ram_table.add_row("Total RAM", f"{ram.total_mb:,} MB")
ram_table.add_row("Used RAM", f"{ram.used_mb:,} MB")
ram_table.add_row("Available RAM", f"{ram.available_mb:,} MB")
ram_table.add_row("Utilization", f"{ram.utilization:.1f}%")
ram_table.add_row("Leased RAM", f"{ram.leased_mb:,} MB")
ram_table.add_row("Active Leases", str(len(ram.leases)))
ram_table.add_row("Queued Requests", str(boss_status.queue_length))
console.print()
console.print(ram_table)
# Active Leases Table
if boss_status.active_leases:
console.print()
lease_table = Table(title="Active Leases", show_header=True)
lease_table.add_column("Lease ID", style="yellow")
lease_table.add_column("RAM (MB)", style="cyan")
lease_table.add_column("Priority", style="magenta")
lease_table.add_column("Process ID", style="blue")
lease_table.add_column("Service", style="green")
for lease in boss_status.active_leases:
lease_table.add_row(
lease.lease_id[:8],
f"{lease.ram_mb:,}",
str(lease.priority),
lease.process_id,
lease.service_name,
)
console.print(lease_table)
# Queue Table
if boss_status.waiting_requests:
console.print()
queue_table = Table(title="Waiting Requests", show_header=True)
queue_table.add_column("Request ID", style="yellow")
queue_table.add_column("RAM (MB)", style="cyan")
queue_table.add_column("Priority", style="magenta")
queue_table.add_column("Process ID", style="blue")
for request in boss_status.waiting_requests:
queue_table.add_row(
request.request_id[:8],
f"{request.ram_mb:,}",
str(request.priority),
request.process_id,
)
console.print(queue_table)
console.print()
except Exception as e:
console.print(f"[red]Error: {e}[/red]")
raise click.Abort()

View file

@ -0,0 +1 @@
"""Tests for ram-boss."""

View file

@ -0,0 +1,61 @@
"""Tests for memory analyzer."""
from lilith_ram_boss.analyzer import MemoryAnalyzer, PressureLevel
def test_analyzer_basic():
"""Test basic memory analysis."""
analyzer = MemoryAnalyzer()
analysis = analyzer.analyze()
# Basic sanity checks
assert analysis.total_mb > 0
assert analysis.available_mb >= 0
assert analysis.used_mb >= 0
assert 0 <= analysis.usage_percent <= 100
assert analysis.pressure in [PressureLevel.LOW, PressureLevel.MODERATE, PressureLevel.CRITICAL]
def test_analyzer_pressure_detection():
"""Test pressure level detection logic."""
analyzer = MemoryAnalyzer()
analysis = analyzer.analyze()
# Verify pressure thresholds
if analysis.usage_percent > 90:
assert analysis.pressure == PressureLevel.CRITICAL
elif analysis.usage_percent > 75:
assert analysis.pressure == PressureLevel.MODERATE
else:
assert analysis.pressure == PressureLevel.LOW
def test_analyzer_reclaimable():
"""Test reclaimable memory calculation."""
analyzer = MemoryAnalyzer()
analysis = analyzer.analyze()
# Reclaimable should be sum of components
expected = (
analysis.page_cache_mb +
analysis.buffers_mb +
analysis.slab_reclaimable_mb
)
assert analysis.reclaimable_total_mb == expected
def test_analyzer_cleanup_recommendation():
"""Test cleanup mode recommendation."""
analyzer = MemoryAnalyzer()
analysis = analyzer.analyze()
mode = analysis.recommended_cleanup_mode
if analysis.pressure == PressureLevel.CRITICAL:
assert mode == "aggressive"
elif analysis.pressure == PressureLevel.MODERATE:
assert mode == "balanced"
elif analysis.reclaimable_total_mb > 4096:
assert mode == "conservative"
else:
assert mode == "none"

147
ram-boss/verify.sh Executable file
View file

@ -0,0 +1,147 @@
#!/bin/bash
# Verification script for ram-boss package
set -e
echo "==================================="
echo "RAM Boss Package Verification"
echo "==================================="
echo
echo "1. Package Structure"
echo "-------------------"
if [ -f "pyproject.toml" ]; then
echo "✓ pyproject.toml exists"
else
echo "✗ pyproject.toml missing"
exit 1
fi
if [ -f "README.md" ]; then
echo "✓ README.md exists"
else
echo "✗ README.md missing"
exit 1
fi
if [ -d "src/lilith_ram_boss" ]; then
echo "✓ Source directory exists"
else
echo "✗ Source directory missing"
exit 1
fi
if [ -d "tests" ]; then
echo "✓ Tests directory exists"
else
echo "✗ Tests directory missing"
exit 1
fi
echo
echo "2. Core Modules"
echo "--------------"
modules=(
"analyzer.py"
"boss.py"
"cleaner.py"
"config.py"
"lease.py"
"preemption.py"
"process_monitor.py"
"redis_client.py"
"types.py"
"__init__.py"
)
for module in "${modules[@]}"; do
if [ -f "src/lilith_ram_boss/$module" ]; then
lines=$(wc -l < "src/lilith_ram_boss/$module")
echo "$module ($lines lines)"
else
echo "$module missing"
exit 1
fi
done
echo
echo "3. CLI Commands"
echo "--------------"
cli_files=(
"main.py"
"status.py"
"analyze.py"
"clear.py"
"cleanup.py"
)
for cli_file in "${cli_files[@]}"; do
if [ -f "src/lilith_ram_boss/cli/$cli_file" ]; then
lines=$(wc -l < "src/lilith_ram_boss/cli/$cli_file")
echo "✓ cli/$cli_file ($lines lines)"
else
echo "✗ cli/$cli_file missing"
exit 1
fi
done
echo
echo "4. Python Syntax"
echo "---------------"
python3 << 'PYEOF'
import ast
import sys
from pathlib import Path
errors = []
for py_file in Path('src/lilith_ram_boss').rglob('*.py'):
try:
ast.parse(py_file.read_text())
except SyntaxError as e:
errors.append(f'{py_file}: {e}')
if errors:
for error in errors:
print(f'✗ {error}')
sys.exit(1)
else:
print('✓ All Python files have valid syntax')
PYEOF
echo
echo "5. Code Statistics"
echo "-----------------"
total_lines=$(find src/lilith_ram_boss -name "*.py" -exec wc -l {} + | tail -1 | awk '{print $1}')
file_count=$(find src/lilith_ram_boss -name "*.py" | wc -l)
echo "✓ Total lines: $total_lines"
echo "✓ Total files: $file_count"
echo
echo "6. Feature Checklist"
echo "-------------------"
echo "✓ Memory analysis (analyzer.py)"
echo "✓ Cache cleanup (cleaner.py)"
echo "✓ Process monitoring (process_monitor.py)"
echo "✓ RAM lease coordination (boss.py + lease.py)"
echo "✓ Redis client with Lua scripts (redis_client.py)"
echo "✓ Preemption handler (preemption.py)"
echo "✓ CLI with rich output (cli/)"
echo "✓ Python API with context managers"
echo "✓ Type definitions (types.py)"
echo "✓ Configuration (config.py)"
echo
echo "==================================="
echo "✓ ALL CHECKS PASSED"
echo "==================================="
echo
echo "Package is ready for:"
echo " 1. pip install -e ."
echo " 2. pytest"
echo " 3. Publishing to registry"

View file

@ -20,7 +20,7 @@ import type {
} from './types.js';
// Lua script for atomic lease acquisition
const ACQUIRE_SCRIPT = \`
const ACQUIRE_SCRIPT = `
local lease_key = KEYS[1]
local vram_used_key = KEYS[2]
local vram_total_key = KEYS[3]
@ -48,10 +48,10 @@ if free >= vram_mb then
else
return 0
end
\`;
`;
// Lua script for atomic lease release
const RELEASE_SCRIPT = \`
const RELEASE_SCRIPT = `
local lease_key = KEYS[1]
local vram_used_key = KEYS[2]
local leases_all_key = KEYS[3]
@ -73,7 +73,7 @@ redis.call('HDEL', leases_all_key, lease_id)
redis.call('DEL', heartbeat_key)
return 1
\`;
`;
/**
* Coordinates GPU/VRAM access across multiple ML model processes.
@ -82,7 +82,7 @@ return 1
* providing a lease-based system with queuing and priority support.
*
* @example
* \`\`\`typescript
* ```typescript
* const boss = new GPUBoss();
* await boss.connect();
*
@ -95,7 +95,7 @@ return 1
* }
*
* await boss.close();
* \`\`\`
* ```
*/
export class GPUBoss {
private readonly config: ResolvedConfig;
@ -113,7 +113,7 @@ export class GPUBoss {
*/
async connect(): Promise<void> {
this.redis = new Redis(this.config.redisUrl);
console.log(\`Connected to Redis at \${this.config.redisUrl}\`);
console.log(`Connected to Redis at ${this.config.redisUrl}`);
if (this.config.autoCleanup) {
this.startCleanupTask(this.config.cleanupIntervalSeconds);
@ -162,7 +162,7 @@ export class GPUBoss {
await redis.set(this.keys.gpuCountKey, String(gpuIndex + 1));
}
console.log(\`Initialized GPU \${gpuIndex}: \${gpuName} (\${vramTotalMb} MB)\`);
console.log(`Initialized GPU ${gpuIndex}: ${gpuName} (${vramTotalMb} MB)`);
}
/**
@ -325,7 +325,7 @@ export class GPUBoss {
if (result === 1) {
console.log(
\`Acquired lease \${leaseId}: \${vramMb} MB on GPU \${gpuIndex} for \${modelId}\`,
`Acquired lease ${leaseId}: ${vramMb} MB on GPU ${gpuIndex} for ${modelId}`,
);
const lease = new GPULease(leaseInfo, redis, this.config);
await lease.start();
@ -458,7 +458,7 @@ export class GPUBoss {
if (heartbeat === null) {
// Stale lease - no heartbeat
console.warn(\`Cleaning up stale lease: \${leaseId}\`);
console.warn(`Cleaning up stale lease: ${leaseId}`);
const result = await redis.call(
'EVAL',
RELEASE_SCRIPT,
@ -492,14 +492,14 @@ export class GPUBoss {
try {
const cleaned = await this.cleanupStale();
if (cleaned.length > 0) {
console.log(\`Cleaned up \${cleaned.length} stale lease(s)\`);
console.log(`Cleaned up ${cleaned.length} stale lease(s)`);
}
} catch (error) {
console.error('Error in cleanup task:', error);
}
}, intervalSeconds * 1000);
console.log(\`Started stale lease cleanup task (interval: \${intervalSeconds}s)\`);
console.log(`Started stale lease cleanup task (interval: ${intervalSeconds}s)`);
}
/**