AI Scientist with Docker & Ollama: Building Automated Research Pipelines
⏱️ Estimated Reading Time: 18 minutes
Introduction
Imagine having a tireless AI researcher working 24/7, generating papers, conducting experiments, and pushing the boundaries of scientific discovery while you sleep. With SakanaAI’s AI Scientist and local LLM infrastructure, this is no longer science fiction—it’s a practical reality you can implement today.
This comprehensive guide will walk you through setting up an automated research pipeline using:
- SakanaAI’s AI Scientist: The world’s first system for fully automated scientific discovery
- OrbStack Docker: Lightweight containerization for seamless deployment
- Ollama & LM Studio: Local LLM inference for cost-effective and private research
- Queue Management: Continuous operation with intelligent task scheduling
By the end of this tutorial, you’ll have a robust, self-sustaining research environment capable of generating scientific papers across multiple domains without constant human intervention.
Understanding AI Scientist
What Makes AI Scientist Revolutionary?
SakanaAI’s AI Scientist represents a paradigm shift in automated research. Unlike traditional AI tools that assist researchers, this system conducts entire research projects autonomously:
- End-to-End Automation: From idea generation to paper writing and peer review
- Multi-Template Support: NanoGPT, 2D Diffusion, and Grokking research domains
- Automated Experimentation: Designs, executes, and analyzes experiments
- LaTeX Paper Generation: Produces publication-ready academic papers
- Peer Review System: Built-in review mechanisms for quality assessment
System Architecture Overview
graph TD
A["🎯 Research Idea<br/>Generation"] --> B["🔬 Experiment<br/>Design"]
B --> C["⚙️ Code<br/>Implementation"]
C --> D["🧪 Experiment<br/>Execution"]
D --> E["📊 Results<br/>Analysis"]
E --> F["📝 Paper<br/>Writing"]
F --> G["📋 Peer<br/>Review"]
G --> H["📄 Final<br/>Paper"]
style A fill:#e3f2fd
style B fill:#f3e5f5
style C fill:#e8f5e8
style D fill:#fff3e0
style E fill:#ffebee
style F fill:#f1f8e9
style G fill:#fce4ec
style H fill:#e0f2f1
Prerequisites and Environment Setup
System Requirements
# Minimum Hardware Requirements
- RAM: 16GB (32GB recommended for larger models)
- Storage: 50GB free space
- CPU: 8+ cores (Apple Silicon or x86_64)
- GPU: Optional but recommended (NVIDIA RTX 3080+ or Apple M-series)
# Software Dependencies
- macOS 13+ or Linux Ubuntu 20.04+
- OrbStack or Docker Desktop
- Python 3.8+
- Git
Installing OrbStack
OrbStack is a lightweight alternative to Docker Desktop on macOS, with lower memory overhead and faster startup:
# Install OrbStack via Homebrew
brew install orbstack
# Start OrbStack service
orbstack start
# Verify installation
orbstack --version
Setting Up Ollama
Ollama provides an excellent local LLM inference solution:
# Install Ollama
curl -fsSL https://ollama.ai/install.sh | sh
# Start Ollama service
ollama serve
# Download recommended models for research
ollama pull llama2:70b          # Large general-purpose model
ollama pull llama2:13b          # Mid-size general-purpose model
ollama pull codellama:34b       # Code generation
ollama pull mistral:7b          # Fast inference
ollama pull deepseek-coder:33b  # Advanced coding
# Verify installation
ollama list
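If you want to sanity-check the HTTP API directly (this is the interface the rest of the pipeline talks to), a couple of curl calls against the default port 11434 are enough; the model name below is just an example and must match something you have pulled:

```bash
# List installed models via the REST API
curl http://localhost:11434/api/tags

# One-shot, non-streaming generation (swap in any model from `ollama list`)
curl http://localhost:11434/api/generate \
  -d '{"model": "mistral:7b", "prompt": "Say hello in one sentence.", "stream": false}'
```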
LM Studio Alternative Setup
For GUI-based model management:
# Download LM Studio from https://lmstudio.ai/
# Install and configure API server
# Default API endpoint: http://localhost:1234/v1
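Once the LM Studio server is running, you can verify its OpenAI-compatible endpoint before wiring it into the pipeline. These are quick sanity checks against the default port; the model identifier depends on whatever you have loaded in LM Studio:

```bash
# List the models LM Studio currently serves
curl http://localhost:1234/v1/models

# Minimal chat completion against the loaded model
curl http://localhost:1234/v1/chat/completions \
  -H "Content-Type: application/json" \
  -d '{"model": "local-model", "messages": [{"role": "user", "content": "Hello"}]}'
```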
AI Scientist Installation & Configuration
Clone and Setup Repository
# Clone AI Scientist repository
git clone https://github.com/SakanaAI/AI-Scientist.git
cd AI-Scientist
# Create dedicated directory for our setup
mkdir -p ~/ai-research-lab
cd ~/ai-research-lab
# Copy AI Scientist files
cp -r /path/to/AI-Scientist/* .
Docker Environment Configuration
Create a comprehensive Docker setup:
# Dockerfile for AI Scientist with local LLM support
FROM python:3.9-slim
# Install system dependencies
RUN apt-get update && apt-get install -y \
git \
wget \
curl \
build-essential \
texlive-full \
pandoc \
&& rm -rf /var/lib/apt/lists/*
# Set working directory
WORKDIR /app
# Copy requirements and install Python dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# Install additional packages used by the pipeline scripts (queue, dashboard, monitoring)
RUN pip install \
    ollama \
    openai \
    anthropic \
    tiktoken \
    requests \
    redis \
    streamlit \
    plotly \
    pandas \
    psutil \
    gputil \
    schedule \
    matplotlib \
    seaborn \
    jupyter \
    notebook
# Copy AI Scientist code
COPY . .
# Create necessary directories
RUN mkdir -p /app/results /app/logs /app/queue
# Set environment variables
ENV PYTHONPATH=/app
ENV OLLAMA_HOST=host.docker.internal:11434
ENV LM_STUDIO_BASE_URL=http://host.docker.internal:1234/v1
# Expose ports for Jupyter and monitoring
EXPOSE 8888 8080
# Create startup script
COPY scripts/startup.sh /startup.sh
RUN chmod +x /startup.sh
CMD ["/startup.sh"]
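The Dockerfile copies `scripts/startup.sh`, which is not shown elsewhere in this guide. Here is a minimal sketch of what that entrypoint could look like, assuming you want the container to wait for Ollama, expose Jupyter, and then stay alive while the queue-manager service does the actual work:

```bash
#!/bin/bash
# scripts/startup.sh -- example entrypoint (a minimal sketch; adapt to your needs)
set -e

# Wait until the Ollama API on the host is reachable (OLLAMA_HOST is set in the Dockerfile)
until curl -sf "http://${OLLAMA_HOST}/api/tags" > /dev/null; do
    echo "Waiting for Ollama at ${OLLAMA_HOST}..."
    sleep 5
done

# Start Jupyter in the background for interactive inspection
jupyter notebook --ip=0.0.0.0 --port=8888 --no-browser --allow-root &

# Keep the container alive; research tasks are driven by the queue-manager service
tail -f /dev/null
```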
Docker Compose for Complete Stack
# docker-compose.yml
version: '3.8'
services:
ai-scientist:
build: .
container_name: ai-scientist-main
volumes:
- ./results:/app/results
- ./logs:/app/logs
- ./queue:/app/queue
- ./templates:/app/templates
ports:
- "8888:8888" # Jupyter
- "8080:8080" # Monitoring dashboard
environment:
- OLLAMA_HOST=host.docker.internal:11434
- LM_STUDIO_BASE_URL=http://host.docker.internal:1234/v1
- OPENAI_API_KEY=${OPENAI_API_KEY}
- ANTHROPIC_API_KEY=${ANTHROPIC_API_KEY}
depends_on:
- redis
networks:
- ai-research-net
redis:
image: redis:7-alpine
container_name: ai-scientist-redis
ports:
- "6379:6379"
volumes:
- redis_data:/data
networks:
- ai-research-net
queue-manager:
build: .
container_name: ai-scientist-queue
command: python scripts/queue_manager.py
volumes:
- ./queue:/app/queue
- ./logs:/app/logs
depends_on:
- redis
- ai-scientist
networks:
- ai-research-net
monitoring:
build: .
container_name: ai-scientist-monitor
command: streamlit run scripts/monitoring_dashboard.py --server.port 8081 --server.address 0.0.0.0
ports:
- "8081:8081"
volumes:
- ./logs:/app/logs
- ./results:/app/results
networks:
- ai-research-net
volumes:
redis_data:
networks:
ai-research-net:
driver: bridge
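The `${OPENAI_API_KEY}` and `${ANTHROPIC_API_KEY}` references are resolved from a `.env` file placed next to `docker-compose.yml`. They are only needed if you also want to route some tasks to cloud models; for a purely local Ollama/LM Studio setup, placeholder values are fine:

```bash
# .env (same directory as docker-compose.yml; values below are placeholders)
OPENAI_API_KEY=sk-your-key-or-placeholder
ANTHROPIC_API_KEY=sk-ant-your-key-or-placeholder
```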
Local LLM Integration
Ollama API Integration
Create a custom LLM client for Ollama:
# scripts/ollama_client.py
import requests
import json
from typing import Dict, List, Optional
import logging
class OllamaClient:
def __init__(self, base_url: str = "http://localhost:11434"):
self.base_url = base_url
self.logger = logging.getLogger(__name__)
def generate(self,
model: str,
prompt: str,
temperature: float = 0.7,
max_tokens: int = 4000,
**kwargs) -> str:
"""Generate text using Ollama API"""
try:
payload = {
"model": model,
"prompt": prompt,
"stream": False,
"options": {
"temperature": temperature,
"num_predict": max_tokens,
**kwargs
}
}
response = requests.post(
f"{self.base_url}/api/generate",
json=payload,
timeout=300
)
response.raise_for_status()
result = response.json()
return result.get("response", "")
except Exception as e:
self.logger.error(f"Ollama generation error: {e}")
raise
def list_models(self) -> List[str]:
"""List available models"""
try:
response = requests.get(f"{self.base_url}/api/tags")
response.raise_for_status()
models = response.json().get("models", [])
return [model["name"] for model in models]
except Exception as e:
self.logger.error(f"Error listing models: {e}")
return []
def chat_completion(self,
model: str,
messages: List[Dict],
temperature: float = 0.7,
max_tokens: int = 4000) -> str:
"""OpenAI-compatible chat completion"""
try:
# Convert messages to single prompt
prompt = self._messages_to_prompt(messages)
return self.generate(model, prompt, temperature, max_tokens)
except Exception as e:
self.logger.error(f"Chat completion error: {e}")
raise
def _messages_to_prompt(self, messages: List[Dict]) -> str:
"""Convert OpenAI message format to prompt"""
prompt_parts = []
for message in messages:
role = message.get("role", "user")
content = message.get("content", "")
if role == "system":
prompt_parts.append(f"System: {content}")
elif role == "user":
prompt_parts.append(f"Human: {content}")
elif role == "assistant":
prompt_parts.append(f"Assistant: {content}")
prompt_parts.append("Assistant:")
return "\n\n".join(prompt_parts)
# Test the client
if __name__ == "__main__":
client = OllamaClient()
print("Available models:", client.list_models())
test_response = client.generate(
model="llama2:7b",
prompt="Explain quantum computing in simple terms."
)
print("Test response:", test_response[:200] + "...")
LM Studio Integration
# scripts/lm_studio_client.py
import openai
from typing import Dict, List
import logging
class LMStudioClient:
def __init__(self, base_url: str = "http://localhost:1234/v1"):
self.client = openai.OpenAI(
base_url=base_url,
api_key="lm-studio" # Required but ignored
)
self.logger = logging.getLogger(__name__)
def generate(self,
model: str,
prompt: str,
temperature: float = 0.7,
max_tokens: int = 4000,
**kwargs) -> str:
"""Generate text using LM Studio API"""
try:
messages = [{"role": "user", "content": prompt}]
response = self.client.chat.completions.create(
model=model,
messages=messages,
temperature=temperature,
max_tokens=max_tokens,
**kwargs
)
return response.choices[0].message.content
except Exception as e:
self.logger.error(f"LM Studio generation error: {e}")
raise
def chat_completion(self,
model: str,
messages: List[Dict],
temperature: float = 0.7,
max_tokens: int = 4000) -> str:
"""Direct chat completion"""
try:
response = self.client.chat.completions.create(
model=model,
messages=messages,
temperature=temperature,
max_tokens=max_tokens
)
return response.choices[0].message.content
except Exception as e:
self.logger.error(f"Chat completion error: {e}")
raise
def list_models(self) -> List[str]:
"""List available models"""
try:
models = self.client.models.list()
return [model.id for model in models.data]
except Exception as e:
self.logger.error(f"Error listing models: {e}")
return []
# Test the client
if __name__ == "__main__":
client = LMStudioClient()
print("Available models:", client.list_models())
test_response = client.generate(
model="local-model",
prompt="Explain machine learning in simple terms."
)
print("Test response:", test_response[:200] + "...")
Queue Management System
Redis-Based Queue Implementation
# scripts/queue_manager.py
import redis
import json
import time
import logging
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Any
from dataclasses import dataclass, asdict
from enum import Enum
class TaskStatus(Enum):
PENDING = "pending"
RUNNING = "running"
COMPLETED = "completed"
FAILED = "failed"
CANCELLED = "cancelled"
@dataclass
class ResearchTask:
id: str
template: str
model: str
num_ideas: int
priority: int = 1
created_at: datetime = None
started_at: datetime = None
completed_at: datetime = None
status: TaskStatus = TaskStatus.PENDING
progress: int = 0
error_message: str = ""
results_path: str = ""
def __post_init__(self):
    if self.created_at is None:
        self.created_at = datetime.now()
    # Restore native types after a JSON round-trip (Redis stores everything as strings)
    if isinstance(self.status, str):
        self.status = TaskStatus(self.status.split(".")[-1].lower())
    for field_name in ("created_at", "started_at", "completed_at"):
        value = getattr(self, field_name)
        if isinstance(value, str):
            setattr(self, field_name, datetime.fromisoformat(value))
class QueueManager:
def __init__(self, redis_host: str = "localhost", redis_port: int = 6379):
self.redis_client = redis.Redis(
host=redis_host,
port=redis_port,
decode_responses=True
)
self.logger = logging.getLogger(__name__)
# Queue keys
self.pending_queue = "ai_scientist:pending"
self.running_queue = "ai_scientist:running"
self.completed_queue = "ai_scientist:completed"
self.failed_queue = "ai_scientist:failed"
self.task_data = "ai_scientist:tasks"
def add_task(self, task: ResearchTask) -> str:
"""Add a new research task to the queue"""
try:
# Store task data
task_json = json.dumps(asdict(task), default=str)
self.redis_client.hset(self.task_data, task.id, task_json)
# Add to pending queue with priority
self.redis_client.zadd(
self.pending_queue,
{task.id: task.priority}
)
self.logger.info(f"Task {task.id} added to queue")
return task.id
except Exception as e:
self.logger.error(f"Error adding task: {e}")
raise
def get_next_task(self) -> Optional[ResearchTask]:
"""Get the next highest priority task"""
try:
# Get highest priority task
task_ids = self.redis_client.zrevrange(
self.pending_queue, 0, 0
)
if not task_ids:
return None
task_id = task_ids[0]
# Move to running queue
self.redis_client.zrem(self.pending_queue, task_id)
self.redis_client.sadd(self.running_queue, task_id)
# Get task data
task_data = self.redis_client.hget(self.task_data, task_id)
if not task_data:
return None
task_dict = json.loads(task_data)
task = ResearchTask(**task_dict)
task.status = TaskStatus.RUNNING
task.started_at = datetime.now()
# Update task
self.update_task(task)
return task
except Exception as e:
self.logger.error(f"Error getting next task: {e}")
return None
def update_task(self, task: ResearchTask):
"""Update task status and data"""
try:
task_json = json.dumps(asdict(task), default=str)
self.redis_client.hset(self.task_data, task.id, task_json)
except Exception as e:
self.logger.error(f"Error updating task: {e}")
def complete_task(self, task_id: str, results_path: str = ""):
"""Mark task as completed"""
try:
task = self.get_task(task_id)
if not task:
return
# Move to completed queue
self.redis_client.srem(self.running_queue, task_id)
self.redis_client.sadd(self.completed_queue, task_id)
# Update task
task.status = TaskStatus.COMPLETED
task.completed_at = datetime.now()
task.progress = 100
task.results_path = results_path
self.update_task(task)
self.logger.info(f"Task {task_id} completed")
except Exception as e:
self.logger.error(f"Error completing task: {e}")
def fail_task(self, task_id: str, error_message: str = ""):
"""Mark task as failed"""
try:
task = self.get_task(task_id)
if not task:
return
# Move to failed queue
self.redis_client.srem(self.running_queue, task_id)
self.redis_client.sadd(self.failed_queue, task_id)
# Update task
task.status = TaskStatus.FAILED
task.completed_at = datetime.now()
task.error_message = error_message
self.update_task(task)
self.logger.error(f"Task {task_id} failed: {error_message}")
except Exception as e:
self.logger.error(f"Error failing task: {e}")
def get_task(self, task_id: str) -> Optional[ResearchTask]:
"""Get task by ID"""
try:
task_data = self.redis_client.hget(self.task_data, task_id)
if not task_data:
return None
task_dict = json.loads(task_data)
return ResearchTask(**task_dict)
except Exception as e:
self.logger.error(f"Error getting task: {e}")
return None
def get_queue_stats(self) -> Dict[str, int]:
"""Get queue statistics"""
try:
return {
"pending": self.redis_client.zcard(self.pending_queue),
"running": self.redis_client.scard(self.running_queue),
"completed": self.redis_client.scard(self.completed_queue),
"failed": self.redis_client.scard(self.failed_queue)
}
except Exception as e:
self.logger.error(f"Error getting queue stats: {e}")
return {}
def list_tasks(self, status: TaskStatus = None) -> List[ResearchTask]:
"""List tasks by status"""
try:
if status == TaskStatus.PENDING:
task_ids = self.redis_client.zrevrange(self.pending_queue, 0, -1)
elif status == TaskStatus.RUNNING:
task_ids = list(self.redis_client.smembers(self.running_queue))
elif status == TaskStatus.COMPLETED:
task_ids = list(self.redis_client.smembers(self.completed_queue))
elif status == TaskStatus.FAILED:
task_ids = list(self.redis_client.smembers(self.failed_queue))
else:
# Get all tasks
task_ids = list(self.redis_client.hkeys(self.task_data))
tasks = []
for task_id in task_ids:
task = self.get_task(task_id)
if task:
tasks.append(task)
return tasks
except Exception as e:
self.logger.error(f"Error listing tasks: {e}")
return []
# Worker process
class ResearchWorker:
def __init__(self, queue_manager: QueueManager):
self.queue_manager = queue_manager
self.logger = logging.getLogger(__name__)
self.running = False
def start(self):
"""Start the worker process"""
self.running = True
self.logger.info("Research worker started")
while self.running:
try:
task = self.queue_manager.get_next_task()
if task:
self.logger.info(f"Processing task: {task.id}")
self.process_task(task)
else:
# No tasks available, wait
time.sleep(10)
except KeyboardInterrupt:
self.logger.info("Worker interrupted")
break
except Exception as e:
self.logger.error(f"Worker error: {e}")
time.sleep(30)
def process_task(self, task: ResearchTask):
"""Process a research task"""
try:
# Import AI Scientist modules
import subprocess
import os
# Prepare command
cmd = [
"python", "launch_scientist.py",
"--model", task.model,
"--experiment", task.template,
"--num-ideas", str(task.num_ideas),
"--out-dir", f"results/{task.id}"
]
# Update progress
task.progress = 10
self.queue_manager.update_task(task)
# Execute AI Scientist
result = subprocess.run(
cmd,
capture_output=True,
text=True,
timeout=14400  # 4 hour timeout; typical runs take 2-3 hours
)
if result.returncode == 0:
# Success
results_path = f"results/{task.id}"
self.queue_manager.complete_task(task.id, results_path)
self.logger.info(f"Task {task.id} completed successfully")
else:
# Failure
error_msg = result.stderr or "Unknown error"
self.queue_manager.fail_task(task.id, error_msg)
self.logger.error(f"Task {task.id} failed: {error_msg}")
except subprocess.TimeoutExpired:
self.queue_manager.fail_task(task.id, "Task timeout")
except Exception as e:
self.queue_manager.fail_task(task.id, str(e))
def stop(self):
"""Stop the worker process"""
self.running = False
self.logger.info("Research worker stopped")
# Main execution
if __name__ == "__main__":
logging.basicConfig(level=logging.INFO)
# Initialize queue manager
queue_manager = QueueManager()
# Create and start worker
worker = ResearchWorker(queue_manager)
try:
worker.start()
except KeyboardInterrupt:
worker.stop()
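To run a worker outside of Docker Compose, or just to peek at the queue, the script can be started directly. The `redis-cli` commands assume Redis is reachable on `localhost:6379` and mirror the key names used above:

```bash
# Run a worker loop directly on the host
python scripts/queue_manager.py

# Inspect queue depths without Python
redis-cli zcard ai_scientist:pending
redis-cli scard ai_scientist:running
redis-cli scard ai_scientist:completed
```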
Task Submission Interface
# scripts/task_submitter.py
import uuid
from datetime import datetime
from queue_manager import QueueManager, ResearchTask, TaskStatus
class TaskSubmitter:
def __init__(self):
self.queue_manager = QueueManager()
def submit_research_batch(self,
templates: list,
models: list,
ideas_per_task: int = 2,
priority: int = 1) -> list:
"""Submit a batch of research tasks"""
task_ids = []
for template in templates:
for model in models:
task_id = str(uuid.uuid4())
task = ResearchTask(
id=task_id,
template=template,
model=model,
num_ideas=ideas_per_task,
priority=priority
)
self.queue_manager.add_task(task)
task_ids.append(task_id)
return task_ids
def submit_overnight_research(self) -> list:
"""Submit a comprehensive overnight research batch"""
templates = ["nanoGPT_lite", "2d_diffusion", "grokking"]
models = [
"llama2:70b",
"codellama:34b",
"mistral:7b",
"deepseek-coder:33b"
]
return self.submit_research_batch(
templates=templates,
models=models,
ideas_per_task=3,
priority=2
)
def submit_continuous_research(self,
hours: int = 24) -> list:
"""Submit continuous research for specified hours"""
# Calculate the number of batches based on the requested duration
estimated_hours_per_batch = 2  # rough estimate for one overnight batch
num_batches = max(1, hours // estimated_hours_per_batch)
all_task_ids = []
for batch in range(num_batches):
task_ids = self.submit_overnight_research()
all_task_ids.extend(task_ids)
print(f"Batch {batch + 1} submitted: {len(task_ids)} tasks")
return all_task_ids
# CLI interface
if __name__ == "__main__":
import argparse
parser = argparse.ArgumentParser(description="Submit AI Scientist research tasks")
parser.add_argument("--template", type=str, help="Research template")
parser.add_argument("--model", type=str, help="LLM model")
parser.add_argument("--ideas", type=int, default=2, help="Number of ideas")
parser.add_argument("--priority", type=int, default=1, help="Task priority")
parser.add_argument("--batch", action="store_true", help="Submit overnight batch")
parser.add_argument("--continuous", type=int, help="Continuous research hours")
args = parser.parse_args()
submitter = TaskSubmitter()
if args.batch:
task_ids = submitter.submit_overnight_research()
print(f"Submitted overnight batch: {len(task_ids)} tasks")
elif args.continuous:
task_ids = submitter.submit_continuous_research(args.continuous)
print(f"Submitted {args.continuous}h continuous research: {len(task_ids)} tasks")
elif args.template and args.model:
task_id = str(uuid.uuid4())
task = ResearchTask(
id=task_id,
template=args.template,
model=args.model,
num_ideas=args.ideas,
priority=args.priority
)
submitter.queue_manager.add_task(task)
print(f"Submitted single task: {task_id}")
else:
print("Please specify --batch, --continuous, or both --template and --model")
Monitoring and Management
Real-time Dashboard
# scripts/monitoring_dashboard.py
import streamlit as st
import plotly.express as px
import plotly.graph_objects as go
import pandas as pd
import time
from datetime import datetime, timedelta
from queue_manager import QueueManager, TaskStatus
st.set_page_config(
page_title="AI Scientist Dashboard",
page_icon="🧑‍🔬",
layout="wide"
)
class MonitoringDashboard:
def __init__(self):
self.queue_manager = QueueManager()
def render_header(self):
"""Render dashboard header"""
st.title("🧑‍🔬 AI Scientist Research Dashboard")
st.markdown("Real-time monitoring of automated research pipeline")
# Refresh button
if st.button("🔄 Refresh", key="refresh"):
st.rerun()
def render_queue_stats(self):
"""Render queue statistics"""
stats = self.queue_manager.get_queue_stats()
col1, col2, col3, col4 = st.columns(4)
with col1:
st.metric("⏳ Pending", stats.get("pending", 0))
with col2:
st.metric("🔄 Running", stats.get("running", 0))
with col3:
st.metric("✅ Completed", stats.get("completed", 0))
with col4:
st.metric("❌ Failed", stats.get("failed", 0))
def render_task_timeline(self):
"""Render task timeline chart"""
st.subheader("📊 Task Timeline")
# Get all tasks
all_tasks = self.queue_manager.list_tasks()
if not all_tasks:
st.info("No tasks found")
return
# Prepare data for timeline
timeline_data = []
for task in all_tasks:
timeline_data.append({
"Task ID": task.id[:8],
"Template": task.template,
"Model": task.model,
"Status": task.status.value,
"Created": task.created_at,
"Started": task.started_at,
"Completed": task.completed_at,
"Duration": self._calculate_duration(task)
})
df = pd.DataFrame(timeline_data)
# Status distribution pie chart
col1, col2 = st.columns(2)
with col1:
status_counts = df["Status"].value_counts()
fig_pie = px.pie(
values=status_counts.values,
names=status_counts.index,
title="Task Status Distribution"
)
st.plotly_chart(fig_pie, use_container_width=True)
with col2:
# Duration histogram
completed_tasks = df[df["Status"] == "completed"]
if not completed_tasks.empty:
fig_hist = px.histogram(
completed_tasks,
x="Duration",
title="Task Duration Distribution (minutes)",
nbins=20
)
st.plotly_chart(fig_hist, use_container_width=True)
else:
st.info("No completed tasks yet")
def render_task_list(self):
"""Render detailed task list"""
st.subheader("📋 Task Details")
# Status filter
status_filter = st.selectbox(
"Filter by Status",
["All", "pending", "running", "completed", "failed"]
)
# Get filtered tasks
if status_filter == "All":
tasks = self.queue_manager.list_tasks()
else:
tasks = self.queue_manager.list_tasks(TaskStatus(status_filter))
if not tasks:
st.info(f"No {status_filter} tasks found")
return
# Create task table
task_data = []
for task in tasks:
task_data.append({
"ID": task.id[:8] + "...",
"Template": task.template,
"Model": task.model,
"Ideas": task.num_ideas,
"Status": task.status.value.title(),
"Progress": f"{task.progress}%",
"Created": task.created_at.strftime("%Y-%m-%d %H:%M") if task.created_at else "N/A",
"Duration": self._calculate_duration(task),
"Error": task.error_message[:50] + "..." if len(task.error_message) > 50 else task.error_message
})
df = pd.DataFrame(task_data)
st.dataframe(df, use_container_width=True)
def render_resource_usage(self):
"""Render resource usage metrics"""
st.subheader("💻 Resource Usage")
# This would ideally connect to actual system metrics
# For demo purposes, we'll show placeholder data
col1, col2, col3 = st.columns(3)
with col1:
# CPU usage simulation
cpu_usage = 65 # This would come from actual monitoring
fig_cpu = go.Figure(go.Indicator(
mode="gauge+number",
value=cpu_usage,
domain={'x': [0, 1], 'y': [0, 1]},
title={'text': "CPU Usage %"},
gauge={'axis': {'range': [None, 100]},
'bar': {'color': "darkblue"},
'steps': [
{'range': [0, 50], 'color': "lightgray"},
{'range': [50, 80], 'color': "yellow"},
{'range': [80, 100], 'color': "red"}
]}
))
fig_cpu.update_layout(height=250)
st.plotly_chart(fig_cpu, use_container_width=True)
with col2:
# Memory usage simulation
mem_usage = 78
fig_mem = go.Figure(go.Indicator(
mode="gauge+number",
value=mem_usage,
domain={'x': [0, 1], 'y': [0, 1]},
title={'text': "Memory Usage %"},
gauge={'axis': {'range': [None, 100]},
'bar': {'color': "darkgreen"},
'steps': [
{'range': [0, 50], 'color': "lightgray"},
{'range': [50, 80], 'color': "yellow"},
{'range': [80, 100], 'color': "red"}
]}
))
fig_mem.update_layout(height=250)
st.plotly_chart(fig_mem, use_container_width=True)
with col3:
# GPU usage simulation (if available)
gpu_usage = 45
fig_gpu = go.Figure(go.Indicator(
mode="gauge+number",
value=gpu_usage,
domain={'x': [0, 1], 'y': [0, 1]},
title={'text': "GPU Usage %"},
gauge={'axis': {'range': [None, 100]},
'bar': {'color': "darkred"},
'steps': [
{'range': [0, 50], 'color': "lightgray"},
{'range': [50, 80], 'color': "yellow"},
{'range': [80, 100], 'color': "red"}
]}
))
fig_gpu.update_layout(height=250)
st.plotly_chart(fig_gpu, use_container_width=True)
def render_logs(self):
"""Render recent logs"""
st.subheader("📜 Recent Logs")
# This would read from actual log files
# For demo purposes, showing placeholder
log_entries = [
"2025-09-02 14:30:15 - INFO - Task 12345678 started processing",
"2025-09-02 14:28:42 - INFO - Ollama model llama2:70b loaded successfully",
"2025-09-02 14:25:10 - INFO - New task added to queue: nanoGPT_lite",
"2025-09-02 14:22:33 - INFO - Task 87654321 completed successfully",
"2025-09-02 14:20:15 - ERROR - Task 11111111 failed: Connection timeout"
]
for entry in log_entries:
level = "INFO" if "INFO" in entry else "ERROR" if "ERROR" in entry else "WARNING"
if level == "INFO":
st.info(entry)
elif level == "ERROR":
st.error(entry)
else:
st.warning(entry)
def _calculate_duration(self, task) -> str:
"""Calculate task duration"""
if task.completed_at and task.started_at:
duration = task.completed_at - task.started_at
return f"{duration.total_seconds() / 60:.1f} min"
elif task.started_at:
duration = datetime.now() - task.started_at
return f"{duration.total_seconds() / 60:.1f} min (ongoing)"
else:
return "Not started"
def run(self):
"""Run the dashboard"""
self.render_header()
# Auto-refresh every 30 seconds
if "last_refresh" not in st.session_state:
st.session_state.last_refresh = time.time()
if time.time() - st.session_state.last_refresh > 30:
st.session_state.last_refresh = time.time()
st.rerun()
# Main content
self.render_queue_stats()
st.divider()
self.render_task_timeline()
st.divider()
self.render_task_list()
st.divider()
self.render_resource_usage()
st.divider()
self.render_logs()
# Run dashboard
if __name__ == "__main__":
dashboard = MonitoringDashboard()
dashboard.run()
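Streamlit apps are launched with `streamlit run` rather than plain `python`; to try the dashboard outside of Docker on the port used throughout this guide:

```bash
streamlit run scripts/monitoring_dashboard.py --server.port 8081 --server.address 0.0.0.0
```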
Advanced Automation Scripts
Intelligent Task Scheduler
# scripts/intelligent_scheduler.py
import schedule
import time
import logging
from datetime import datetime, timedelta
from task_submitter import TaskSubmitter
from queue_manager import QueueManager, TaskStatus
import random
class IntelligentScheduler:
def __init__(self):
self.submitter = TaskSubmitter()
self.queue_manager = QueueManager()
self.logger = logging.getLogger(__name__)
# Configuration
self.max_concurrent_tasks = 4
self.min_queue_size = 2
self.templates = ["nanoGPT_lite", "2d_diffusion", "grokking"]
self.models = {
"fast": ["mistral:7b", "llama2:13b"],
"balanced": ["codellama:34b", "llama2:70b"],
"powerful": ["deepseek-coder:33b"]
}
def should_add_tasks(self) -> bool:
"""Determine if new tasks should be added"""
stats = self.queue_manager.get_queue_stats()
total_active = stats.get("pending", 0) + stats.get("running", 0)
# Add tasks if queue is getting low
return total_active < self.min_queue_size
def get_optimal_model_mix(self) -> list:
"""Get optimal mix of models based on time and resources"""
current_hour = datetime.now().hour
# Nighttime: Use powerful models
if 22 <= current_hour or current_hour <= 6:
return self.models["powerful"] + self.models["balanced"]
# Business hours: Use faster models
elif 9 <= current_hour <= 17:
return self.models["fast"]
# Evening: Balanced approach
else:
return self.models["balanced"]
def add_adaptive_tasks(self):
"""Add tasks based on current conditions"""
if not self.should_add_tasks():
self.logger.info("Queue has sufficient tasks, skipping addition")
return
# Get optimal models for current time
optimal_models = self.get_optimal_model_mix()
# Randomly select templates to ensure variety
selected_templates = random.sample(
self.templates,
min(2, len(self.templates))
)
# Submit tasks
task_ids = self.submitter.submit_research_batch(
templates=selected_templates,
models=optimal_models[:2], # Limit to 2 models
ideas_per_task=random.randint(2, 4),
priority=self._calculate_priority()
)
self.logger.info(f"Added {len(task_ids)} adaptive tasks")
def _calculate_priority(self) -> int:
"""Calculate task priority based on current conditions"""
current_hour = datetime.now().hour
# Higher priority during night hours
if 22 <= current_hour or current_hour <= 6:
return 3
elif 18 <= current_hour <= 21:
return 2
else:
return 1
def cleanup_old_tasks(self):
"""Clean up old completed/failed tasks"""
try:
cutoff_date = datetime.now() - timedelta(days=7)
# Get old completed tasks
completed_tasks = self.queue_manager.list_tasks(TaskStatus.COMPLETED)
old_completed = [
task for task in completed_tasks
if task.completed_at and task.completed_at < cutoff_date
]
# Get old failed tasks
failed_tasks = self.queue_manager.list_tasks(TaskStatus.FAILED)
old_failed = [
task for task in failed_tasks
if task.completed_at and task.completed_at < cutoff_date
]
# Remove old tasks (implementation depends on Redis setup)
total_cleaned = len(old_completed) + len(old_failed)
if total_cleaned > 0:
self.logger.info(f"Cleaned up {total_cleaned} old tasks")
except Exception as e:
self.logger.error(f"Error during cleanup: {e}")
def health_check(self):
"""Perform system health check"""
try:
stats = self.queue_manager.get_queue_stats()
# Check for stuck running tasks
running_tasks = self.queue_manager.list_tasks(TaskStatus.RUNNING)
stuck_tasks = []
for task in running_tasks:
if task.started_at:
runtime = datetime.now() - task.started_at
if runtime > timedelta(hours=3): # 3 hour timeout
stuck_tasks.append(task)
if stuck_tasks:
self.logger.warning(f"Found {len(stuck_tasks)} potentially stuck tasks")
# Here you could implement automatic task recovery
# Log overall status
self.logger.info(f"Health check: {stats}")
except Exception as e:
self.logger.error(f"Health check failed: {e}")
def setup_schedule(self):
"""Setup the scheduling rules"""
# Add tasks every 30 minutes
schedule.every(30).minutes.do(self.add_adaptive_tasks)
# Health check every hour
schedule.every().hour.do(self.health_check)
# Daily cleanup at 3 AM
schedule.every().day.at("03:00").do(self.cleanup_old_tasks)
# Heavy batch submission for weekends
schedule.every().friday.at("22:00").do(self._weekend_batch)
self.logger.info("Scheduler setup complete")
def _weekend_batch(self):
"""Submit large batch for weekend processing"""
task_ids = self.submitter.submit_continuous_research(48) # 48 hours
self.logger.info(f"Weekend batch submitted: {len(task_ids)} tasks")
def run(self):
"""Run the scheduler"""
self.setup_schedule()
self.logger.info("Intelligent scheduler started")
while True:
try:
schedule.run_pending()
time.sleep(60) # Check every minute
except KeyboardInterrupt:
self.logger.info("Scheduler stopped by user")
break
except Exception as e:
self.logger.error(f"Scheduler error: {e}")
time.sleep(300) # Wait 5 minutes before retrying
# Configuration and startup
if __name__ == "__main__":
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('logs/scheduler.log'),
logging.StreamHandler()
]
)
scheduler = IntelligentScheduler()
try:
scheduler.run()
except KeyboardInterrupt:
print("Scheduler stopped")
Resource Monitoring Script
# scripts/resource_monitor.py
import psutil
import GPUtil
import time
import logging
import json
from datetime import datetime, timedelta
from typing import Dict, List
import redis
class ResourceMonitor:
def __init__(self, redis_host: str = "localhost", redis_port: int = 6379):
self.redis_client = redis.Redis(
host=redis_host,
port=redis_port,
decode_responses=True
)
self.logger = logging.getLogger(__name__)
# Monitoring settings
self.monitoring_interval = 30 # seconds
self.history_retention = 24 * 60 * 60 # 24 hours in seconds
# Alert thresholds
self.cpu_threshold = 85
self.memory_threshold = 90
self.disk_threshold = 95
self.gpu_threshold = 95
def get_system_metrics(self) -> Dict:
"""Collect comprehensive system metrics"""
timestamp = datetime.now().isoformat()
# CPU metrics
cpu_percent = psutil.cpu_percent(interval=1)
cpu_count = psutil.cpu_count()
cpu_freq = psutil.cpu_freq()
# Memory metrics
memory = psutil.virtual_memory()
swap = psutil.swap_memory()
# Disk metrics
disk = psutil.disk_usage('/')
# Network metrics
network = psutil.net_io_counters()
# Process metrics
processes = len(psutil.pids())
# GPU metrics (if available)
gpu_metrics = self.get_gpu_metrics()
return {
"timestamp": timestamp,
"cpu": {
"percent": cpu_percent,
"count": cpu_count,
"frequency": cpu_freq.current if cpu_freq else None
},
"memory": {
"total": memory.total,
"available": memory.available,
"percent": memory.percent,
"used": memory.used,
"free": memory.free
},
"swap": {
"total": swap.total,
"used": swap.used,
"percent": swap.percent
},
"disk": {
"total": disk.total,
"used": disk.used,
"free": disk.free,
"percent": disk.used / disk.total * 100
},
"network": {
"bytes_sent": network.bytes_sent,
"bytes_recv": network.bytes_recv,
"packets_sent": network.packets_sent,
"packets_recv": network.packets_recv
},
"processes": processes,
"gpu": gpu_metrics
}
def get_gpu_metrics(self) -> List[Dict]:
"""Get GPU metrics if available"""
try:
gpus = GPUtil.getGPUs()
gpu_metrics = []
for gpu in gpus:
gpu_metrics.append({
"id": gpu.id,
"name": gpu.name,
"load": gpu.load * 100,
"memory_total": gpu.memoryTotal,
"memory_used": gpu.memoryUsed,
"memory_percent": (gpu.memoryUsed / gpu.memoryTotal) * 100,
"temperature": gpu.temperature
})
return gpu_metrics
except Exception as e:
self.logger.debug(f"GPU metrics unavailable: {e}")
return []
def store_metrics(self, metrics: Dict):
"""Store metrics in Redis with time-based expiration"""
try:
key = f"ai_scientist:metrics:{metrics['timestamp']}"
# Store metrics
self.redis_client.setex(
key,
self.history_retention,
json.dumps(metrics)
)
# Update latest metrics
self.redis_client.setex(
"ai_scientist:metrics:latest",
self.history_retention,
json.dumps(metrics)
)
except Exception as e:
self.logger.error(f"Error storing metrics: {e}")
def check_alerts(self, metrics: Dict):
"""Check for alert conditions"""
alerts = []
# CPU alert
if metrics["cpu"]["percent"] > self.cpu_threshold:
alerts.append({
"type": "cpu",
"severity": "warning",
"message": f"High CPU usage: {metrics['cpu']['percent']:.1f}%",
"threshold": self.cpu_threshold,
"current": metrics["cpu"]["percent"]
})
# Memory alert
if metrics["memory"]["percent"] > self.memory_threshold:
alerts.append({
"type": "memory",
"severity": "critical" if metrics["memory"]["percent"] > 95 else "warning",
"message": f"High memory usage: {metrics['memory']['percent']:.1f}%",
"threshold": self.memory_threshold,
"current": metrics["memory"]["percent"]
})
# Disk alert
if metrics["disk"]["percent"] > self.disk_threshold:
alerts.append({
"type": "disk",
"severity": "critical",
"message": f"High disk usage: {metrics['disk']['percent']:.1f}%",
"threshold": self.disk_threshold,
"current": metrics["disk"]["percent"]
})
# GPU alerts
for gpu in metrics["gpu"]:
if gpu["load"] > self.gpu_threshold:
alerts.append({
"type": "gpu",
"severity": "warning",
"message": f"High GPU usage on {gpu['name']}: {gpu['load']:.1f}%",
"threshold": self.gpu_threshold,
"current": gpu["load"]
})
if gpu["memory_percent"] > self.memory_threshold:
alerts.append({
"type": "gpu_memory",
"severity": "warning",
"message": f"High GPU memory on {gpu['name']}: {gpu['memory_percent']:.1f}%",
"threshold": self.memory_threshold,
"current": gpu["memory_percent"]
})
# Store alerts if any
if alerts:
self.store_alerts(alerts)
self.logger.warning(f"Generated {len(alerts)} alerts")
return alerts
def store_alerts(self, alerts: List[Dict]):
"""Store alerts in Redis"""
try:
timestamp = datetime.now().isoformat()
for i, alert in enumerate(alerts):
key = f"ai_scientist:alerts:{timestamp}:{i}"
alert_data = {
"timestamp": timestamp,
**alert
}
# Store with 7 day retention
self.redis_client.setex(
key,
7 * 24 * 60 * 60, # 7 days
json.dumps(alert_data)
)
except Exception as e:
self.logger.error(f"Error storing alerts: {e}")
def get_recent_metrics(self, hours: int = 1) -> List[Dict]:
"""Get recent metrics for analysis"""
try:
# Get all metric keys from the last X hours
cutoff_time = datetime.now() - timedelta(hours=hours)
keys = self.redis_client.keys("ai_scientist:metrics:*")
recent_metrics = []
for key in keys:
if key == "ai_scientist:metrics:latest":
continue
# Extract timestamp from key (the ISO timestamp itself contains colons)
timestamp_str = key.split(":", 2)[-1]
try:
key_time = datetime.fromisoformat(timestamp_str)
if key_time >= cutoff_time:
data = self.redis_client.get(key)
if data:
recent_metrics.append(json.loads(data))
except ValueError:
continue
# Sort by timestamp
recent_metrics.sort(key=lambda x: x["timestamp"])
return recent_metrics
except Exception as e:
self.logger.error(f"Error getting recent metrics: {e}")
return []
def run_monitoring(self):
"""Main monitoring loop"""
self.logger.info("Resource monitoring started")
while True:
try:
# Collect metrics
metrics = self.get_system_metrics()
# Store metrics
self.store_metrics(metrics)
# Check for alerts
alerts = self.check_alerts(metrics)
# Log summary
self.logger.info(
f"Metrics collected - CPU: {metrics['cpu']['percent']:.1f}%, "
f"Memory: {metrics['memory']['percent']:.1f}%, "
f"Disk: {metrics['disk']['percent']:.1f}%, "
f"Alerts: {len(alerts)}"
)
# Wait for next collection
time.sleep(self.monitoring_interval)
except KeyboardInterrupt:
self.logger.info("Monitoring stopped by user")
break
except Exception as e:
self.logger.error(f"Monitoring error: {e}")
time.sleep(60) # Wait before retrying
# Utility functions for getting metrics from Redis
def get_latest_metrics(redis_host: str = "localhost", redis_port: int = 6379) -> Dict:
"""Get the latest system metrics"""
redis_client = redis.Redis(host=redis_host, port=redis_port, decode_responses=True)
data = redis_client.get("ai_scientist:metrics:latest")
return json.loads(data) if data else {}
def get_recent_alerts(redis_host: str = "localhost", redis_port: int = 6379, hours: int = 24) -> List[Dict]:
"""Get recent alerts"""
redis_client = redis.Redis(host=redis_host, port=redis_port, decode_responses=True)
keys = redis_client.keys("ai_scientist:alerts:*")
alerts = []
cutoff_time = datetime.now() - timedelta(hours=hours)
for key in keys:
data = redis_client.get(key)
if data:
alert = json.loads(data)
alert_time = datetime.fromisoformat(alert["timestamp"])
if alert_time >= cutoff_time:
alerts.append(alert)
alerts.sort(key=lambda x: x["timestamp"], reverse=True)
return alerts
# Main execution
if __name__ == "__main__":
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('logs/resource_monitor.log'),
logging.StreamHandler()
]
)
monitor = ResourceMonitor()
try:
monitor.run_monitoring()
except KeyboardInterrupt:
print("Resource monitoring stopped")
Practical Implementation Examples
Example 1: 24/7 Multi-Template Research
#!/bin/bash
# scripts/deploy_ai_scientist_24_7.sh
echo "🚀 Deploying AI Scientist 24/7 Research Pipeline"
# Set environment variables
export OLLAMA_HOST="localhost:11434"
export LM_STUDIO_BASE_URL="http://localhost:1234/v1"
# Start OrbStack
echo "Starting OrbStack..."
orbstack start
# Start Ollama
echo "Starting Ollama..."
ollama serve &
OLLAMA_PID=$!
# Wait for Ollama to be ready
echo "Waiting for Ollama to be ready..."
sleep 10
# Pull required models if not present
echo "Ensuring models are available..."
ollama pull llama2:70b &
ollama pull codellama:34b &
ollama pull mistral:7b &
ollama pull deepseek-coder:33b &
wait
# Start Docker Compose stack
echo "Starting AI Scientist stack..."
cd ~/ai-research-lab
docker-compose up -d
# Wait for services to be ready
echo "Waiting for services to initialize..."
sleep 30
# Submit initial research batch
echo "Submitting initial research tasks..."
python scripts/task_submitter.py --batch
# Start intelligent scheduler
echo "Starting intelligent scheduler..."
python scripts/intelligent_scheduler.py &
SCHEDULER_PID=$!
# Start resource monitoring
echo "Starting resource monitoring..."
python scripts/resource_monitor.py &
MONITOR_PID=$!
echo "✅ AI Scientist 24/7 pipeline deployed successfully!"
echo ""
echo "🌐 Access points:"
echo " - Monitoring Dashboard: http://localhost:8081"
echo " - Jupyter Notebook: http://localhost:8888"
echo " - Queue Stats: View logs or dashboard"
echo ""
echo "📊 To monitor:"
echo " docker-compose logs -f"
echo " tail -f logs/scheduler.log"
echo " tail -f logs/resource_monitor.log"
echo ""
echo "🛑 To stop:"
echo " docker-compose down"
echo " kill $OLLAMA_PID $SCHEDULER_PID $MONITOR_PID"
# Store PIDs for cleanup
echo "$OLLAMA_PID $SCHEDULER_PID $MONITOR_PID" > .ai_scientist_pids
echo "Pipeline is now running 24/7. Press Ctrl+C to stop."
# Wait for interrupt
trap 'echo "Stopping AI Scientist pipeline..."; docker-compose down; kill $OLLAMA_PID $SCHEDULER_PID $MONITOR_PID; exit' INT
while true; do sleep 1; done
Example 2: Weekend Research Marathon
# scripts/weekend_research_marathon.py
"""
Weekend Research Marathon
Automatically runs comprehensive research experiments over the weekend
"""
import schedule
import time
import logging
from datetime import datetime, timedelta
from task_submitter import TaskSubmitter
from queue_manager import QueueManager
class WeekendMarathon:
def __init__(self):
self.submitter = TaskSubmitter()
self.queue_manager = QueueManager()
self.logger = logging.getLogger(__name__)
# Marathon configuration
self.marathon_templates = [
"nanoGPT_lite",
"2d_diffusion",
"grokking"
]
self.marathon_models = [
"llama2:70b", # Large context
"codellama:34b", # Code specialist
"mistral:7b", # Fast inference
"deepseek-coder:33b" # Advanced coding
]
# Research intensity levels
self.research_intensity = {
"light": {"ideas_per_task": 2, "parallel_tasks": 2},
"medium": {"ideas_per_task": 3, "parallel_tasks": 3},
"heavy": {"ideas_per_task": 5, "parallel_tasks": 4},
"extreme": {"ideas_per_task": 8, "parallel_tasks": 6}
}
def friday_evening_prep(self):
"""Friday evening preparation"""
self.logger.info("🎯 Friday Evening: Preparing weekend marathon")
# Submit initial heavy batch
task_ids = self.submitter.submit_research_batch(
templates=self.marathon_templates,
models=self.marathon_models[:2], # Start with 2 models
ideas_per_task=5,
priority=3
)
self.logger.info(f"📝 Submitted {len(task_ids)} Friday prep tasks")
def saturday_morning_boost(self):
"""Saturday morning research boost"""
self.logger.info("🌅 Saturday Morning: Boosting research pipeline")
# Heavy intensity research
intensity = self.research_intensity["heavy"]
task_ids = self.submitter.submit_research_batch(
templates=self.marathon_templates,
models=self.marathon_models, # All models
ideas_per_task=intensity["ideas_per_task"],
priority=3
)
self.logger.info(f"🚀 Saturday boost: {len(task_ids)} tasks submitted")
def saturday_evening_deep_dive(self):
"""Saturday evening deep research"""
self.logger.info("🌙 Saturday Evening: Deep research dive")
# Focus on most complex template with powerful models
task_ids = self.submitter.submit_research_batch(
templates=["grokking"], # Most complex
models=["llama2:70b", "deepseek-coder:33b"],
ideas_per_task=8, # Many ideas
priority=4 # Highest priority
)
self.logger.info(f"🧠 Deep dive: {len(task_ids)} complex tasks")
def sunday_comprehensive_sweep(self):
"""Sunday comprehensive research sweep"""
self.logger.info("🔄 Sunday: Comprehensive research sweep")
# Extreme intensity across all templates
intensity = self.research_intensity["extreme"]
task_ids = self.submitter.submit_research_batch(
templates=self.marathon_templates,
models=self.marathon_models,
ideas_per_task=intensity["ideas_per_task"],
priority=3
)
self.logger.info(f"🌊 Sunday sweep: {len(task_ids)} tasks")
def sunday_evening_wind_down(self):
"""Sunday evening wind down"""
self.logger.info("🌅 Sunday Evening: Winding down marathon")
# Light intensity for Monday preparation
intensity = self.research_intensity["light"]
task_ids = self.submitter.submit_research_batch(
templates=self.marathon_templates[:1], # Just one template
models=self.marathon_models[:2], # Faster models
ideas_per_task=intensity["ideas_per_task"],
priority=2
)
self.logger.info(f"🌙 Wind down: {len(task_ids)} final tasks")
# Generate weekend report
self.generate_weekend_report()
def generate_weekend_report(self):
"""Generate comprehensive weekend research report"""
try:
# Get weekend tasks (Friday 6PM to Sunday 11PM)
friday_start = datetime.now().replace(
hour=18, minute=0, second=0, microsecond=0
) - timedelta(days=2) # Last Friday
sunday_end = datetime.now().replace(
hour=23, minute=0, second=0, microsecond=0
)
# Get task statistics
all_tasks = self.queue_manager.list_tasks()
weekend_tasks = [
task for task in all_tasks
if task.created_at and friday_start <= task.created_at <= sunday_end
]
# Generate report
report = {
"weekend_period": f"{friday_start.strftime('%Y-%m-%d %H:%M')} to {sunday_end.strftime('%Y-%m-%d %H:%M')}",
"total_tasks": len(weekend_tasks),
"completed_tasks": len([t for t in weekend_tasks if t.status.value == "completed"]),
"failed_tasks": len([t for t in weekend_tasks if t.status.value == "failed"]),
"total_ideas_generated": sum(t.num_ideas for t in weekend_tasks),
"templates_used": list(set(t.template for t in weekend_tasks)),
"models_used": list(set(t.model for t in weekend_tasks))
}
# Log report
self.logger.info("📊 Weekend Research Marathon Report:")
for key, value in report.items():
self.logger.info(f" {key}: {value}")
# Save report to file (make sure the reports directory exists)
import json
import os
os.makedirs("reports", exist_ok=True)
with open(f"reports/weekend_marathon_{datetime.now().strftime('%Y%m%d')}.json", "w") as f:
    json.dump(report, f, indent=2, default=str)
except Exception as e:
self.logger.error(f"Error generating weekend report: {e}")
def setup_marathon_schedule(self):
"""Setup the weekend marathon schedule"""
# Friday evening preparation
schedule.every().friday.at("18:00").do(self.friday_evening_prep)
# Saturday schedule
schedule.every().saturday.at("08:00").do(self.saturday_morning_boost)
schedule.every().saturday.at("20:00").do(self.saturday_evening_deep_dive)
# Sunday schedule
schedule.every().sunday.at("10:00").do(self.sunday_comprehensive_sweep)
schedule.every().sunday.at("22:00").do(self.sunday_evening_wind_down)
self.logger.info("🗓️ Weekend marathon schedule configured")
def run_marathon(self):
"""Run the weekend marathon scheduler"""
self.setup_marathon_schedule()
self.logger.info("🏃♂️ Weekend Research Marathon started")
self.logger.info("Schedule:")
self.logger.info(" Friday 18:00 - Preparation")
self.logger.info(" Saturday 08:00 - Morning boost")
self.logger.info(" Saturday 20:00 - Deep dive")
self.logger.info(" Sunday 10:00 - Comprehensive sweep")
self.logger.info(" Sunday 22:00 - Wind down")
while True:
try:
schedule.run_pending()
time.sleep(60)
except KeyboardInterrupt:
self.logger.info("Weekend marathon stopped by user")
break
except Exception as e:
self.logger.error(f"Marathon error: {e}")
time.sleep(300)
# Standalone weekend kickoff
def manual_weekend_kickoff():
"""Manually start weekend research marathon"""
marathon = WeekendMarathon()
print("🏁 Starting Weekend Research Marathon NOW!")
# Run all marathon phases immediately
marathon.friday_evening_prep()
time.sleep(60)
marathon.saturday_morning_boost()
time.sleep(60)
marathon.saturday_evening_deep_dive()
time.sleep(60)
marathon.sunday_comprehensive_sweep()
print("✅ Weekend marathon tasks submitted!")
print("Monitor progress via dashboard: http://localhost:8081")
if __name__ == "__main__":
import sys
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('logs/weekend_marathon.log'),
logging.StreamHandler()
]
)
if len(sys.argv) > 1 and sys.argv[1] == "--kickoff":
manual_weekend_kickoff()
else:
marathon = WeekendMarathon()
try:
marathon.run_marathon()
except KeyboardInterrupt:
print("Weekend marathon scheduler stopped")
Example 3: Adaptive Research Queue
# scripts/adaptive_research_queue.py
"""
Adaptive Research Queue
Intelligently manages research tasks based on system performance,
time of day, and research domain preferences
"""
import time
import json
import logging
from datetime import datetime, timedelta
from typing import Dict, List, Tuple
from queue_manager import QueueManager, ResearchTask, TaskStatus
from task_submitter import TaskSubmitter
from resource_monitor import get_latest_metrics
class AdaptiveResearchQueue:
def __init__(self):
self.queue_manager = QueueManager()
self.submitter = TaskSubmitter()
self.logger = logging.getLogger(__name__)
# Research domain configurations
self.domain_configs = {
"nanoGPT_lite": {
"complexity": "medium",
"resource_intensity": "high",
"average_duration": 120, # minutes
"success_rate": 0.85,
"preferred_models": ["codellama:34b", "deepseek-coder:33b"]
},
"2d_diffusion": {
"complexity": "high",
"resource_intensity": "very_high",
"average_duration": 180,
"success_rate": 0.75,
"preferred_models": ["llama2:70b", "mistral:7b"]
},
"grokking": {
"complexity": "very_high",
"resource_intensity": "medium",
"average_duration": 200,
"success_rate": 0.70,
"preferred_models": ["llama2:70b", "deepseek-coder:33b"]
}
}
# Performance thresholds
self.performance_thresholds = {
"cpu_high": 80,
"memory_high": 85,
"cpu_critical": 95,
"memory_critical": 95
}
# Adaptive parameters
self.adaptation_history = []
self.performance_history = []
def analyze_system_performance(self) -> Dict:
"""Analyze current system performance and capacity"""
try:
metrics = get_latest_metrics()
if not metrics:
# Default conservative estimate
return {
"cpu_usage": 50,
"memory_usage": 50,
"capacity_level": "medium",
"recommended_concurrency": 2
}
cpu_usage = metrics["cpu"]["percent"]
memory_usage = metrics["memory"]["percent"]
# Determine capacity level
if (cpu_usage > self.performance_thresholds["cpu_critical"] or
memory_usage > self.performance_thresholds["memory_critical"]):
capacity_level = "critical"
recommended_concurrency = 1
elif (cpu_usage > self.performance_thresholds["cpu_high"] or
memory_usage > self.performance_thresholds["memory_high"]):
capacity_level = "high"
recommended_concurrency = 2
elif cpu_usage > 50 or memory_usage > 50:
capacity_level = "medium"
recommended_concurrency = 3
else:
capacity_level = "low"
recommended_concurrency = 4
analysis = {
"cpu_usage": cpu_usage,
"memory_usage": memory_usage,
"capacity_level": capacity_level,
"recommended_concurrency": recommended_concurrency,
"timestamp": datetime.now().isoformat()
}
# Store in history
self.performance_history.append(analysis)
# Keep only last 24 hours
cutoff = datetime.now() - timedelta(hours=24)
self.performance_history = [
p for p in self.performance_history
if datetime.fromisoformat(p["timestamp"]) > cutoff
]
return analysis
except Exception as e:
self.logger.error(f"Error analyzing system performance: {e}")
return {
"cpu_usage": 50,
"memory_usage": 50,
"capacity_level": "medium",
"recommended_concurrency": 2
}
def get_optimal_research_mix(self, performance: Dict) -> List[Dict]:
"""Determine optimal research task mix based on performance"""
capacity_level = performance["capacity_level"]
current_hour = datetime.now().hour
# Time-based preferences
if 22 <= current_hour or current_hour <= 6:
# Night time: prefer complex, long-running tasks
time_preference = "complex"
elif 9 <= current_hour <= 17:
# Business hours: prefer quick, light tasks
time_preference = "light"
else:
# Evening: balanced approach
time_preference = "balanced"
# Capacity-based adjustments
if capacity_level == "critical":
# Only light tasks
return self._get_light_task_mix()
elif capacity_level == "high":
# Medium complexity tasks
return self._get_medium_task_mix(time_preference)
elif capacity_level == "medium":
# Balanced mix
return self._get_balanced_task_mix(time_preference)
else:
# Low usage: can run heavy tasks
return self._get_heavy_task_mix(time_preference)
def _get_light_task_mix(self) -> List[Dict]:
"""Light computational load task mix"""
return [
{
"template": "nanoGPT_lite",
"model": "mistral:7b",
"num_ideas": 2,
"priority": 1
}
]
def _get_medium_task_mix(self, time_preference: str) -> List[Dict]:
"""Medium computational load task mix"""
if time_preference == "light":
return [
{
"template": "nanoGPT_lite",
"model": "mistral:7b",
"num_ideas": 3,
"priority": 2
},
{
"template": "2d_diffusion",
"model": "mistral:7b",
"num_ideas": 2,
"priority": 1
}
]
else:
return [
{
"template": "nanoGPT_lite",
"model": "codellama:34b",
"num_ideas": 3,
"priority": 2
},
{
"template": "grokking",
"model": "llama2:70b",
"num_ideas": 2,
"priority": 2
}
]
def _get_balanced_task_mix(self, time_preference: str) -> List[Dict]:
"""Balanced computational load task mix"""
if time_preference == "complex":
return [
{
"template": "grokking",
"model": "llama2:70b",
"num_ideas": 4,
"priority": 3
},
{
"template": "2d_diffusion",
"model": "deepseek-coder:33b",
"num_ideas": 3,
"priority": 2
},
{
"template": "nanoGPT_lite",
"model": "codellama:34b",
"num_ideas": 3,
"priority": 2
}
]
else:
return [
{
"template": "nanoGPT_lite",
"model": "codellama:34b",
"num_ideas": 3,
"priority": 2
},
{
"template": "2d_diffusion",
"model": "mistral:7b",
"num_ideas": 3,
"priority": 2
}
]
def _get_heavy_task_mix(self, time_preference: str) -> List[Dict]:
"""Heavy computational load task mix"""
return [
{
"template": "grokking",
"model": "llama2:70b",
"num_ideas": 6,
"priority": 3
},
{
"template": "2d_diffusion",
"model": "deepseek-coder:33b",
"num_ideas": 5,
"priority": 3
},
{
"template": "nanoGPT_lite",
"model": "codellama:34b",
"num_ideas": 4,
"priority": 2
},
{
"template": "grokking",
"model": "deepseek-coder:33b",
"num_ideas": 4,
"priority": 2
}
]
def should_add_tasks(self, performance: Dict) -> bool:
"""Determine if new tasks should be added"""
stats = self.queue_manager.get_queue_stats()
running_tasks = stats.get("running", 0)
pending_tasks = stats.get("pending", 0)
# Don't add tasks if system is under stress
if performance["capacity_level"] == "critical":
return False
# Add tasks based on recommended concurrency
recommended_concurrency = performance["recommended_concurrency"]
total_active = running_tasks + pending_tasks
return total_active < recommended_concurrency
def adaptive_task_submission(self):
"""Submit tasks adaptively based on current conditions"""
try:
# Analyze system performance
performance = self.analyze_system_performance()
self.logger.info(
f"System analysis: CPU {performance['cpu_usage']:.1f}%, "
f"Memory {performance['memory_usage']:.1f}%, "
f"Capacity: {performance['capacity_level']}"
)
# Check if we should add tasks
if not self.should_add_tasks(performance):
self.logger.info("System busy or queue full, skipping task addition")
return
# Get optimal task mix
task_mix = self.get_optimal_research_mix(performance)
# Submit tasks
submitted_tasks = []
for task_config in task_mix:
task_id = self._submit_single_task(task_config)
if task_id:
submitted_tasks.append(task_id)
# Log adaptation
adaptation_record = {
"timestamp": datetime.now().isoformat(),
"performance": performance,
"task_mix": task_mix,
"submitted_tasks": len(submitted_tasks)
}
self.adaptation_history.append(adaptation_record)
self.logger.info(
f"Adaptive submission: {len(submitted_tasks)} tasks for "
f"{performance['capacity_level']} capacity system"
)
except Exception as e:
self.logger.error(f"Error in adaptive task submission: {e}")
def _submit_single_task(self, task_config: Dict) -> str:
"""Submit a single task based on configuration"""
try:
import uuid
task_id = str(uuid.uuid4())
task = ResearchTask(
id=task_id,
template=task_config["template"],
model=task_config["model"],
num_ideas=task_config["num_ideas"],
priority=task_config["priority"]
)
self.queue_manager.add_task(task)
return task_id
except Exception as e:
self.logger.error(f"Error submitting task: {e}")
return None
def generate_adaptation_report(self) -> Dict:
"""Generate report on adaptation performance"""
try:
if not self.adaptation_history:
return {"error": "No adaptation history available"}
# Calculate adaptation statistics
recent_adaptations = self.adaptation_history[-24:] # Last 24 adaptations
capacity_distribution = {}
task_distribution = {}
for adaptation in recent_adaptations:
capacity = adaptation["performance"]["capacity_level"]
capacity_distribution[capacity] = capacity_distribution.get(capacity, 0) + 1
for task in adaptation["task_mix"]:
template = task["template"]
task_distribution[template] = task_distribution.get(template, 0) + 1
# Performance trends
avg_cpu = sum(a["performance"]["cpu_usage"] for a in recent_adaptations) / len(recent_adaptations)
avg_memory = sum(a["performance"]["memory_usage"] for a in recent_adaptations) / len(recent_adaptations)
report = {
"analysis_period": f"Last {len(recent_adaptations)} adaptations",
"average_performance": {
"cpu_usage": round(avg_cpu, 1),
"memory_usage": round(avg_memory, 1)
},
"capacity_distribution": capacity_distribution,
"task_template_distribution": task_distribution,
"total_tasks_submitted": sum(a["submitted_tasks"] for a in recent_adaptations),
"adaptation_frequency": len(recent_adaptations)
}
return report
except Exception as e:
self.logger.error(f"Error generating adaptation report: {e}")
return {"error": str(e)}
def run_adaptive_queue(self, check_interval: int = 300):
"""Run the adaptive queue management system"""
self.logger.info(f"🧠 Starting Adaptive Research Queue (check every {check_interval}s)")
while True:
try:
# Perform adaptive task submission
self.adaptive_task_submission()
# Generate periodic report
if len(self.adaptation_history) % 12 == 0: # Every 12 adaptations
report = self.generate_adaptation_report()
self.logger.info(f"📊 Adaptation Report: {report}")
# Wait for next check
time.sleep(check_interval)
except KeyboardInterrupt:
self.logger.info("Adaptive queue stopped by user")
break
except Exception as e:
self.logger.error(f"Adaptive queue error: {e}")
time.sleep(60)
if __name__ == "__main__":
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('logs/adaptive_queue.log'),
logging.StreamHandler()
]
)
adaptive_queue = AdaptiveResearchQueue()
try:
adaptive_queue.run_adaptive_queue(check_interval=300) # Check every 5 minutes
except KeyboardInterrupt:
print("Adaptive research queue stopped")
Troubleshooting and Optimization
Common Issues and Solutions
Issue 1: Ollama Connection Failures
# Debug Ollama connectivity
curl http://localhost:11434/api/tags
# Check Ollama logs (Linux with systemd)
journalctl -u ollama --follow
# Restart Ollama service (Linux with systemd)
sudo systemctl restart ollama
# Alternative: manual restart
pkill ollama
ollama serve
Issue 2: Docker Memory Issues
# docker-compose.override.yml
version: '3.8'
services:
ai-scientist:
deploy:
resources:
limits:
memory: 16G
cpus: '8'
reservations:
memory: 8G
cpus: '4'
environment:
- MALLOC_ARENA_MAX=2
- PYTHONHASHSEED=0
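After adding the override file, confirm that the merged configuration looks right and recreate the affected service:

```bash
# Show the effective, merged configuration
docker-compose config

# Apply the new resource limits
docker-compose up -d --force-recreate ai-scientist
```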
Issue 3: Queue Stuck Tasks
# scripts/queue_recovery.py
from queue_manager import QueueManager, TaskStatus
from datetime import datetime, timedelta
def recover_stuck_tasks():
"""Recover tasks that have been running too long"""
queue_manager = QueueManager()
# Find tasks running longer than 4 hours
cutoff_time = datetime.now() - timedelta(hours=4)
running_tasks = queue_manager.list_tasks(TaskStatus.RUNNING)
stuck_tasks = [
task for task in running_tasks
if task.started_at and task.started_at < cutoff_time
]
for task in stuck_tasks:
print(f"Recovering stuck task: {task.id}")
queue_manager.fail_task(
task.id,
"Task timeout - automatically recovered"
)
print(f"Recovered {len(stuck_tasks)} stuck tasks")
if __name__ == "__main__":
recover_stuck_tasks()
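You can run the recovery script by hand, or schedule it hourly with cron so stuck tasks never block the queue for long (the path below assumes the layout used in this guide):

```bash
# One-off recovery
python scripts/queue_recovery.py

# Hourly via cron (crontab -e)
0 * * * * cd ~/ai-research-lab && python scripts/queue_recovery.py >> logs/recovery.log 2>&1
```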
Performance Optimization Tips
- Model Selection Strategy:
# Use smaller models for initial experiments
quick_models = ["mistral:7b", "llama2:13b"]
# Use larger models for final validation
powerful_models = ["llama2:70b", "deepseek-coder:33b"]
- Resource Management:
# Limit concurrent tasks based on system capacity
export MAX_CONCURRENT_TASKS=2
# Use fast storage for temporary files
export TMPDIR=/tmp/ai-scientist-fast
mkdir -p $TMPDIR
- Network Optimization:
# Use local DNS for faster resolution
echo "127.0.0.1 host.docker.internal" >> /etc/hosts
# Optimize Docker networking
docker network create --driver bridge ai-research-net
Conclusion
This comprehensive guide provides you with everything needed to set up a sophisticated, automated research pipeline using SakanaAI’s AI Scientist. The combination of local LLMs, intelligent queue management, and adaptive resource allocation creates a powerful system capable of conducting research around the clock.
Key Benefits Achieved
- 24/7 Research Operation: Continuous scientific discovery without human intervention
- Cost-Effective: Local LLM inference eliminates API costs for large-scale research
- Adaptive Intelligence: System automatically adjusts to performance conditions
- Comprehensive Monitoring: Real-time visibility into research progress and system health
- Scalable Architecture: Easily expandable to accommodate new research domains
Next Steps
- Template Expansion: Create custom research templates for your specific domains
- Model Fine-tuning: Fine-tune local models on your research data for better results
- Integration Enhancement: Connect with external databases and research APIs
- Collaborative Features: Add multi-user support and research team collaboration
- Publication Pipeline: Automate paper submission and review processes
The future of automated scientific research is here, and with this setup, you’re positioned at the forefront of this technological revolution. Start your research marathon today and let AI push the boundaries of human knowledge while you sleep! 🧑‍🔬🚀