Tutorials · 21 min read
Building Your First AI Agent in 2025 - Complete Step-by-Step Guide
Master AI agent development with the latest 2025 frameworks and techniques. Build autonomous agents using LangChain v0.3, OpenAI GPT-4o, and modern Python practices with real-world examples.
Building AI agents has never been more accessible than in 2025. With the latest advancements in LLMs, improved frameworks, and better tooling, you can create sophisticated autonomous agents in just a few hours. This comprehensive tutorial will guide you through building your first AI agent using the most current technologies and best practices.
Table of Contents
- What You'll Build
- 2025 Technology Stack
- Prerequisites and Setup
- Building the Core Agent
- Advanced Features
- Production Deployment
- Monitoring and Optimization
What You'll Build
By the end of this tutorial, you'll have created a production-ready AI agent that can:
- Understand Complex Queries: Process natural language with context awareness
- Use Multiple Tools: Access APIs, perform calculations, search the web, and manipulate files
- Maintain Memory: Remember conversation history and user preferences
- Handle Errors Gracefully: Provide meaningful feedback when things go wrong
- Scale Efficiently: Handle multiple concurrent users
- Monitor Performance: Track usage, costs, and success rates
Live Demo: Your agent will be capable of tasks like:
- "Analyze the weather in Tokyo and suggest what to wear"
- "Calculate my monthly savings if I invest $500 at 7% annual return for 10 years"
- "Search for the latest AI research papers and summarize the key findings"
2025 Technology Stack
Core Technologies
| Component | 2025 Recommendation | Why This Choice |
|---|---|---|
| LLM Provider | OpenAI GPT-4o / Claude-3.5-Sonnet | Best reasoning capabilities, function calling |
| Framework | LangChain v0.3 | Mature ecosystem, excellent tool integration |
| Python Version | Python 3.11+ | Performance improvements, better typing |
| Async Support | asyncio + aiohttp | Essential for production scalability |
| Memory | Redis + PostgreSQL | Persistent, scalable memory solutions |
| Monitoring | LangSmith + Prometheus | Comprehensive observability |
| Deployment | Docker + Kubernetes | Cloud-native, scalable deployment |
Key 2025 Improvements
- Function Calling 2.0: More reliable tool usage with structured outputs (see the sketch after this list)
- Streaming Responses: Real-time response generation
- Multi-modal Capabilities: Text, image, and audio processing
- Cost Optimization: Smart caching and model routing
- Enhanced Security: Built-in prompt injection protection
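To ground the first item above, here is a minimal sketch of modern function calling with the OpenAI Python SDK (v1.x). The get_weather schema is illustrative only — the tutorial's real tools come later:

```python
# Minimal function-calling sketch (assumes OPENAI_API_KEY is set in the environment)
from openai import OpenAI

client = OpenAI()

tools = [{
    "type": "function",
    "function": {
        "name": "get_weather",  # illustrative schema, not this tutorial's tool
        "description": "Get current weather for a city",
        "parameters": {
            "type": "object",
            "properties": {"location": {"type": "string"}},
            "required": ["location"],
        },
    },
}]

response = client.chat.completions.create(
    model="gpt-4o",
    messages=[{"role": "user", "content": "What's the weather in Tokyo?"}],
    tools=tools,
)
# The model returns structured, schema-conforming arguments instead of free text
print(response.choices[0].message.tool_calls)
```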
Prerequisites and Setup
System Requirements
- Python 3.11+ (for improved performance and typing)
- 8GB RAM minimum (16GB recommended for local development)
- API Keys: OpenAI, Anthropic, or other LLM providers
- Basic Knowledge: Python, async programming, REST APIs
Development Environment Setup
# Create project with modern Python
mkdir ai-agent-2025
cd ai-agent-2025
# Use Python 3.11+ for best performance
python3.11 -m venv venv
source venv/bin/activate # On Windows: venv\Scripts\activate
# Install 2025 dependencies
pip install --upgrade pip
pip install -r requirements.txt
requirements.txt (2025 Edition)
# Core AI Framework
langchain==0.3.0
langchain-openai==0.2.0
langchain-anthropic==0.2.0
langchain-community==0.3.0
# LLM Providers
openai==1.50.0
anthropic==0.35.0
# Async Support
aiohttp==3.9.0
asyncio-mqtt==0.16.0
# Tools and Integrations
duckduckgo-search==6.0.0
requests==2.32.0
beautifulsoup4==4.12.0
python-dotenv==1.0.0
# Memory and Storage
redis==5.1.0
psycopg2-binary==2.9.9
sqlalchemy==2.0.25
# Monitoring and Observability
langsmith==0.1.0
prometheus-client==0.20.0
# Security
cryptography==42.0.0
pydantic==2.8.0
pydantic-settings==2.4.0
# Development Tools
pytest==8.0.0
black==24.0.0
mypy==1.8.0
pytest-asyncio==0.23.0
Building the Core Agent
Step 1: Modern Project Structure
ai-agent-2025/
βββ src/
β βββ agent/
β β βββ __init__.py
β β βββ core.py # Main agent logic
β β βββ tools.py # Tool definitions
β β βββ memory.py # Memory management
β β βββ config.py # Configuration
β βββ api/
β β βββ __init__.py
β β βββ routes.py # FastAPI routes
β β βββ middleware.py # Security, logging
β βββ utils/
β βββ __init__.py
β βββ logging.py # Structured logging
β βββ monitoring.py # Metrics collection
βββ tests/
βββ docker/
βββ .env.example
βββ requirements.txt
βββ README.mdStep 2: Configuration Management
Create src/agent/config.py:
from typing import List, Optional

from pydantic_settings import BaseSettings, SettingsConfigDict
class AgentConfig(BaseSettings):
    """2025 Agent Configuration with Pydantic v2 (BaseSettings now lives in pydantic-settings)"""
    # Environment variables are matched case-insensitively to the field names,
    # e.g. OPENAI_API_KEY populates openai_api_key
    model_config = SettingsConfigDict(env_file=".env", case_sensitive=False)

    # LLM Configuration
    openai_api_key: str
    anthropic_api_key: Optional[str] = None
    default_model: str = "gpt-4o"
    temperature: float = 0.1
    max_tokens: int = 4000
    # Agent Behavior
    max_iterations: int = 10
    timeout_seconds: int = 30
    enable_streaming: bool = True
    # Memory Configuration
    redis_url: str = "redis://localhost:6379"
    postgres_url: Optional[str] = None
    memory_ttl_hours: int = 24
    # Security
    allowed_domains: List[str] = ["openai.com", "anthropic.com"]
    enable_content_filter: bool = True
    # Monitoring
    langsmith_api_key: Optional[str] = None
    enable_metrics: bool = True
# Global config instance
config = AgentConfig()
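The settings above load from a local .env file. A minimal sketch of what .env.example might contain (all values are placeholders — supply your own keys):

```
OPENAI_API_KEY=your-openai-key
DEFAULT_MODEL=gpt-4o
TEMPERATURE=0.1
REDIS_URL=redis://localhost:6379
ENABLE_STREAMING=true
WEATHER_API_KEY=your-openweathermap-key
```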
Step 3: Enhanced Tool System
Create src/agent/tools.py:
import asyncio
import math
import os
import re
from typing import Any, Dict, List, Optional, Type

import aiohttp
from duckduckgo_search import DDGS
from langchain.tools import BaseTool
from pydantic import BaseModel, Field  # LangChain 0.3 uses Pydantic v2 directly
class CalculatorInput(BaseModel):
"""Input for calculator tool"""
expression: str = Field(description="Mathematical expression to evaluate")
class AdvancedCalculator(BaseTool):
    """2025 Enhanced Calculator with safety and advanced functions"""
    # LangChain 0.3 tools are Pydantic v2 models, so class attributes need annotations
    name: str = "advanced_calculator"
    description: str = """
    Perform mathematical calculations including:
    - Basic arithmetic (+, -, *, /, **, %)
    - Scientific functions (sin, cos, tan, log, sqrt, exp)
    - Constants (pi, e) and helpers (abs, round, min, max, sum)
    """
    args_schema: Type[BaseModel] = CalculatorInput

    def _run(self, expression: str) -> str:
        """Execute calculation synchronously"""
        return self._evaluate(expression)

    async def _arun(self, expression: str) -> str:
        """Execute calculation asynchronously (evaluation is CPU-bound, no await needed)"""
        return self._evaluate(expression)

    def _evaluate(self, expression: str) -> str:
try:
# Sanitize input
safe_expression = self._sanitize_expression(expression)
# Create safe evaluation environment
safe_dict = {
"__builtins__": {},
"abs": abs, "round": round, "min": min, "max": max,
"sum": sum, "len": len,
"sin": math.sin, "cos": math.cos, "tan": math.tan,
"sqrt": math.sqrt, "log": math.log, "log10": math.log10,
"pi": math.pi, "e": math.e,
"pow": pow, "exp": math.exp,
}
result = eval(safe_expression, safe_dict)
return f"Result: {result}"
except Exception as e:
return f"Calculation error: {str(e)}"
def _sanitize_expression(self, expression: str) -> str:
"""Sanitize mathematical expression"""
# Remove dangerous patterns
dangerous_patterns = [
r'__.*__', r'import', r'exec', r'eval', r'open',
r'file', r'input', r'raw_input'
]
for pattern in dangerous_patterns:
if re.search(pattern, expression, re.IGNORECASE):
raise ValueError(f"Dangerous pattern detected: {pattern}")
return expression
class WebSearchInput(BaseModel):
"""Input for web search tool"""
query: str = Field(description="Search query")
max_results: int = Field(default=5, description="Maximum number of results")
class EnhancedWebSearch(BaseTool):
    """2025 Web Search with result formatting"""
    name: str = "web_search"
    description: str = """
    Search the web for current information on any topic.
    Returns relevant, up-to-date results with summaries.
    """
    args_schema: Type[BaseModel] = WebSearchInput

    def _run(self, query: str, max_results: int = 5) -> str:
        """Execute search synchronously"""
        return self._search(query, max_results)

    async def _arun(self, query: str, max_results: int = 5) -> str:
        """Execute search asynchronously (the DDGS client itself is blocking)"""
        return self._search(query, max_results)

    def _search(self, query: str, max_results: int = 5) -> str:
try:
# Use DuckDuckGo for privacy-focused search
with DDGS() as ddgs:
results = list(ddgs.text(query, max_results=max_results))
if not results:
return "No search results found."
# Format results
formatted_results = []
for i, result in enumerate(results, 1):
formatted_results.append(
f"{i}. **{result['title']}**\n"
f" {result['body']}\n"
f" Source: {result['href']}\n"
)
return "\n".join(formatted_results)
except Exception as e:
return f"Search error: {str(e)}"
class WeatherInput(BaseModel):
"""Input for weather tool"""
location: str = Field(description="City name or coordinates")
class WeatherTool(BaseTool):
    """2025 Weather tool with real API integration (OpenWeatherMap)"""
    name: str = "get_weather"
    description: str = """
    Get current weather information for any location worldwide.
    Provides temperature, conditions, and humidity.
    """
    args_schema: Type[BaseModel] = WeatherInput
    # Declared as a model field (Pydantic v2 forbids ad-hoc attributes in __init__);
    # get a free API key from OpenWeatherMap
    api_key: Optional[str] = Field(default_factory=lambda: os.getenv("WEATHER_API_KEY"))

    def _run(self, location: str) -> str:
        """Execute weather lookup synchronously (only safe outside a running event loop)"""
        return asyncio.run(self._arun(location))
async def _arun(self, location: str) -> str:
"""Execute weather lookup asynchronously"""
if not self.api_key:
return "Weather API key not configured. Please set WEATHER_API_KEY environment variable."
try:
async with aiohttp.ClientSession() as session:
url = f"http://api.openweathermap.org/data/2.5/weather"
params = {
"q": location,
"appid": self.api_key,
"units": "metric"
}
async with session.get(url, params=params) as response:
if response.status == 200:
data = await response.json()
return self._format_weather_data(data)
else:
return f"Weather data not available for {location}"
except Exception as e:
return f"Weather lookup error: {str(e)}"
def _format_weather_data(self, data: Dict[str, Any]) -> str:
"""Format weather data for display"""
try:
location = data["name"]
country = data["sys"]["country"]
temp = data["main"]["temp"]
feels_like = data["main"]["feels_like"]
humidity = data["main"]["humidity"]
description = data["weather"][0]["description"].title()
            return f"""
Weather in {location}, {country}:
Temperature: {temp}°C (feels like {feels_like}°C)
Conditions: {description}
Humidity: {humidity}%
""".strip()
except KeyError as e:
return f"Error parsing weather data: {str(e)}"
# Tool registry for easy management
def get_available_tools() -> List[BaseTool]:
"""Get all available tools for the agent"""
tools = [
AdvancedCalculator(),
EnhancedWebSearch(),
WeatherTool(),
]
    return tools
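Before handing these tools to an agent, it's worth smoke-testing them directly. A quick ad-hoc check, assuming the package layout above is on your path:

```python
# Ad-hoc smoke test: the calculator needs no API keys
from src.agent.tools import AdvancedCalculator

calc = AdvancedCalculator()
print(calc._run("sqrt(16) + 2 ** 3"))  # -> Result: 12.0
```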
Step 4: Modern Agent Core
Create src/agent/core.py:
import asyncio
import logging
from datetime import datetime
from typing import Any, AsyncGenerator, Dict, List

from langchain.agents import AgentExecutor, create_openai_functions_agent
from langchain.callbacks.base import BaseCallbackHandler
from langchain.memory import ConversationBufferWindowMemory
from langchain.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_openai import ChatOpenAI

from .config import config
from .memory import RedisMemoryManager
from .tools import get_available_tools
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class StreamingCallbackHandler(BaseCallbackHandler):
"""Custom callback handler for streaming responses"""
def __init__(self):
self.tokens = []
def on_llm_new_token(self, token: str, **kwargs) -> None:
"""Handle new token from LLM"""
self.tokens.append(token)
print(token, end="", flush=True)
class AIAgent2025:
"""
Modern AI Agent with 2025 best practices:
- Async support
- Streaming responses
- Persistent memory
- Error handling
- Monitoring
"""
def __init__(self, user_id: str = "default"):
self.user_id = user_id
self.memory_manager = RedisMemoryManager(user_id)
self.tools = get_available_tools()
self.llm = self._initialize_llm()
self.agent_executor = self._create_agent()
logger.info(f"AI Agent initialized for user: {user_id}")
def _initialize_llm(self) -> ChatOpenAI:
"""Initialize the language model with 2025 settings"""
callbacks = []
if config.enable_streaming:
callbacks.append(StreamingCallbackHandler())
return ChatOpenAI(
model=config.default_model,
temperature=config.temperature,
max_tokens=config.max_tokens,
openai_api_key=config.openai_api_key,
streaming=config.enable_streaming,
callbacks=callbacks,
)
def _create_agent(self) -> AgentExecutor:
"""Create the agent executor with modern prompt template"""
# 2025 Enhanced System Prompt
system_prompt = """
You are an advanced AI assistant created in 2025. You have access to multiple tools and can help users with a wide variety of tasks.
Key capabilities:
- Perform complex calculations and mathematical analysis
- Search the web for current information
- Get real-time weather data
- Remember conversation context
- Provide detailed, accurate responses
Guidelines:
- Always use tools when they can provide more accurate or current information
- Be helpful, harmless, and honest
- If you're unsure about something, say so and suggest how to find the answer
- Provide step-by-step explanations for complex problems
- Consider the user's context and previous conversations
Current date: {current_date}
User ID: {user_id}
"""
prompt = ChatPromptTemplate.from_messages([
("system", system_prompt.format(
current_date=datetime.now().strftime("%Y-%m-%d"),
user_id=self.user_id
)),
MessagesPlaceholder(variable_name="chat_history"),
("human", "{input}"),
MessagesPlaceholder(variable_name="agent_scratchpad"),
])
# Create agent with function calling
agent = create_openai_functions_agent(
llm=self.llm,
tools=self.tools,
prompt=prompt
)
# Create executor with memory
memory = ConversationBufferWindowMemory(
memory_key="chat_history",
return_messages=True,
k=10 # Keep last 10 exchanges
)
return AgentExecutor(
agent=agent,
tools=self.tools,
memory=memory,
verbose=True,
max_iterations=config.max_iterations,
handle_parsing_errors=True,
)
async def chat(self, message: str) -> str:
"""
Main chat interface with async support
"""
try:
# Load conversation history
await self.memory_manager.load_memory(self.agent_executor.memory)
# Process the message
response = await self._process_message(message)
# Save conversation history
await self.memory_manager.save_memory(self.agent_executor.memory)
return response
except Exception as e:
logger.error(f"Error in chat: {str(e)}")
return f"I apologize, but I encountered an error: {str(e)}. Please try again."
async def _process_message(self, message: str) -> str:
"""Process a single message through the agent"""
try:
# Run agent in thread pool to avoid blocking
loop = asyncio.get_event_loop()
response = await loop.run_in_executor(
None,
self.agent_executor.invoke,
{"input": message}
)
return response["output"]
except Exception as e:
logger.error(f"Error processing message: {str(e)}")
raise
async def stream_chat(self, message: str) -> AsyncGenerator[str, None]:
"""
Streaming chat interface for real-time responses
"""
try:
# This is a simplified streaming implementation
# In production, you'd want to use LangChain's streaming callbacks
response = await self.chat(message)
# Simulate streaming by yielding chunks
words = response.split()
for word in words:
yield word + " "
await asyncio.sleep(0.05) # Small delay for streaming effect
except Exception as e:
yield f"Error: {str(e)}"
async def get_conversation_history(self) -> List[Dict[str, Any]]:
"""Get formatted conversation history"""
try:
history = await self.memory_manager.get_history()
return [
{
"timestamp": msg.get("timestamp", ""),
"role": "human" if msg["type"] == "human" else "assistant",
"content": msg["content"]
}
for msg in history
]
except Exception as e:
logger.error(f"Error getting history: {str(e)}")
return []
async def clear_memory(self) -> bool:
"""Clear conversation memory"""
try:
await self.memory_manager.clear_memory()
self.agent_executor.memory.clear()
return True
except Exception as e:
logger.error(f"Error clearing memory: {str(e)}")
return False
def get_available_tools_info(self) -> List[Dict[str, str]]:
"""Get information about available tools"""
return [
{
"name": tool.name,
"description": tool.description,
}
for tool in self.tools
]
# Factory function for easy agent creation
async def create_agent(user_id: str = "default") -> AIAgent2025:
"""Create and initialize an AI agent"""
agent = AIAgent2025(user_id)
    return agent
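A quick way to exercise the agent from a plain script (assumes OPENAI_API_KEY is set and a local Redis is running):

```python
import asyncio

from src.agent.core import create_agent

async def demo():
    # create_agent wires up tools, memory, and the LLM for this user
    agent = await create_agent("demo_user")
    print(await agent.chat("What is 15 * 23?"))

asyncio.run(demo())
```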
Step 5: Redis Memory Management
Create src/agent/memory.py:
import json
import redis.asyncio as redis
from typing import List, Dict, Any, Optional
from datetime import datetime, timedelta
import logging
from langchain.memory.chat_memory import BaseChatMemory
from langchain.schema import BaseMessage, HumanMessage, AIMessage
from .config import config
logger = logging.getLogger(__name__)
class RedisMemoryManager:
"""
2025 Memory management with Redis for persistence and scalability
"""
def __init__(self, user_id: str):
self.user_id = user_id
self.redis_client = redis.from_url(config.redis_url)
self.memory_key = f"agent_memory:{user_id}"
self.ttl_seconds = config.memory_ttl_hours * 3600
async def save_memory(self, memory: BaseChatMemory) -> bool:
"""Save conversation memory to Redis"""
try:
# Extract messages from memory
messages = []
if hasattr(memory, 'chat_memory') and hasattr(memory.chat_memory, 'messages'):
for msg in memory.chat_memory.messages:
messages.append({
"type": msg.__class__.__name__.lower().replace("message", ""),
"content": msg.content,
"timestamp": datetime.now().isoformat()
})
# Save to Redis with TTL
await self.redis_client.setex(
self.memory_key,
self.ttl_seconds,
json.dumps(messages)
)
logger.info(f"Saved {len(messages)} messages for user {self.user_id}")
return True
except Exception as e:
logger.error(f"Error saving memory: {str(e)}")
return False
async def load_memory(self, memory: BaseChatMemory) -> bool:
"""Load conversation memory from Redis"""
try:
# Get messages from Redis
data = await self.redis_client.get(self.memory_key)
if not data:
return True # No existing memory is fine
messages_data = json.loads(data)
# Convert back to LangChain messages
messages = []
for msg_data in messages_data:
if msg_data["type"] == "human":
messages.append(HumanMessage(content=msg_data["content"]))
elif msg_data["type"] == "ai":
messages.append(AIMessage(content=msg_data["content"]))
# Load into memory
if hasattr(memory, 'chat_memory'):
memory.chat_memory.messages = messages
logger.info(f"Loaded {len(messages)} messages for user {self.user_id}")
return True
except Exception as e:
logger.error(f"Error loading memory: {str(e)}")
return False
async def get_history(self) -> List[Dict[str, Any]]:
"""Get conversation history as structured data"""
try:
data = await self.redis_client.get(self.memory_key)
if not data:
return []
return json.loads(data)
except Exception as e:
logger.error(f"Error getting history: {str(e)}")
return []
async def clear_memory(self) -> bool:
"""Clear conversation memory"""
try:
await self.redis_client.delete(self.memory_key)
logger.info(f"Cleared memory for user {self.user_id}")
return True
except Exception as e:
logger.error(f"Error clearing memory: {str(e)}")
return False
async def get_memory_stats(self) -> Dict[str, Any]:
"""Get memory usage statistics"""
try:
exists = await self.redis_client.exists(self.memory_key)
if not exists:
return {"exists": False, "message_count": 0, "ttl": 0}
ttl = await self.redis_client.ttl(self.memory_key)
data = await self.redis_client.get(self.memory_key)
messages = json.loads(data) if data else []
return {
"exists": True,
"message_count": len(messages),
"ttl_seconds": ttl,
"expires_at": (datetime.now() + timedelta(seconds=ttl)).isoformat() if ttl > 0 else None
}
except Exception as e:
logger.error(f"Error getting memory stats: {str(e)}")
return {"error": str(e)}
async def close(self):
"""Close Redis connection"""
        await self.redis_client.close()
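The memory manager is also usable on its own, e.g. to inspect what's stored for a user (same local-Redis assumption as above):

```python
import asyncio

from src.agent.memory import RedisMemoryManager

async def inspect(user_id: str):
    manager = RedisMemoryManager(user_id)
    stats = await manager.get_memory_stats()
    print(stats)  # e.g. {'exists': True, 'message_count': 4, ...}
    await manager.close()

asyncio.run(inspect("demo_user"))
```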
Advanced Features
Step 6: FastAPI Web Interface
Create src/api/routes.py:
from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
from typing import List, Dict, Any, Optional
from datetime import datetime
import json
import uuid
from ..agent.core import create_agent, AIAgent2025
from ..agent.config import config
app = FastAPI(
title="AI Agent 2025 API",
description="Modern AI Agent with advanced capabilities",
version="1.0.0"
)
# CORS middleware for web frontend
app.add_middleware(
CORSMiddleware,
allow_origins=["*"], # Configure appropriately for production
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Request/Response models
class ChatRequest(BaseModel):
message: str
user_id: Optional[str] = None
stream: bool = False
class ChatResponse(BaseModel):
response: str
user_id: str
timestamp: str
tools_used: List[str] = []
class HistoryResponse(BaseModel):
history: List[Dict[str, Any]]
user_id: str
# Agent management
agents: Dict[str, AIAgent2025] = {}
async def get_or_create_agent(user_id: str) -> AIAgent2025:
"""Get existing agent or create new one"""
if user_id not in agents:
agents[user_id] = await create_agent(user_id)
return agents[user_id]
@app.post("/chat", response_model=ChatResponse)
async def chat_endpoint(request: ChatRequest):
"""Main chat endpoint"""
try:
user_id = request.user_id or str(uuid.uuid4())
agent = await get_or_create_agent(user_id)
response = await agent.chat(request.message)
return ChatResponse(
response=response,
user_id=user_id,
timestamp=datetime.now().isoformat()
)
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@app.post("/chat/stream")
async def stream_chat_endpoint(request: ChatRequest):
"""Streaming chat endpoint"""
try:
user_id = request.user_id or str(uuid.uuid4())
agent = await get_or_create_agent(user_id)
async def generate_stream():
async for chunk in agent.stream_chat(request.message):
yield f"data: {json.dumps({'chunk': chunk, 'user_id': user_id})}\n\n"
yield f"data: {json.dumps({'done': True})}\n\n"
        return StreamingResponse(
            generate_stream(),
            media_type="text/event-stream",  # we emit SSE-style "data:" lines
            headers={"Cache-Control": "no-cache"}
        )
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@app.get("/history/{user_id}", response_model=HistoryResponse)
async def get_history_endpoint(user_id: str):
"""Get conversation history"""
try:
agent = await get_or_create_agent(user_id)
history = await agent.get_conversation_history()
return HistoryResponse(
history=history,
user_id=user_id
)
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@app.delete("/history/{user_id}")
async def clear_history_endpoint(user_id: str):
"""Clear conversation history"""
try:
if user_id in agents:
success = await agents[user_id].clear_memory()
if success:
return {"message": "History cleared successfully"}
else:
raise HTTPException(status_code=500, detail="Failed to clear history")
else:
return {"message": "No history found for user"}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@app.get("/tools")
async def get_tools_endpoint():
"""Get available tools information"""
try:
# Create a temporary agent to get tools info
temp_agent = await create_agent("temp")
tools_info = temp_agent.get_available_tools_info()
return {"tools": tools_info}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@app.get("/health")
async def health_check():
"""Health check endpoint"""
return {
"status": "healthy",
"timestamp": datetime.now().isoformat(),
"version": "1.0.0",
"active_agents": len(agents)
}
if __name__ == "__main__":
import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)
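Since /chat/stream emits SSE-style `data:` lines, clients just read the response line by line. A minimal consumer sketch using httpx (an assumption — it isn't in requirements.txt; any HTTP client with response streaming works):

```python
import httpx

with httpx.stream(
    "POST",
    "http://localhost:8000/chat/stream",
    json={"message": "Tell me about AI agents", "user_id": "user123"},
    timeout=None,  # streaming responses can outlive the default timeout
) as response:
    for line in response.iter_lines():
        if line.startswith("data: "):
            print(line[len("data: "):])
```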
Step 7: Command Line Interface
Create src/cli.py:
import asyncio
import sys
from typing import Optional
import click
from rich.console import Console
from rich.markdown import Markdown
from rich.panel import Panel
from rich.prompt import Prompt
from .agent.core import create_agent, AIAgent2025
console = Console()
class CLIInterface:
"""Modern CLI interface with rich formatting"""
def __init__(self, user_id: str = "cli_user"):
self.user_id = user_id
self.agent: Optional[AIAgent2025] = None
async def initialize(self):
"""Initialize the agent"""
console.print("π€ Initializing AI Agent 2025...", style="blue")
self.agent = await create_agent(self.user_id)
console.print("β
Agent ready!", style="green")
async def run_interactive(self):
"""Run interactive chat session"""
console.print(Panel.fit(
"Welcome to AI Agent 2025!\n"
"Type 'help' for commands, 'quit' to exit.",
title="AI Agent 2025",
border_style="blue"
))
        while True:
            try:
                # Get user input
                user_input = Prompt.ask("\n[bold blue]You[/bold blue]")
                if user_input.lower() in ['quit', 'exit', 'q']:
                    console.print("Goodbye!", style="yellow")
                    break
                if user_input.lower() == 'help':
                    self.show_help()
                    continue
                if user_input.lower() == 'clear':
                    await self.agent.clear_memory()
                    console.print("Memory cleared!", style="green")
                    continue
                if user_input.lower() == 'history':
                    await self.show_history()
                    continue
                if user_input.lower() == 'tools':
                    self.show_tools()
                    continue
                # Process the message
                console.print("\n[bold green]Agent[/bold green]: ", end="")
                response = await self.agent.chat(user_input)
                # Display response with markdown formatting
                console.print(Markdown(response))
            except KeyboardInterrupt:
                console.print("\nGoodbye!", style="yellow")
                break
            except Exception as e:
                console.print(f"Error: {str(e)}", style="red")
def show_help(self):
"""Show help information"""
help_text = """
## Available Commands
- **help** - Show this help message
- **clear** - Clear conversation memory
- **history** - Show conversation history
- **tools** - Show available tools
- **quit/exit/q** - Exit the application
## Tips
- Ask questions naturally
- Use tools by mentioning calculations, weather, or web search
- The agent remembers your conversation context
"""
console.print(Panel(Markdown(help_text), title="Help", border_style="yellow"))
async def show_history(self):
"""Show conversation history"""
try:
history = await self.agent.get_conversation_history()
if not history:
console.print("π No conversation history yet.", style="yellow")
return
console.print("\nπ Conversation History:", style="blue")
for entry in history[-10:]: # Show last 10 entries
role = "You" if entry["role"] == "human" else "Agent"
style = "blue" if entry["role"] == "human" else "green"
console.print(f"[bold {style}]{role}[/bold {style}]: {entry['content'][:100]}...")
except Exception as e:
console.print(f"β Error getting history: {str(e)}", style="red")
def show_tools(self):
"""Show available tools"""
if not self.agent:
console.print("β Agent not initialized", style="red")
return
tools_info = self.agent.get_available_tools_info()
tools_text = "## Available Tools\n\n"
for tool in tools_info:
tools_text += f"**{tool['name']}**: {tool['description']}\n\n"
console.print(Panel(Markdown(tools_text), title="Tools", border_style="green"))
@click.command()
@click.option('--user-id', default='cli_user', help='User ID for the session')
@click.option('--message', help='Single message to process (non-interactive)')
def main(user_id: str, message: Optional[str]):
    """AI Agent 2025 Command Line Interface"""
    # click callbacks are synchronous, so bridge into asyncio here
    asyncio.run(_run_cli(user_id, message))

async def _run_cli(user_id: str, message: Optional[str]):
    cli = CLIInterface(user_id)
    await cli.initialize()
    if message:
        # Single message mode
        response = await cli.agent.chat(message)
        console.print(f"[bold green]Agent[/bold green]: {response}")
    else:
        # Interactive mode
        await cli.run_interactive()

if __name__ == "__main__":
    main()
Production Deployment
Step 8: Docker Configuration
Create Dockerfile:
# Use Python 3.11 for best performance
FROM python:3.11-slim
# Set working directory
WORKDIR /app
# Install system dependencies (curl is required by the HEALTHCHECK below)
RUN apt-get update && apt-get install -y \
    gcc \
    g++ \
    curl \
    && rm -rf /var/lib/apt/lists/*
# Copy requirements first for better caching
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# Copy application code
COPY src/ ./src/
COPY .env.example .env
# Create non-root user
RUN useradd -m -u 1000 agent && chown -R agent:agent /app
USER agent
# Expose port
EXPOSE 8000
# Health check
HEALTHCHECK --interval=30s --timeout=30s --start-period=5s --retries=3 \
CMD curl -f http://localhost:8000/health || exit 1
# Run the application
CMD ["python", "-m", "uvicorn", "src.api.routes:app", "--host", "0.0.0.0", "--port", "8000"]Create docker-compose.yml:
version: '3.8'
services:
ai-agent:
build: .
ports:
- "8000:8000"
environment:
- OPENAI_API_KEY=${OPENAI_API_KEY}
- REDIS_URL=redis://redis:6379
- POSTGRES_URL=postgresql://agent:password@postgres:5432/agent_db
depends_on:
- redis
- postgres
restart: unless-stopped
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:8000/health"]
interval: 30s
timeout: 10s
retries: 3
redis:
image: redis:7-alpine
ports:
- "6379:6379"
volumes:
- redis_data:/data
restart: unless-stopped
postgres:
image: postgres:15-alpine
environment:
- POSTGRES_DB=agent_db
- POSTGRES_USER=agent
- POSTGRES_PASSWORD=password
volumes:
- postgres_data:/var/lib/postgresql/data
ports:
- "5432:5432"
restart: unless-stopped
prometheus:
image: prom/prometheus:latest
ports:
- "9090:9090"
volumes:
- ./monitoring/prometheus.yml:/etc/prometheus/prometheus.yml
restart: unless-stopped
volumes:
redis_data:
  postgres_data:
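Note the compose file mounts ./monitoring/prometheus.yml, which isn't shown elsewhere in this post. A minimal sketch — it assumes you expose the Step 9 metrics server on port 8001 from the ai-agent container:

```yaml
scrape_configs:
  - job_name: ai-agent
    scrape_interval: 15s
    static_configs:
      - targets: ["ai-agent:8001"]
```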
Monitoring and Optimization
Step 9: Performance Monitoring
Create src/utils/monitoring.py:
import time
import logging
from typing import Dict, Any, Optional
from prometheus_client import Counter, Histogram, Gauge, start_http_server
from functools import wraps
import asyncio
# Prometheus metrics
REQUEST_COUNT = Counter('agent_requests_total', 'Total agent requests', ['user_id', 'status'])
REQUEST_DURATION = Histogram('agent_request_duration_seconds', 'Request duration')
ACTIVE_SESSIONS = Gauge('agent_active_sessions', 'Number of active sessions')
TOKEN_USAGE = Counter('agent_tokens_used_total', 'Total tokens used', ['model', 'type'])
ERROR_COUNT = Counter('agent_errors_total', 'Total errors', ['error_type'])
logger = logging.getLogger(__name__)
class PerformanceMonitor:
"""2025 Performance monitoring with Prometheus metrics"""
def __init__(self):
self.start_time = time.time()
self.request_times: Dict[str, float] = {}
def start_request(self, request_id: str) -> None:
"""Start timing a request"""
self.request_times[request_id] = time.time()
def end_request(self, request_id: str, user_id: str, status: str = "success") -> float:
"""End timing a request and record metrics"""
if request_id not in self.request_times:
return 0.0
duration = time.time() - self.request_times[request_id]
del self.request_times[request_id]
# Record metrics
REQUEST_COUNT.labels(user_id=user_id, status=status).inc()
REQUEST_DURATION.observe(duration)
return duration
def record_token_usage(self, model: str, prompt_tokens: int, completion_tokens: int):
"""Record token usage"""
TOKEN_USAGE.labels(model=model, type="prompt").inc(prompt_tokens)
TOKEN_USAGE.labels(model=model, type="completion").inc(completion_tokens)
def record_error(self, error_type: str):
"""Record an error"""
ERROR_COUNT.labels(error_type=error_type).inc()
def update_active_sessions(self, count: int):
"""Update active session count"""
ACTIVE_SESSIONS.set(count)
# Global monitor instance
monitor = PerformanceMonitor()
def track_performance(func):
"""Decorator to track function performance"""
@wraps(func)
async def async_wrapper(*args, **kwargs):
request_id = f"{func.__name__}_{time.time()}"
monitor.start_request(request_id)
try:
result = await func(*args, **kwargs)
monitor.end_request(request_id, "unknown", "success")
return result
except Exception as e:
monitor.end_request(request_id, "unknown", "error")
monitor.record_error(type(e).__name__)
raise
@wraps(func)
def sync_wrapper(*args, **kwargs):
request_id = f"{func.__name__}_{time.time()}"
monitor.start_request(request_id)
try:
result = func(*args, **kwargs)
monitor.end_request(request_id, "unknown", "success")
return result
except Exception as e:
monitor.end_request(request_id, "unknown", "error")
monitor.record_error(type(e).__name__)
raise
return async_wrapper if asyncio.iscoroutinefunction(func) else sync_wrapper
def start_metrics_server(port: int = 8001):
"""Start Prometheus metrics server"""
start_http_server(port)
logger.info(f"Metrics server started on port {port}")Step 10: Testing Framework
Step 10: Testing Framework
Create tests/test_agent.py:
import pytest
import pytest_asyncio
from unittest.mock import patch

from src.agent.core import create_agent
from src.agent.tools import AdvancedCalculator, EnhancedWebSearch
@pytest_asyncio.fixture
async def agent():
"""Create test agent"""
return await create_agent("test_user")
@pytest.mark.asyncio
async def test_basic_chat(agent):
"""Test basic chat functionality"""
response = await agent.chat("Hello, how are you?")
assert isinstance(response, str)
assert len(response) > 0
@pytest.mark.asyncio
async def test_calculator_tool():
"""Test calculator tool"""
calc = AdvancedCalculator()
result = await calc._arun("2 + 2")
assert "4" in result
@pytest.mark.asyncio
async def test_web_search_tool():
"""Test web search tool"""
search = EnhancedWebSearch()
# Mock the search to avoid external API calls in tests
    with patch('src.agent.tools.DDGS') as mock_ddgs:  # patch where DDGS is looked up
mock_ddgs.return_value.__enter__.return_value.text.return_value = [
{"title": "Test Result", "body": "Test description", "href": "https://example.com"}
]
result = await search._arun("test query")
assert "Test Result" in result
@pytest.mark.asyncio
async def test_memory_persistence(agent):
"""Test memory persistence"""
# Send first message
await agent.chat("My name is Alice")
# Send second message referencing first
response = await agent.chat("What's my name?")
# Should remember the name
assert "Alice" in response
@pytest.mark.asyncio
async def test_error_handling(agent):
"""Test error handling"""
# This should not crash the agent
response = await agent.chat("Calculate the square root of -1")
assert isinstance(response, str)
# Should handle the error gracefully
@pytest.mark.asyncio
async def test_tool_selection(agent):
"""Test that agent selects appropriate tools"""
# Should use calculator
response = await agent.chat("What is 15 * 23?")
assert "345" in response or "calculator" in response.lower()
if __name__ == "__main__":
    pytest.main([__file__])
Usage Examples and Best Practices
Example 1: Running the CLI
# Install dependencies
pip install -r requirements.txt
# Set up environment
cp .env.example .env
# Edit .env with your API keys
# Run CLI interface
python -m src.cli
# Or run with specific user ID
python -m src.cli --user-id john_doe
# Single message mode
python -m src.cli --message "What's the weather in Tokyo?"
Example 2: Web API Usage
# Start the web server
python -m src.api.routes
# Test with curl
curl -X POST "http://localhost:8000/chat" \
-H "Content-Type: application/json" \
-d '{"message": "Calculate 15% tip on $45.50", "user_id": "user123"}'
# Stream response
curl -X POST "http://localhost:8000/chat/stream" \
-H "Content-Type: application/json" \
-d '{"message": "Tell me about AI agents", "user_id": "user123"}'Example 3: Docker Deployment
# Build and run with Docker Compose
docker-compose up -d
# Check logs
docker-compose logs -f ai-agent
# Scale the service
docker-compose up -d --scale ai-agent=3
2025 Best Practices Summary
1. Performance Optimization
- Use async/await throughout the application
- Implement connection pooling for databases
- Cache frequently used responses (see the sketch after this list)
- Use streaming for long responses
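As a starting point for the caching item above, a minimal exact-match response cache over the same Redis instance (a sketch — production systems usually layer semantic caching on top):

```python
import hashlib

import redis.asyncio as redis

cache = redis.from_url("redis://localhost:6379")

async def cached_chat(agent, message: str, ttl: int = 3600) -> str:
    """Serve repeated identical prompts from Redis instead of re-querying the LLM."""
    key = "resp_cache:" + hashlib.sha256(message.encode()).hexdigest()
    hit = await cache.get(key)
    if hit is not None:
        return hit.decode()
    response = await agent.chat(message)
    await cache.setex(key, ttl, response)
    return response
```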
2. Security
- Validate all inputs
- Use environment variables for secrets
- Implement rate limiting (see the sketch after this list)
- Add content filtering for harmful requests
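For the rate-limiting item, a fixed-window limiter in Redis is enough to start with (a sketch; the limit and window values are illustrative):

```python
import redis.asyncio as redis

limiter = redis.from_url("redis://localhost:6379")

async def allow_request(user_id: str, limit: int = 30, window_seconds: int = 60) -> bool:
    """Allow at most `limit` requests per user within each fixed window."""
    key = f"ratelimit:{user_id}"
    count = await limiter.incr(key)
    if count == 1:  # first request in this window starts the countdown
        await limiter.expire(key, window_seconds)
    return count <= limit
```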
3. Monitoring
- Track token usage and costs
- Monitor response times
- Log errors and exceptions
- Set up alerts for critical issues
4. Scalability
- Use Redis for session management
- Implement horizontal scaling
- Use load balancers
- Consider microservices architecture
5. User Experience
- Provide streaming responses
- Implement conversation memory
- Handle errors gracefully
- Offer clear tool descriptions
Conclusion
You've now built a production-ready AI agent using 2025's best practices and technologies. This agent features:
- Modern Architecture: Async Python, Redis memory, FastAPI web interface
- Advanced Tools: Calculator, web search, weather, with easy extensibility
- Production Features: Monitoring, logging, error handling, Docker deployment
- User Experience: Streaming responses, persistent memory, multiple interfaces
- Scalability: Horizontal scaling, load balancing, microservices ready
Next Steps
- Add More Tools: Database queries, file operations, email sending
- Implement Authentication: User management and access control
- Add Multi-modal Support: Image and audio processing
- Create Web Frontend: React or Vue.js interface
- Deploy to Cloud: AWS, GCP, or Azure deployment
- Implement Analytics: User behavior tracking and insights
The AI agent landscape continues to evolve rapidly. Stay updated with the latest developments in LLMs, frameworks, and deployment strategies to keep your agents at the cutting edge.
Related Resources:
- How to Choose the Right Agentic AI Framework
- LangChain v0.3 Migration Guide
- AI Agent Security Best Practices
- Production Deployment Strategies
- Multi-Agent Systems Tutorial
Community:
