Imagine you’ve built a slick AI workflow with CrewAI—say, a team of agents that scrape job postings, craft tailored CVs, and email applications on your behalf. It works beautifully on your local machine, but then reality hits: you want to run this for dozens of job postings daily, across multiple users, without your laptop turning into a toaster or your code becoming a maintenance nightmare. How do you scale it? How do you manage it efficiently? That’s the problem we’re solving today.
In this blog, we’ll explore how integrating CrewAI with a Model Context Protocol (MCP) server using the FastMCP class addresses the challenges of scalability, resource management, and workflow coordination in multi-agent AI systems. We’ll use a job application automation example to show how this setup takes your local crew and transforms it into a distributed powerhouse.
Let’s dive into the problem—and the solution!
The Problem: Scaling Multi-Agent AI Workflows
CrewAI is fantastic for orchestrating AI agents to tackle complex tasks collaboratively. But when you try to scale it up, you hit these roadblocks:
- Resource Overload: Running multiple crews locally eats up CPU, memory, and time—especially for tasks like web scraping or email automation across many inputs.
- Management Chaos: Tracking the status of dozens of crew tasks (e.g., “Did that job application send?”) becomes a logistical headache without a centralized system.
- Deployment Limits: Local execution isn’t practical for production use—think enterprise apps or multi-user setups where reliability and uptime matter.
- Security Risks: Hardcoding credentials or running sensitive tasks locally exposes you to vulnerabilities.
The result? Your brilliant AI workflow is stuck in a sandbox, unable to grow or operate efficiently beyond a single machine.
The Solution: CrewAI + MCP Server Integration
By integrating CrewAI with an MCP server via FastMCP, we offload the heavy lifting to a remote CrewAI Enterprise Server. This solves our problems by:
- Scaling Efficiently: The server handles resource-intensive tasks, freeing your local machine.
- Centralized Management: API endpoints let you kick off tasks and check their status from anywhere, no babysitting required.
- Production-Ready Deployment: A server-based approach supports multiple users and continuous operation.
- Enhanced Security: Credentials and execution are managed server-side, reducing local exposure.
Let’s see this in action with our job application automation example.
What You’ll Need
- Python 3.9+ (the MCP tool signatures below use built-in generic type hints)
- CrewAI: pip install crewai
- CrewAI Tools: pip install crewai-tools
- MCP Python SDK: pip install mcp
- HTTP Client: pip install httpx
- Environment Variables: pip install python-dotenv
- Scraping and email extras for the example crew: pip install selenium webdriver-manager beautifulsoup4 google-api-python-client google-auth
- CrewAI Enterprise Server: A running instance with API access
- A .env File: For credentials
Step 1: Set Up Your Environment
Create a .env file to securely store your server details:
MCP_CREWAI_ENTERPRISE_SERVER_URL=https://your-crewai-server.com/api
MCP_CREWAI_ENTERPRISE_BEARER_TOKEN=your-secret-token-here
Install the dependencies:
pip install crewai crewai-tools mcp httpx python-dotenv selenium webdriver-manager beautifulsoup4 google-api-python-client google-auth
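Before moving on, it’s worth a quick sanity check that the variables actually load. A minimal sketch using python-dotenv:

import os
import dotenv

dotenv.load_dotenv()  # reads .env from the current working directory

# Fail fast if either variable is missing or empty
for var in ("MCP_CREWAI_ENTERPRISE_SERVER_URL", "MCP_CREWAI_ENTERPRISE_BEARER_TOKEN"):
    assert os.getenv(var), f"{var} is not set; check your .env file"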
Step 2: Define the Local CrewAI Workflow
Our example crew automates job applications with three agents: a researcher (scrapes job details), a CV generator, and an email sender. Here’s the local setup:
import os
import re
import time
import base64

from bs4 import BeautifulSoup
from crewai import Agent, Task, Crew, Process
from crewai_tools import SerperDevTool, tool
from google.oauth2.credentials import Credentials
from googleapiclient.discovery import build
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.chrome.options import Options
from webdriver_manager.chrome import ChromeDriverManager

# Path to the Gmail OAuth token (token.json); point this env var at yours
GMAIL_TOKEN_PATH = os.getenv("GMAIL_TOKEN_PATH")

serper_tool = SerperDevTool()

@tool("Extract email and job description")
def extract_email_and_job_description(url: str):
    """Scrape email and job description from a given URL."""
    options = Options()
    options.add_argument("--headless")
    options.add_argument("--disable-blink-features=AutomationControlled")
    service = Service(ChromeDriverManager().install())
    driver = webdriver.Chrome(service=service, options=options)
    try:
        driver.get(url)
        time.sleep(3)  # crude wait for JavaScript-rendered content
        soup = BeautifulSoup(driver.page_source, 'html.parser')
        # Extract email
        email = re.search(r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}', soup.text)
        email = email.group(0) if email else None
        # Extract job description
        job_description = ""
        for tag in ['div', 'section', 'p']:
            desc = soup.find(tag, class_=re.compile('description|job|about|content', re.I))
            if desc and desc.text.strip():
                job_description = desc.text.strip()
                break
        if not job_description:
            paragraphs = soup.find_all('p')
            for p in paragraphs:
                if len(p.text.strip()) > 100:
                    job_description = p.text.strip()
                    break
        # Fallback with Serper for email
        if not email:
            try:
                company_name = soup.find('title').text.split('|')[0].strip() if soup.find('title') else url.split('/')[2]
                search_result = serper_tool.run(search_query=f"{company_name} contact email")
                # The result shape varies across crewai_tools versions; adjust the key if needed
                if search_result and 'results' in search_result:
                    for result in search_result['results']:
                        email_match = re.search(r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}', result.get('snippet', ''))
                        if email_match:
                            email = email_match.group(0)
                            break
            except Exception as e:
                print(f"Serper search failed: {e}")
        return email if email else "Not found", job_description if job_description else "Not available"
    finally:
        driver.quit()
@tool("Send job application email")
def send_email(to_email: str, subject: str, body: str):
    """Send an email via the Gmail API."""
    if not GMAIL_TOKEN_PATH or not os.path.exists(GMAIL_TOKEN_PATH):
        raise FileNotFoundError(f"GMAIL_TOKEN_PATH not set or token.json not found at {GMAIL_TOKEN_PATH}")
    creds = Credentials.from_authorized_user_file(
        GMAIL_TOKEN_PATH,
        scopes=['https://www.googleapis.com/auth/gmail.send']
    )
    service = build('gmail', 'v1', credentials=creds)
    # A bare RFC 822 string is enough for a plain-text message
    message = f"To: {to_email}\nSubject: {subject}\n\n{body}"
    encoded_message = base64.urlsafe_b64encode(message.encode()).decode()
    email_msg = {'raw': encoded_message}
    try:
        service.users().messages().send(userId='me', body=email_msg).execute()
        return "Email sent successfully"
    except Exception as e:
        return f"Failed to send email: {e}"
# Agents
researcher = Agent(
    role='Researcher',
    goal='Extract email and job details from a URL',
    backstory='A meticulous web researcher who digs up contact details and job requirements.',
    tools=[extract_email_and_job_description, serper_tool],
    verbose=True
)
cv_generator = Agent(
    role='CV Generator',
    goal='Generate a tailored CV',
    backstory='A career coach who tailors CVs to match job descriptions.',
    verbose=True
)
email_agent = Agent(
    role='Email Agent',
    goal='Send the job application',
    backstory='A reliable assistant who handles outgoing correspondence.',
    tools=[send_email],
    verbose=True
)
# Tasks
research_task = Task(
    description='Extract the contact email and job description from {initial_url}',
    expected_output='The contact email and the full job description text',
    agent=researcher
)
cv_task = Task(
    description='Create a CV for {name} tailored to the job description from the previous task',
    expected_output='A tailored CV in plain text',
    agent=cv_generator
)
email_task = Task(
    description='Email the CV to the contact address found by the researcher',
    expected_output='Confirmation that the application email was sent',
    agent=email_agent
)
# Crew
crew = Crew(
    agents=[researcher, cv_generator, email_agent],
    tasks=[research_task, cv_task, email_task],
    process=Process.sequential,
    verbose=True
)
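To sanity-check the crew locally before scaling, kick it off for a single posting (the URL and name here are placeholders):

result = crew.kickoff(inputs={
    "initial_url": "https://example.com/job",  # placeholder posting URL
    "name": "Your Name",
})
print(result)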
This works fine for one job—but try running it for 50 jobs locally, and you’ll feel the strain.
Step 3: Integrate with MCP to Solve the Scaling Problem
Here’s where FastMCP comes in. We’ll define tools to offload this crew to a CrewAI Enterprise Server, solving our scalability and management issues:
import os
from typing import Any

import dotenv
import httpx
from mcp.server.fastmcp import FastMCP

# Load credentials
dotenv.load_dotenv()

mcp = FastMCP("crewai_enterprise_server")

CREWAI_ENTERPRISE_SERVER_URL = os.getenv("MCP_CREWAI_ENTERPRISE_SERVER_URL")
CREWAI_ENTERPRISE_BEARER_TOKEN = os.getenv("MCP_CREWAI_ENTERPRISE_BEARER_TOKEN")

# Reuse the crew from Step 2 (import the agents and tasks from that module,
# or define everything in one file)
crew = Crew(
    agents=[researcher, cv_generator, email_agent],
    tasks=[research_task, cv_task, email_task],
    process=Process.sequential,
    verbose=True
)
# Tool to Kickoff Crew Task
@mcp.tool()
async def kickoff_crew(inputs: dict[str, Any]) -> dict[str, Any]:
    """Start a crew task on the server, solving local resource overload.

    Args:
        inputs: Dictionary with initial_url and name.

    Returns:
        Crew task response with crew_id.
    """
    async with httpx.AsyncClient() as client:
        response = await client.post(
            f"{CREWAI_ENTERPRISE_SERVER_URL}/kickoff",
            headers={
                "Authorization": f"Bearer {CREWAI_ENTERPRISE_BEARER_TOKEN}",
                "Content-Type": "application/json",
            },
            json={
                "inputs": inputs,
                # Offload to the server. crew.to_dict() stands in for whatever
                # serialization your server accepts; if the crew is already
                # deployed server-side, sending only "inputs" may be enough.
                "crew": crew.to_dict(),
            },
        )
        response.raise_for_status()
        return response.json()
# Tool to Check Status
@mcp.tool()
async def get_crew_status(crew_id: str) -> dict[str, Any]:
    """Track crew task status, solving management chaos.

    Args:
        crew_id: The task ID.

    Returns:
        Task status.
    """
    async with httpx.AsyncClient() as client:
        response = await client.get(
            f"{CREWAI_ENTERPRISE_SERVER_URL}/status/{crew_id}",
            headers={
                "Authorization": f"Bearer {CREWAI_ENTERPRISE_BEARER_TOKEN}",
                "Content-Type": "application/json",
            },
        )
        response.raise_for_status()
        return response.json()

# Run the MCP Server
if __name__ == "__main__":
    mcp.run()  # the FastMCP instance defined above; don't re-import the mcp module here
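By default, run() serves over the stdio transport, which suits desktop MCP clients. To expose the server over HTTP for the curl-style testing in Step 4, the mcp SDK’s FastMCP also supports the SSE transport:

if __name__ == "__main__":
    mcp.run(transport="sse")  # serve over HTTP/SSE instead of the default stdio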
How Does This Solve the Problem?
- Scalability: The server handles the crew’s execution, so your local machine stays cool and quiet.
- Management: The crew_id lets you track tasks remotely—no more guessing what’s running.
- Deployment: This setup is ready for production, supporting multiple users or jobs.
- Security: Credentials stay server-side in environment variables instead of being scattered across local machines.
Step 4: Test the Solution
- Run the MCP Server:

python script.py

(FastMCP speaks the MCP protocol over stdio by default, or SSE over HTTP. The curl calls below illustrate the shape of each request; with the default stdio transport, you’d invoke these tools from an MCP client instead.)

- Kick Off a Task:

curl -X POST http://localhost:port/kickoff_crew \
  -H "Content-Type: application/json" \
  -d '{"inputs": {"initial_url": "https://example.com/job", "name": "Your Name"}}'

You get a crew_id back (e.g., "abc123").

- Check Status:

curl http://localhost:port/get_crew_status/abc123
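Rather than re-running that curl by hand, you can poll until the task settles. A minimal sketch; the "state" field and its values are assumptions, so match them to whatever your server actually returns:

import time
import httpx

def wait_for_crew(crew_id: str, base_url: str, token: str, interval: float = 5.0) -> dict:
    """Poll the status endpoint until the crew task leaves its running states."""
    headers = {"Authorization": f"Bearer {token}"}
    while True:
        resp = httpx.get(f"{base_url}/status/{crew_id}", headers=headers)
        resp.raise_for_status()
        status = resp.json()
        # "state", "PENDING", and "RUNNING" are assumed names; adjust to your server's schema
        if status.get("state") not in ("PENDING", "RUNNING"):
            return status
        time.sleep(interval)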
Now, apply to 50 jobs—or 500—without breaking a sweat!
Step 5: Enhance and Deploy
- Error Handling: Add retries for failed requests (see the sketch after this list).
- Customization: Tweak the crew payload if your server expects a different format.
- Cloud Deployment: Host the MCP server on AWS or GCP for 24/7 access.
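For the error-handling point, a minimal retry wrapper around the httpx call might look like this (the retry count and backoff are arbitrary starting points; tune them for your server):

import asyncio
import httpx

async def post_with_retries(url: str, retries: int = 3, backoff: float = 2.0, **kwargs) -> httpx.Response:
    """POST with exponential backoff on connection errors and 5xx responses."""
    async with httpx.AsyncClient() as client:
        last_exc = None
        response = None
        for attempt in range(retries):
            if attempt:
                await asyncio.sleep(backoff * 2 ** (attempt - 1))  # exponential backoff
            try:
                response = await client.post(url, **kwargs)
                if response.status_code < 500:  # success or client error: don't retry
                    return response
                last_exc = None
            except httpx.TransportError as exc:
                last_exc = exc
        if last_exc:
            raise last_exc
        return response  # the last 5xx response

You could then call post_with_retries inside kickoff_crew in place of the bare client.post.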
Conclusion
The problem of scaling and managing CrewAI workflows is real—but it’s solvable. By integrating with an MCP server using FastMCP, you transform a local bottleneck into a distributed, manageable system. Our job application crew went from a resource-hogging script to a server-powered automation tool, ready to tackle big workloads with ease.
Give it a shot, and let us know how it solves your AI scaling challenges in the comments!