Real-World Use Cases
Complete examples showing LyDos agents in production scenarios.
1. Automated Code Review in CI/CD
Automatically review pull requests, scan for vulnerabilities, and provide detailed feedback.
Setup
# .github/workflows/lydos-review.yml
name: LyDos Code Review

on:
  pull_request:
    types: [opened, synchronize]

jobs:
  review:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3
        with:
          fetch-depth: 0          # full history so diffs against base are possible

      - name: Set up Python
        uses: actions/setup-python@v4
        with:
          python-version: '3.11'

      - name: Clone LYDOS Agent OS
        run: git clone https://github.com/lydianai/AILYDIAN-AGENT-ORCHESTRATOR.git

      - name: Run security scan
        env:
          LYDOS_API_KEY: ${{ secrets.LYDOS_API_KEY }}
        run: |
          python3 << 'EOF'
          import sys
          import time

          from lydos import LyDos

          client = LyDos()

          # Run security-scanner on the changed sources.
          result = client.agents.run(
              "security-scanner",
              task="Review PR changes for vulnerabilities",
              params={"path": "./src", "depth": "standard"}
          )

          # Poll until the task reaches a terminal state.
          while True:
              task = client.tasks.get(result.task_id)
              if task.status in ["completed", "failed"]:
                  break
              time.sleep(2)

          print("::notice::LyDos Security Scan Complete")
          print(task.result)
          # A non-zero exit code fails the job and blocks the PR.
          sys.exit(0 if task.status == "completed" else 1)
          EOF
Expected Outcome
- PRs blocked if HIGH/CRITICAL vulnerabilities found
- Detailed scan report posted as PR comment
- Security metrics tracked over time
- Developers notified immediately of issues
2. AI-Powered Research Newsletter
Generate a weekly market research newsletter using agents to research, analyze, and synthesize content.
Full Example
#!/usr/bin/env python3
"""
Weekly AI Research Newsletter Generator
Uses LyDos agents to research, analyze, and create newsletter content
"""
import time
from datetime import datetime
from lydos import LyDos
client = LyDos()
def generate_newsletter():
topics = [
"Latest developments in AI agents (2025)",
"Emerging cybersecurity threats",
"Machine learning optimization techniques",
]
print("Starting weekly research newsletter generation...")
print(f"Topics: {len(topics)} to research")
# Step 1: Research all topics
print("\nPhase 1: Deep Research")
research_tasks = {}
for topic in topics:
result = client.agents.run(
"deep-research",
task=f"Research {topic} and provide detailed insights",
params={"depth": "comprehensive", "sources": ["web", "academic-papers", "github"]}
)
research_tasks[topic] = result.task_id
print(f" • {topic}: {result.task_id}")
# Wait for research to complete
print("\nPhase 2: Collecting Results")
research_results = {}
for topic, task_id in research_tasks.items():
while True:
task = client.tasks.get(task_id)
if task.status == "completed":
research_results[topic] = task.result
print(f" ✓ {topic}")
break
elif task.status == "failed":
print(f" ✗ {topic}: {task.error_message}")
break
time.sleep(2)
# Step 2: Synthesize and write content
print("\nPhase 3: Content Generation")
synthesis_result = client.agents.run(
"documentation-writer",
task=f"Write a professional newsletter using this research: {str(research_results)}",
params={"format": "markdown", "style": "professional"}
)
while True:
task = client.tasks.get(synthesis_result.task_id)
if task.status in ["completed", "failed"]:
newsletter_content = task.result
break
time.sleep(2)
# Step 3: Generate summary for social media
print("\nPhase 4: Social Media Summary")
summary_result = client.agents.run(
"content-marketer",
task=f"Create a Twitter thread from this newsletter: {newsletter_content}",
params={"max_results": 5, "format": "twitter_thread"}
)
while True:
task = client.tasks.get(summary_result.task_id)
if task.status in ["completed", "failed"]:
social_content = task.result
break
time.sleep(2)
# Step 4: Save and publish
timestamp = datetime.now().isoformat()
newsletter = {
"timestamp": timestamp,
"topics": topics,
"newsletter_html": newsletter_content,
"social_media": social_content,
"research_tasks": research_tasks
}
# Save to file
import json
with open(f"newsletters/newsletter_{timestamp}.json", "w") as f:
json.dump(newsletter, f, indent=2)
# Publish (example: email, Medium, LinkedIn, etc.)
print(f"\n✓ Newsletter generated and saved")
print(f"Content length: {len(newsletter_content)} characters")
print(f"Social posts: {len(social_content.split('---'))} tweets")
return newsletter
if __name__ == "__main__":
newsletter = generate_newsletter()Expected Outcome
- Professional newsletter generated from 3 topics
- Takes ~5-10 minutes for comprehensive research
- Social media content auto-created for distribution
- Repeatable weekly with new topics
3. Self-Healing Infrastructure Monitoring
Monitor system health and automatically trigger remediation agents when issues are detected.
Architecture
Health Monitor (WebSocket) → Issue Detected → Remediation Agent
↓
Restart Service
Rebalance Load
Scale Deployment
Rollback Changes
Implementation
#!/usr/bin/env python3
import asyncio
import websockets
import json
from lydos import LyDos
client = LyDos()
async def monitor_infrastructure():
"""Monitor health and auto-remediate issues"""
async with websockets.connect(
'wss://lydos.ailydian.com/v1/stream',
extra_headers={'Authorization': f'Bearer {client.api_key}'}
) as ws:
while True:
msg = await ws.recv()
event = json.loads(msg)
if event['type'] == 'health':
health_score = event['score']
if health_score < 70:
print(f"⚠️ Health degraded: {health_score}/100")
await remediate(event)
async def remediate(event):
"""Automatically fix infrastructure issues"""
k8s_result = client.agents.run(
"k8s-specialist",
task="Diagnose cluster health and recommend fixes",
params={"cluster": "production", "depth": "comprehensive"}
)
# Wait for diagnosis
while True:
task = client.tasks.get(k8s_result.task_id)
if task.status == "completed":
diagnosis = task.result
break
await asyncio.sleep(2)
# Parse and apply fixes
if "restart" in diagnosis.lower():
print(" → Restarting unhealthy pods...")
# kubectl rollout restart...
if "scale" in diagnosis.lower():
print(" → Scaling deployment...")
# kubectl scale deployment...
print("✓ Remediation complete")Expected Outcome
- Real-time health monitoring with WebSocket streaming
- Automatic remediation when issues detected
- Reduced MTTR (Mean Time To Recovery)
- Fewer manual interventions and pages
4. Multi-Model AI Application (RAG Chatbot)
Build a RAG (Retrieval-Augmented Generation) chatbot using LyDos as the intelligent routing layer.
Architecture
User Query → LyDos Router Agent → Select Best Model/Tool
↓
Retrieve Context
Generate Response
Format Output
↓
User Response
Full Implementation
#!/usr/bin/env python3
from lydos import LyDos
from typing import Optional
client = LyDos()
class RAGChatbot:
def __init__(self):
self.conversation_history = []
self.knowledge_base = self.load_knowledge_base()
def load_knowledge_base(self):
"""Load documents for RAG"""
# Load from database, documents, or web
return {
"products": "...",
"pricing": "...",
"docs": "..."
}
def query(self, user_input: str) -> str:
"""Process user query and return response"""
# Step 1: Determine intent
intent = self.determine_intent(user_input)
print(f"Detected intent: {intent}")
# Step 2: Retrieve relevant context
context = self.retrieve_context(user_input)
# Step 3: Route to appropriate agent based on intent
if intent == "technical_question":
response = client.agents.run(
"deep-research",
task=f"Answer: {user_input}\nContext: {context}",
params={"depth": "standard"}
)
elif intent == "product_help":
response = client.agents.run(
"documentation-writer",
task=f"Help with: {user_input}\nDocs: {context}",
params={"depth": "fast"}
)
else:
response = client.chat(
f"User question: {user_input}\nContext: {context}"
)
return response.text
# Wait for agent response
import time
while True:
task = client.tasks.get(response.task_id)
if task.status == "completed":
return task.result
elif task.status == "failed":
return f"Sorry, I couldn't process that: {task.error_message}"
time.sleep(1)
def determine_intent(self, user_input: str) -> str:
"""Classify user intent"""
keywords = {
"technical": ["API", "code", "error", "bug", "debug"],
"product": ["feature", "pricing", "plan", "purchase"],
"general": ["hello", "help", "info"]
}
for intent, terms in keywords.items():
if any(term.lower() in user_input.lower() for term in terms):
return intent
return "general"
def retrieve_context(self, query: str) -> str:
"""Retrieve relevant documents (simplified)"""
# In production, use vector DB for semantic search
return self.knowledge_base.get("docs", "")
# Usage
bot = RAGChatbot()
while True:
user_input = input("You: ")
if user_input.lower() == "quit":
break
response = bot.query(user_input)
print(f"Bot: {response}\n")Expected Outcome
- Intelligent chatbot with context-aware responses
- Automatic intent detection and routing
- Answers based on knowledge base + LLM
- Customizable for any domain (support, sales, tech)
5. Security Operations Center (SOC)
Real-time threat detection combining CTI feeds with agent-based analysis and automated response.
Workflow
# Continuous monitoring
1. Fetch CTI feeds (ransomware, CVEs, exploits)
2. Match against asset inventory
3. Run vulnerability-hunter agent for affected systems
4. If HIGH/CRITICAL: trigger incident response workflow
5. Notify security team and create incident ticket
Example: CVE Detection
#!/usr/bin/env python3
from lydos import LyDos
import schedule
import time
client = LyDos()
def check_vulnerabilities():
"""Daily vulnerability check against known CVEs"""
# Check for new CVEs and affected packages
result = client.agents.run(
"dependency-auditor",
task="Check all dependencies for known CVEs",
params={"path": "./", "check_cves": True}
)
# Wait for scan
while True:
task = client.tasks.get(result.task_id)
if task.status == "completed":
vulnerabilities = parse_vulnerabilities(task.result)
if vulnerabilities:
print(f"Found {len(vulnerabilities)} vulnerabilities!")
for vuln in vulnerabilities:
if vuln['severity'] in ['HIGH', 'CRITICAL']:
trigger_incident_response(vuln)
break
time.sleep(2)
def trigger_incident_response(vulnerability):
"""Auto-create incident and notify team"""
# Create incident in ticketing system
print(f"🚨 CRITICAL: {vulnerability['id']} detected!")
# Send alerts, create Jira ticket, etc.
# Schedule daily
schedule.every().day.at("01:00").do(check_vulnerabilities)
while True:
schedule.run_pending()
time.sleep(60)Expected Outcome
- Automated threat detection 24/7
- Rapid identification of affected systems
- Incident tickets auto-created for critical issues
- Security team notified in real-time
6. Autonomous SEO Optimization
Automatically run SEO audits and optimize content across your website.
Implementation
#!/usr/bin/env python3
from lydos import LyDos
import schedule
client = LyDos()
def weekly_seo_audit():
"""Run comprehensive SEO audit"""
result = client.agents.run(
"seo-specialist",
task="Perform technical SEO audit of website",
params={"domain": "example.com", "depth": "comprehensive"}
)
# Wait for audit
while True:
task = client.tasks.get(result.task_id)
if task.status == "completed":
audit_results = task.result
optimize_content(audit_results)
break
time.sleep(2)
def optimize_content(audit_results):
"""Update content based on SEO recommendations"""
# Parse audit findings
issues = parse_issues(audit_results)
for issue in issues:
if issue['type'] == 'title_too_short':
# Run content agent to improve titles
result = client.agents.run(
"content-marketer",
task=f"Improve page title for SEO: {issue['page']}",
params={"format": "seo", "max_length": 60}
)
elif issue['type'] == 'missing_meta':
# Generate meta descriptions
result = client.agents.run(
"content-marketer",
task=f"Write SEO meta description for: {issue['page']}",
params={"format": "meta", "max_length": 160}
)
# Schedule weekly
schedule.every().monday.at("02:00").do(weekly_seo_audit)
while True:
schedule.run_pending()Expected Outcome
- Weekly automated SEO audits
- Content recommendations prioritized by impact
- Improved search rankings over time
- Reduced manual SEO workload
Common Patterns
Error Handling
Always wrap task polling in try/except and implement timeout logic:
try:
while elapsed < timeout:
task = client.tasks.get(task_id)
if task.status == "completed":
return task.result
elif task.status == "failed":
raise Exception(task.error_message)
time.sleep(poll_interval)
except Exception as e:
log_error(f"Task failed: {e}")
fallback_action()Scalability
For high-volume workloads, use batch task submission and queuing:
# Queue tasks instead of sequential execution
task_ids = []
for item in items:
result = client.agents.run(agent_id, task=item)
task_ids.append(result.task_id)
# Poll all tasks in parallel
results = poll_many(task_ids)Learn More
Explore more agents and integrate them into your workflows:
- Agents Guide — All 109 agents explained
- API Reference — Detailed endpoint documentation
- SDK Reference — Python and TypeScript SDKs