Skip to main content
Glama
research_agent.py15.4 kB
"""자료조사 에이전트: 웹 검색과 벡터 DB 검색을 통해 정보를 수집하는 에이전트""" from typing import Any, ClassVar, List, Dict, Optional from datetime import datetime from langchain_core.language_models.chat_models import BaseChatModel from langchain_core.runnables import RunnableConfig from langchain_core.messages import HumanMessage, AIMessage from langgraph.checkpoint.base import BaseCheckpointSaver from langgraph.graph import StateGraph, END from langgraph.store.base import BaseStore from pydantic import BaseModel, Field from agents.base import BaseAgent, BaseState class SearchResult(BaseModel): """검색 결과""" source: str = Field(description="출처 (web/vector)") title: str = Field(description="제목") content: str = Field(description="내용") url: Optional[str] = Field(default=None, description="URL (웹 검색인 경우)") relevance_score: float = Field(description="관련성 점수") metadata: Dict[str, Any] = Field(default_factory=dict, description="추가 메타데이터") class ResearchState(BaseState): """자료조사 에이전트의 상태""" research_query: str = Field(description="조사 주제/질문") search_keywords: List[str] = Field(default_factory=list, description="검색 키워드") web_results: List[SearchResult] = Field(default_factory=list, description="웹 검색 결과") vector_results: List[SearchResult] = Field(default_factory=list, description="벡터 검색 결과") aggregated_results: List[SearchResult] = Field(default_factory=list, description="통합 검색 결과") research_summary: str = Field(default="", description="조사 결과 요약") sources_cited: List[str] = Field(default_factory=list, description="인용된 출처") class ResearchAgent(BaseAgent): """자료조사 에이전트""" NODE_NAMES: ClassVar[dict[str, str]] = { "EXTRACT_KEYWORDS": "extract_keywords", "SEARCH_WEB": "search_web", "SEARCH_VECTOR": "search_vector", "AGGREGATE": "aggregate_results", "SUMMARIZE": "summarize_findings", } def __init__( self, model: BaseChatModel, state_schema: Any = ResearchState, config_schema: Any | None = None, input_schema: Any | None = None, output_schema: Any | None = None, checkpointer: BaseCheckpointSaver | None = None, store: BaseStore | None = None, max_retry_attempts: int = 2, agent_name: str = "ResearchAgent", is_debug: bool = True, mcp_client: Any = None, # MCP 클라이언트는 나중에 주입 ) -> None: super().__init__( model=model, state_schema=state_schema, config_schema=config_schema, input_schema=input_schema, output_schema=output_schema, checkpointer=checkpointer, store=store, max_retry_attempts=max_retry_attempts, agent_name=agent_name, is_debug=is_debug, ) self.mcp_client = mcp_client def init_nodes(self, graph: StateGraph): """그래프에 노드 초기화""" extract_node = self.get_node_name("EXTRACT_KEYWORDS") web_search_node = self.get_node_name("SEARCH_WEB") vector_search_node = self.get_node_name("SEARCH_VECTOR") aggregate_node = self.get_node_name("AGGREGATE") summarize_node = self.get_node_name("SUMMARIZE") graph.add_node(extract_node, self.extract_keywords) graph.add_node(web_search_node, self.search_web) graph.add_node(vector_search_node, self.search_vector) graph.add_node(aggregate_node, self.aggregate_results) graph.add_node(summarize_node, self.summarize_findings) def init_edges(self, graph: StateGraph): """그래프에 엣지 초기화""" extract_node = self.get_node_name("EXTRACT_KEYWORDS") web_search_node = self.get_node_name("SEARCH_WEB") vector_search_node = self.get_node_name("SEARCH_VECTOR") aggregate_node = self.get_node_name("AGGREGATE") summarize_node = self.get_node_name("SUMMARIZE") # 워크플로우 정의 graph.set_entry_point(extract_node) # 키워드 추출 후 병렬로 웹 검색과 벡터 검색 수행 graph.add_edge(extract_node, web_search_node) graph.add_edge(extract_node, vector_search_node) # 두 검색이 완료되면 결과 통합 graph.add_edge(web_search_node, aggregate_node) graph.add_edge(vector_search_node, aggregate_node) # 통합 후 요약 graph.add_edge(aggregate_node, summarize_node) graph.add_edge(summarize_node, END) async def extract_keywords(self, state: ResearchState, config: RunnableConfig) -> ResearchState: """검색 키워드 추출""" try: prompt = f"""다음 조사 주제에서 효과적인 검색을 위한 키워드를 추출하세요: 조사 주제: {state.research_query} 다음 사항을 고려하세요: 1. 핵심 개념과 용어 2. 관련 동의어나 유사어 3. 영어 키워드도 포함 (국제 자료 검색용) 4. 구체적이고 명확한 키워드 5-8개의 키워드를 쉼표로 구분하여 나열하세요.""" response = await self.model.ainvoke([HumanMessage(content=prompt)], config) # 키워드 파싱 keywords = [kw.strip() for kw in response.content.split(',')] state.search_keywords = keywords[:8] # 최대 8개로 제한 if self.is_debug: print(f"[{self.agent_name}] 키워드 추출 완료:") print(f" 키워드: {', '.join(state.search_keywords)}") return state except Exception as e: if self.is_debug: print(f"[{self.agent_name}] 키워드 추출 중 오류: {e}") raise e async def search_web(self, state: ResearchState, config: RunnableConfig) -> ResearchState: """웹 검색 수행""" try: # MCP 클라이언트가 없는 경우 모의 결과 생성 if self.mcp_client is None: # 실제 구현에서는 MCP 클라이언트를 통해 검색 if self.is_debug: print(f"[{self.agent_name}] MCP 클라이언트가 없어 모의 웹 검색 수행") # 모의 검색 결과 for i, keyword in enumerate(state.search_keywords[:3]): state.web_results.append( SearchResult( source="web", title=f"{keyword}에 대한 웹 검색 결과 {i+1}", content=f"{keyword}와 관련된 최신 정보입니다. 이는 모의 데이터입니다.", url=f"https://example.com/{keyword.replace(' ', '-')}-{i+1}", relevance_score=0.8 - (i * 0.1), metadata={"search_date": datetime.now().isoformat()} ) ) else: # 실제 MCP를 통한 웹 검색 # 키워드를 조합하여 하나의 통합 쿼리 생성 combined_query = " ".join(state.search_keywords[:3]) try: results = await self.mcp_client.search_web( query=combined_query, max_results=10, search_depth="basic" ) # 결과를 SearchResult 형태로 변환 for result in results: state.web_results.append( SearchResult( source="web", title=result.get("title", "제목 없음"), content=result.get("content", ""), url=result.get("url"), relevance_score=result.get("score", 0.5), metadata=result.get("metadata", {}) ) ) except Exception as mcp_error: if self.is_debug: print(f"[{self.agent_name}] MCP 웹 검색 오류: {mcp_error}") # 오류 발생 시 빈 결과 유지 if self.is_debug: print(f"[{self.agent_name}] 웹 검색 완료:") print(f" 결과 수: {len(state.web_results)}") return state except Exception as e: if self.is_debug: print(f"[{self.agent_name}] 웹 검색 중 오류: {e}") raise e async def search_vector(self, state: ResearchState, config: RunnableConfig) -> ResearchState: """벡터 DB 검색 수행""" try: # MCP 클라이언트가 없는 경우 모의 결과 생성 if self.mcp_client is None: if self.is_debug: print(f"[{self.agent_name}] MCP 클라이언트가 없어 모의 벡터 검색 수행") # 모의 검색 결과 for i, keyword in enumerate(state.search_keywords[:2]): state.vector_results.append( SearchResult( source="vector", title=f"{keyword}에 대한 문서 {i+1}", content=f"{keyword}와 관련된 저장된 문서 내용입니다. 이는 모의 데이터입니다.", relevance_score=0.9 - (i * 0.1), metadata={ "collection": "default", "document_id": f"doc_{i+1}", "created_at": datetime.now().isoformat() } ) ) else: # 실제 MCP를 통한 벡터 검색 # 키워드를 조합하여 검색 combined_query = " ".join(state.search_keywords[:3]) try: results = await self.mcp_client.search_vector( query=combined_query, collection="default", top_k=10 ) # 결과를 SearchResult 형태로 변환 for result in results: # 오류 결과는 건너뛰기 if "error" in result: continue state.vector_results.append( SearchResult( source="vector", title=result.get("title", "제목 없음"), content=result.get("content", ""), relevance_score=result.get("score", 0.5), metadata=result.get("metadata", {}) ) ) except Exception as mcp_error: if self.is_debug: print(f"[{self.agent_name}] MCP 벡터 검색 오류: {mcp_error}") # 오류 발생 시 빈 결과 유지 if self.is_debug: print(f"[{self.agent_name}] 벡터 검색 완료:") print(f" 결과 수: {len(state.vector_results)}") return state except Exception as e: if self.is_debug: print(f"[{self.agent_name}] 벡터 검색 중 오류: {e}") raise e async def aggregate_results(self, state: ResearchState, config: RunnableConfig) -> ResearchState: """검색 결과 통합 및 중복 제거""" try: # 모든 결과를 하나의 리스트로 합치기 all_results = state.web_results + state.vector_results # 관련성 점수로 정렬 all_results.sort(key=lambda x: x.relevance_score, reverse=True) # 중복 제거 (제목과 내용의 유사성 기반) seen_content = set() unique_results = [] for result in all_results: # 간단한 중복 체크 (실제로는 더 정교한 방법 필요) content_hash = hash(result.title + result.content[:100]) if content_hash not in seen_content: seen_content.add(content_hash) unique_results.append(result) # 상위 10개 결과만 유지 state.aggregated_results = unique_results[:10] # 출처 수집 state.sources_cited = list(set([ result.url or f"{result.source}:{result.title}" for result in state.aggregated_results ])) if self.is_debug: print(f"[{self.agent_name}] 결과 통합 완료:") print(f" 통합 결과 수: {len(state.aggregated_results)}") print(f" 인용 출처 수: {len(state.sources_cited)}") return state except Exception as e: if self.is_debug: print(f"[{self.agent_name}] 결과 통합 중 오류: {e}") raise e async def summarize_findings(self, state: ResearchState, config: RunnableConfig) -> ResearchState: """조사 결과 요약""" try: # 검색 결과를 문자열로 정리 results_text = "\n\n".join([ f"[{i+1}] {result.title}\n출처: {result.source}\n내용: {result.content[:200]}..." for i, result in enumerate(state.aggregated_results) ]) prompt = f"""다음 검색 결과를 바탕으로 '{state.research_query}'에 대한 종합적인 요약을 작성하세요: 검색 결과: {results_text} 요약 작성 시 다음 사항을 포함하세요: 1. 핵심 발견사항 2. 주요 트렌드나 패턴 3. 중요한 통계나 사실 4. 추가 조사가 필요한 부분 체계적이고 읽기 쉬운 형태로 작성하세요.""" response = await self.model.ainvoke([HumanMessage(content=prompt)], config) state.research_summary = response.content # 최종 메시지 추가 state.messages.append( AIMessage(content=f"조사 완료: {len(state.aggregated_results)}개의 관련 자료를 찾았습니다.\n\n{state.research_summary}") ) if self.is_debug: print(f"[{self.agent_name}] 요약 완료:") print(f" 요약 길이: {len(state.research_summary)} 문자") return state except Exception as e: if self.is_debug: print(f"[{self.agent_name}] 요약 생성 중 오류: {e}") raise e

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/HyunjunJeon/vibecoding-lg-mcp-a2a'

If you have feedback or need assistance with the MCP directory API, please join our Discord server