Skip to main content
Glama
by DemoDaygit
MVP_WEEK_PLAN.md25.3 kB
# MVP План на неделю (40-50 часов) ## 🎯 Цель MVP Система должна: - ✅ Запускаться и работать стабильно - ✅ Сохранять данные в БД (не терять при перезапуске) - ✅ Выполнять базовые задачи через агентов - ✅ Иметь работающий MCP протокол для tools/list и tools/call - ✅ Иметь базовую безопасность ## ❌ Что НЕ входит в MVP (отложить) - Полная интеграция с LLM (оставить симуляцию) - Меж-агентная коммуникация через message bus - Frontend интеграция (оставить моки) - Система миграций (использовать init-db.sql) - Все MCP методы (только tools/list и tools/call) - Сложная оркестрация задач --- ## 📅 Распределение по дням ### День 1-2 (16 часов): Инфраструктура данных **Цель:** Данные сохраняются в БД, не теряются при перезапуске ### День 3-4 (16 часов): Базовый функционал **Цель:** Система может создавать задачи и выполнять инструменты ### День 5 (8 часов): Безопасность и MCP протокол **Цель:** Базовая аутентификация и работающий MCP endpoint ### День 6-7 (8-10 часов): Тестирование и фиксы **Цель:** Все работает стабильно, нет критических багов --- ## 🔴 КРИТИЧНО ДЛЯ MVP (День 1-2) ### Задача 1: Подключение к PostgreSQL (6 часов) #### Что нужно: 1. Модуль подключения к БД 2. Минимальные модели (только для tasks и tools) 3. Обновить TaskOrchestrator для сохранения в БД #### Файлы для создания/изменения: **1. `mcp_server/app/core/database.py`** (НОВЫЙ) ```python from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker from sqlalchemy.orm import declarative_base from app.config import settings import structlog logger = structlog.get_logger() Base = declarative_base() engine = create_async_engine( settings.database_url, echo=False, pool_pre_ping=True, pool_size=5, max_overflow=10 ) AsyncSessionLocal = async_sessionmaker( engine, class_=AsyncSession, expire_on_commit=False ) async def get_db() -> AsyncSession: async with AsyncSessionLocal() as session: try: yield session await session.commit() except Exception: await session.rollback() raise finally: await session.close() async def init_db(): """Инициализация - таблицы уже созданы через init-db.sql""" # Просто проверяем подключение async with engine.begin() as conn: await conn.execute("SELECT 1") logger.info("Database connection verified") async def close_db(): await engine.dispose() logger.info("Database connections closed") ``` **2. `mcp_server/app/core/models/db_models.py`** (НОВЫЙ - минимальные модели) ```python from sqlalchemy import Column, String, Integer, DateTime, Text, JSON, ForeignKey from sqlalchemy.dialects.postgresql import UUID from sqlalchemy.sql import func from app.core.database import Base import uuid class TaskModel(Base): __tablename__ = "tasks" __table_args__ = {"schema": "agents"} id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4) title = Column(String(500), nullable=False) description = Column(Text) domain = Column(String(100), nullable=False) priority = Column(String(50), default='medium') status = Column(String(50), default='pending') agent_id = Column(UUID(as_uuid=True)) input_data = Column(JSON, default={}) output_data = Column(JSON) error_message = Column(Text) created_at = Column(DateTime(timezone=True), server_default=func.now()) started_at = Column(DateTime(timezone=True)) completed_at = Column(DateTime(timezone=True)) class ToolModel(Base): __tablename__ = "tools" __table_args__ = {"schema": "mcp"} id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4) name = Column(String(255), unique=True, nullable=False) description = Column(Text) input_schema = Column(JSON, nullable=False, default={}) status = Column(String(50), default='active') category = Column(String(100)) ``` **3. Обновить `mcp_server/app/core/services/task_orchestrator.py`** ```python from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy import select from app.core.models.db_models import TaskModel from app.core.models.mcp_protocol import BusinessTask, TaskRequest, TaskResponse from datetime import datetime, timedelta class TaskOrchestrator: def __init__(self, db: AsyncSession): self.db = db self.agent_registry = AgentRegistry() async def create_task(self, request: TaskRequest) -> TaskResponse: # Создать в БД db_task = TaskModel( title=request.title, description=request.description, domain=request.domain, priority=request.priority, input_data=request.input_data, status="pending" ) self.db.add(db_task) await self.db.flush() # Получить ID # Найти агента (упрощенная логика) agent_id = await self._find_suitable_agent(db_task) if agent_id: db_task.agent_id = agent_id db_task.status = "processing" db_task.started_at = datetime.utcnow() await self.db.commit() return TaskResponse( task_id=str(db_task.id), status=db_task.status, agent_id=str(agent_id) if agent_id else None ) async def get_task(self, task_id: str) -> Optional[BusinessTask]: result = await self.db.execute( select(TaskModel).where(TaskModel.id == task_id) ) db_task = result.scalar_one_or_none() if not db_task: return None return BusinessTask( id=str(db_task.id), title=db_task.title, description=db_task.description, domain=db_task.domain, priority=db_task.priority, status=db_task.status, agent_id=str(db_task.agent_id) if db_task.agent_id else None, input_data=db_task.input_data, output_data=db_task.output_data, created_at=db_task.created_at, started_at=db_task.started_at, completed_at=db_task.completed_at, error_message=db_task.error_message ) async def list_tasks(self, status: str = None, domain: str = None, limit: int = 50, offset: int = 0) -> List[BusinessTask]: query = select(TaskModel) if status: query = query.where(TaskModel.status == status) if domain: query = query.where(TaskModel.domain == domain) query = query.order_by(TaskModel.created_at.desc()).limit(limit).offset(offset) result = await self.db.execute(query) db_tasks = result.scalars().all() return [ BusinessTask( id=str(t.id), title=t.title, description=t.description, domain=t.domain, priority=t.priority, status=t.status, agent_id=str(t.agent_id) if t.agent_id else None, input_data=t.input_data, output_data=t.output_data, created_at=t.created_at, started_at=t.started_at, completed_at=t.completed_at, error_message=t.error_message ) for t in db_tasks ] ``` **4. Обновить `mcp_server/app/main.py`** ```python from app.core.database import init_db, close_db @asynccontextmanager async def lifespan(app: FastAPI): logger.info("Starting MCP Business AI Server", version=settings.app_version) try: await init_db() logger.info("Services initialized successfully") except Exception as e: logger.error("Failed to initialize services", error=str(e)) raise yield await close_db() logger.info("Shutting down MCP Business AI Server") ``` **5. Обновить `mcp_server/app/api/v1/resources.py`** (использовать dependency injection) ```python from fastapi import Depends from sqlalchemy.ext.asyncio import AsyncSession from app.core.database import get_db from app.core.services.task_orchestrator import TaskOrchestrator @router.post("/tasks", response_model=TaskResponse) async def create_task( request: TaskRequest, db: AsyncSession = Depends(get_db) ): orchestrator = TaskOrchestrator(db) return await orchestrator.create_task(request) @router.get("/tasks/{task_id}", response_model=BusinessTask) async def get_task( task_id: str, db: AsyncSession = Depends(get_db) ): orchestrator = TaskOrchestrator(db) task = await orchestrator.get_task(task_id) if not task: raise HTTPException(status_code=404, detail="Task not found") return task ``` **Время:** 6 часов --- ### Задача 2: Подключение к Redis для rate limiting (4 часа) **1. `mcp_server/app/core/redis_client.py`** (НОВЫЙ - упрощенная версия) ```python import redis.asyncio as redis from app.config import settings import structlog logger = structlog.get_logger() class RedisClient: def __init__(self): self.client: redis.Redis = None async def connect(self): try: self.client = await redis.from_url( settings.redis_url, encoding="utf-8", decode_responses=True ) await self.client.ping() logger.info("Redis connected") except Exception as e: logger.error("Redis connection failed", error=str(e)) raise async def disconnect(self): if self.client: await self.client.close() async def zadd(self, key: str, score: float, member: str): await self.client.zadd(key, {member: score}) async def zremrangebyscore(self, key: str, min_score: float, max_score: float): await self.client.zremrangebyscore(key, min_score, max_score) async def zcard(self, key: str) -> int: return await self.client.zcard(key) async def expire(self, key: str, seconds: int): await self.client.expire(key, seconds) redis_client = RedisClient() ``` **2. Обновить `mcp_server/app/middleware/rate_limiting.py`** ```python from app.core.redis_client import redis_client import time class RateLimitingMiddleware: async def __call__(self, request: Request, call_next): client_id = self._get_client_id(request) if not await self._check_rate_limit(client_id): raise HTTPException(status_code=429, detail="Rate limit exceeded") await self._record_request(client_id) return await call_next(request) async def _check_rate_limit(self, client_id: str) -> bool: now = time.time() window_start = now - settings.rate_limit_window key = f"rate_limit:{client_id}" try: await redis_client.zremrangebyscore(key, 0, window_start) count = await redis_client.zcard(key) return count < settings.rate_limit_requests except Exception as e: logger.error("Rate limit check failed", error=str(e)) return True # Fail open async def _record_request(self, client_id: str): now = time.time() key = f"rate_limit:{client_id}" try: await redis_client.zadd(key, now, str(now)) await redis_client.expire(key, settings.rate_limit_window) except Exception as e: logger.error("Rate limit record failed", error=str(e)) ``` **3. Обновить `mcp_server/app/main.py`** ```python from app.core.redis_client import redis_client @asynccontextmanager async def lifespan(app: FastAPI): try: await init_db() await redis_client.connect() logger.info("Services initialized") except Exception as e: logger.error("Failed to initialize", error=str(e)) raise yield await redis_client.disconnect() await close_db() ``` **Время:** 4 часа --- ### Задача 3: Обновить config.py для валидации (1 час) **Обновить `mcp_server/app/config.py`** ```python class Settings(BaseSettings): secret_key: str # Обязательное поле def __init__(self, **kwargs): super().__init__(**kwargs) if not self.secret_key or self.secret_key == "your-secret-key-change-in-production": raise ValueError("SECRET_KEY must be set in .env file") ``` **Время:** 1 час --- ### Задача 4: Обновить health check (1 час) **Обновить `mcp_server/app/api/v1/health.py`** ```python from app.core.redis_client import redis_client from app.core.database import engine @router.get("/") async def health_check(): services = {} # Check Database try: async with engine.begin() as conn: await conn.execute("SELECT 1") services["database"] = True except: services["database"] = False # Check Redis try: await redis_client.client.ping() services["redis"] = True except: services["redis"] = False status = "healthy" if all(services.values()) else "degraded" return { "status": status, "services": services, "timestamp": datetime.utcnow().isoformat() } ``` **Время:** 1 час **ИТОГО День 1-2:** 12 часов --- ## 🟠 ВЫСОКИЙ ПРИОРИТЕТ (День 3-4) ### Задача 5: Реализовать MCP tools/list и tools/call (6 часов) **1. Обновить `mcp_server/app/core/services/tool_registry.py`** (сохранять в БД) ```python from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy import select from app.core.models.db_models import ToolModel class ToolRegistry: def __init__(self, db: AsyncSession): self.db = db async def get_all_tools(self) -> Dict[str, MCPTool]: result = await self.db.execute(select(ToolModel)) tools = result.scalars().all() return { tool.name: MCPTool( name=tool.name, description=tool.description, input_schema=tool.input_schema, status=ToolStatus(tool.status), category=tool.category, tags=tool.tags or [] ) for tool in tools } async def get_tool(self, tool_name: str) -> Optional[MCPTool]: result = await self.db.execute( select(ToolModel).where(ToolModel.name == tool_name) ) tool = result.scalar_one_or_none() if not tool: return None return MCPTool( name=tool.name, description=tool.description, input_schema=tool.input_schema, status=ToolStatus(tool.status), category=tool.category, tags=tool.tags or [] ) async def execute_tool(self, tool_name: str, parameters: Dict[str, Any], agent_id: Optional[str] = None) -> ToolExecution: # Получить tool из БД tool = await self.get_tool(tool_name) if not tool: raise ToolExecutionException(tool_name, "Tool not found") # Выполнить (оставить симуляцию для MVP) start_time = time.time() try: result = await self._execute_tool_implementation(tool, parameters) execution_time = time.time() - start_time # Сохранить в БД (если есть таблица tool_executions) # Пока просто возвращаем return ToolExecution( tool_name=tool_name, agent_id=agent_id or "system", parameters=parameters, result=result, execution_time=execution_time ) except Exception as e: return ToolExecution( tool_name=tool_name, agent_id=agent_id or "system", parameters=parameters, error=str(e), execution_time=time.time() - start_time ) ``` **2. Обновить `mcp_server/app/main.py`** (MCP handlers) ```python async def handle_tools_list(message: Dict[str, Any], correlation_id: str): from app.core.services.tool_registry import ToolRegistry from app.core.database import get_db async for db in get_db(): tool_registry = ToolRegistry(db) tools = await tool_registry.get_all_tools() mcp_tools = [ { "name": tool.name, "description": tool.description, "inputSchema": tool.input_schema } for tool in tools.values() ] return { "jsonrpc": "2.0", "id": message.get("id"), "result": {"tools": mcp_tools} } async def handle_tools_call(message: Dict[str, Any], correlation_id: str): from app.core.services.tool_registry import ToolRegistry from app.core.database import get_db params = message.get("params", {}) tool_name = params.get("name") arguments = params.get("arguments", {}) if not tool_name: return { "jsonrpc": "2.0", "id": message.get("id"), "error": {"code": -32602, "message": "Tool name required"} } async for db in get_db(): tool_registry = ToolRegistry(db) try: execution = await tool_registry.execute_tool(tool_name, arguments) return { "jsonrpc": "2.0", "id": message.get("id"), "result": { "content": [{"type": "text", "text": str(execution.result)}], "isError": execution.error is not None } } except Exception as e: return { "jsonrpc": "2.0", "id": message.get("id"), "error": {"code": -32603, "message": str(e)} } ``` **Время:** 6 часов --- ### Задача 6: Исправить Agent Registry (2 часа) **Обновить `mcp_server/app/core/services/agent_registry.py`** ```python async def create_agent(self, request: AgentRequest) -> Agent: async with self._lock: if request.type not in self.agent_types: raise ValueError(f"Unknown agent type: {request.type}") agent_id = str(uuid.uuid4()) # Генерировать ДО создания agent_class = self.agent_types[request.type] agent = agent_class( agent_id=agent_id, # Передать ID name=request.name, agent_type=request.type, capabilities=request.capabilities, config=request.config ) self.agents[agent.id] = agent await agent.start() return Agent( id=agent.id, name=agent.name, type=agent.type, description=request.description, capabilities=agent.capabilities, status=agent.status, tasks_completed=agent.tasks_completed, last_activity=agent.last_activity, config=agent.config ) async def get_agent(self, agent_id: str) -> Optional[Agent]: async with self._lock: agent = self.agents.get(agent_id) if not agent: return None current_task_id = str(agent.current_task.id) if agent.current_task else None # Безопасно return Agent( id=agent.id, name=agent.name, type=agent.type, description="", capabilities=agent.capabilities, status=agent.status, current_task_id=current_task_id, tasks_completed=agent.tasks_completed, last_activity=agent.last_activity, config=agent.config ) ``` **Время:** 2 часа --- ### Задача 7: Базовая безопасность API ключей (2 часа) **1. Обновить `mcp_server/app/middleware/auth.py`** ```python from passlib.context import CryptContext import hashlib pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto") class AuthMiddleware: async def _verify_api_key(self, api_key: str) -> bool: # Для MVP: проверка против переменной окружения # В production - проверка в БД valid_key = os.getenv("API_KEY") if not valid_key: # Если нет API_KEY в env, разрешить для разработки return True # Простое сравнение (для MVP) return api_key == valid_key ``` **2. Обновить `.env.example`** ```env API_KEY=your-api-key-here SECRET_KEY=your-secret-key-here ``` **Время:** 2 часа **ИТОГО День 3-4:** 10 часов --- ## 🟡 СРЕДНИЙ ПРИОРИТЕТ (День 5) ### Задача 8: Тестирование и фиксы (8 часов) 1. Протестировать все endpoints 2. Проверить сохранение данных в БД 3. Проверить rate limiting 4. Исправить найденные баги 5. Обновить документацию **Время:** 8 часов --- ## 📋 Чеклист выполнения ### День 1-2 (12 часов) - [ ] Создать `database.py` - [ ] Создать `db_models.py` (минимальные модели) - [ ] Обновить `task_orchestrator.py` для БД - [ ] Обновить `resources.py` для dependency injection - [ ] Создать `redis_client.py` - [ ] Обновить `rate_limiting.py` для Redis - [ ] Обновить `main.py` для инициализации - [ ] Обновить `config.py` для валидации - [ ] Обновить `health.py` для реальных проверок ### День 3-4 (10 часов) - [ ] Обновить `tool_registry.py` для работы с БД - [ ] Реализовать `handle_tools_list` в `main.py` - [ ] Реализовать `handle_tools_call` в `main.py` - [ ] Исправить `agent_registry.py` (ID и current_task) - [ ] Обновить `auth.py` для API ключей ### День 5 (8 часов) - [ ] Тестирование всех endpoints - [ ] Проверка сохранения данных - [ ] Исправление багов - [ ] Обновление документации --- ## 🚀 Быстрый старт ### 1. Создать .env ```env DATABASE_URL=postgresql+asyncpg://postgres:password@localhost:5432/mcp_db REDIS_URL=redis://localhost:6379 SECRET_KEY=$(python -c "import secrets; print(secrets.token_urlsafe(32))") API_KEY=mcp-api-key-12345 ``` ### 2. Запустить инфраструктуру ```bash docker-compose up -d postgres redis ``` ### 3. Установить зависимости ```bash cd mcp_server pip install -r requirements.txt ``` ### 4. Запустить сервер ```bash uvicorn app.main:app --reload ``` ### 5. Проверить ```bash curl http://localhost:8000/api/v1/health curl http://localhost:8000/api/v1/tools ``` --- ## ⚠️ Важные замечания 1. **Для MVP оставляем симуляцию LLM** - реальная интеграция займет больше времени 2. **Frontend остается с моками** - интеграция не критична для MVP 3. **Используем init-db.sql** - миграции отложены 4. **Упрощенная аутентификация** - для production нужна БД с API ключами --- ## 📊 Итоговая оценка - **День 1-2:** 12 часов - **День 3-4:** 10 часов - **День 5:** 8 часов - **Резерв:** 2 часа **ИТОГО: 32 часа** (4 рабочих дня по 8 часов) Это реалистично для недели работы!

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/DemoDaygit/mcp-biz'

If you have feedback or need assistance with the MCP directory API, please join our Discord server