#langgraph imports
from langgraph.graph import StateGraph, END  #pip install langgraph
from typing import TypedDict, Annotated

# NOTE(review): chat, ChatOllama, DuckDuckGoSearchRun, create_agent and
# HumanMessage are used below but are not imported in this section; they are
# presumably imported earlier in the file -- confirm.

# Ollama model used for every LLM call in this workflow.
_MODEL_NAME = "llama3.2"
# Maximum number of document characters embedded in the file-analysis prompt.
_MAX_FILE_CHARS = 8000
# Responses shorter than this (after stripping) are considered too brief.
_MIN_RESPONSE_CHARS = 30
# Upper bound on refinement passes, so the validate/refine loop terminates.
_MAX_REFINEMENTS = 2
# Substrings that route a query to the web-search branch.
_WEB_SEARCH_KEYWORDS = ("latest", "recent", "current", "news", "today")

_FILE_ANALYSIS_PROMPT = """You are a helpful AI assistant analyzing a document.

The user has uploaded a file named: "{filename}"

DOCUMENT CONTENT:
{content}

MEMORY CONTEXT:
{memory_context}

USER QUERY:
{user_message}

IMPORTANT INSTRUCTIONS:
- The user is asking about THIS specific document they uploaded
- Answer questions directly about the content you see above
- Be specific and reference actual content from the document
- If asked "tell me about this document/file/policy", provide a comprehensive summary
- Do not say you don't have access to the file - you can see its content above
- Be clear, accurate, and helpful"""

_RESEARCH_PROMPT = """
You are a highly flexible and helpful AI research assistant. Your primary goal is to answer the user's question, using the available tools only when necessary.

**MEMORY CONTEXT:**
{memory_context}

**FLEXIBLE FORMATTING RULES:**
1. **PRIORITY:** Adhere strictly to any formatting requested by the user (e.g., "return as bullet points," "write a short paragraph," or "output as JSON").
2. **LINK MANDATE (Hard Rule):** For every piece of factual information, movie, book, or specific data point you mention that came from the web search tool, you MUST embed the source URL from the search results directly into the text using **Markdown link syntax: [Relevant Text](Source URL)**.
"""


class AgentState(TypedDict):
    """State dictionary passed between the nodes of the agent graph."""

    user_message: str       # the user's question
    memory_context: str     # prior-conversation context embedded in prompts
    file_data: dict         # optional upload; read keys: "filename", "content"
    decision: str           # "analyze_file", "web_search" or "direct_answer"
    response: str           # latest generated answer
    needs_refinement: bool  # set by validation; drives the conditional edge
    attempt_count: int      # number of refinement passes performed so far


# Node 1: Decide which action to take
def analyze_query_node(state: AgentState) -> AgentState:
    """Decide what action to take based on the query and uploaded file.

    Routing rules, in priority order:
      1. ``file_data`` has non-empty ``"content"`` -> ``"analyze_file"``
      2. the message contains a web-search keyword -> ``"web_search"``
      3. otherwise                                 -> ``"direct_answer"``

    Keyword matching is a case-insensitive substring test.

    Args:
        state: Current graph state; ``user_message`` is required.

    Returns:
        The same state object with ``decision`` set and ``attempt_count``
        initialised to 0 when it was absent.
    """
    user_message = state["user_message"]
    file_data = state.get("file_data")

    if file_data and file_data.get("content"):
        decision = "analyze_file"
    else:
        # Lower-case once instead of once per keyword.
        lowered = user_message.lower()
        if any(word in lowered for word in _WEB_SEARCH_KEYWORDS):
            decision = "web_search"
        else:
            decision = "direct_answer"

    state["decision"] = decision
    state["attempt_count"] = state.get("attempt_count", 0)
    return state


def _answer_from_file(file_data: dict, user_message: str, memory_context: str) -> str:
    """Answer the question from the uploaded document's content."""
    filename = file_data.get("filename", "document")
    content = file_data.get("content") or ""
    if len(content) > _MAX_FILE_CHARS:
        # Tell the model the document was cut so it does not treat the
        # excerpt as the complete file.
        content = content[:_MAX_FILE_CHARS] + "\n\n[Document truncated for length...]"

    system_instruction = _FILE_ANALYSIS_PROMPT.format(
        filename=filename,
        content=content,
        memory_context=memory_context,
        user_message=user_message,
    )
    reply = chat(
        model=_MODEL_NAME,
        messages=[
            {"role": "system", "content": system_instruction},
            {"role": "user", "content": user_message},
        ],
    )
    return reply["message"]["content"]


def _answer_from_web(user_message: str, memory_context: str) -> str:
    """Answer via the web-search agent; return "" if it yields no final answer."""
    search_tool = DuckDuckGoSearchRun()
    llm = ChatOllama(model=_MODEL_NAME, temperature=0.1)
    # Pass the research prompt so the agent sees the memory context and the
    # link-formatting rules, which refer to web-search results.
    agent_executor = create_agent(
        llm,
        [search_tool],
        system_prompt=_RESEARCH_PROMPT.format(memory_context=memory_context),
    )

    for chunk in agent_executor.stream(
        {"messages": [HumanMessage(content=user_message)]},
        stream_mode="values",
    ):
        latest = chunk["messages"][-1]
        # The first AI message without tool calls is taken as the final answer.
        if latest.type == "ai" and not getattr(latest, "tool_calls", None):
            return latest.content
    return ""


def _answer_directly(user_message: str, memory_context: str) -> str:
    """Answer with a plain LLM call (no tools)."""
    reply = chat(
        model=_MODEL_NAME,
        messages=[
            {
                "role": "system",
                "content": _RESEARCH_PROMPT.format(memory_context=memory_context),
            },
            # The question must be sent explicitly: the system prompt does
            # not contain it.
            {"role": "user", "content": user_message},
        ],
    )
    return reply["message"]["content"]


# Node 2: Generate response based on decision
def generate_response_node(state: AgentState) -> AgentState:
    """Execute the decided action and store the generated response.

    Args:
        state: Graph state with ``decision``, ``user_message`` and
            ``memory_context`` set; ``file_data`` is optional.

    Returns:
        The same state object with ``response`` set. On the web-search path
        ``response`` is "" when the agent produced no final answer; the
        validation node then flags it for refinement.
    """
    decision = state["decision"]
    user_message = state["user_message"]
    memory_context = state["memory_context"]
    file_data = state.get("file_data")

    if decision == "analyze_file" and file_data:
        state["response"] = _answer_from_file(file_data, user_message, memory_context)
    elif decision == "web_search":
        state["response"] = _answer_from_web(user_message, memory_context)
    else:
        # Direct answer; also the fallback for "analyze_file" without file data.
        state["response"] = _answer_directly(user_message, memory_context)
    return state


# Node 3: Validate response quality
def validate_response_node(state: AgentState) -> AgentState:
    """Flag the response for refinement when it is too short.

    A response needs refinement when its stripped length is below
    ``_MIN_RESPONSE_CHARS`` and fewer than ``_MAX_REFINEMENTS`` refinement
    passes have run.

    Args:
        state: Graph state; ``response`` and ``attempt_count`` are optional.

    Returns:
        The same state object with ``needs_refinement`` set.
    """
    response = state.get("response") or ""
    attempt_count = state.get("attempt_count", 0)

    too_brief = len(response.strip()) < _MIN_RESPONSE_CHARS
    state["needs_refinement"] = too_brief and attempt_count < _MAX_REFINEMENTS
    return state


# Node 4: Refine response if needed
def refine_response_node(state: AgentState) -> AgentState:
    """Ask the model for a more detailed answer and count the attempt.

    Args:
        state: Graph state; ``user_message`` is required.

    Returns:
        The same state object with ``response`` replaced and
        ``attempt_count`` incremented by one.
    """
    previous = state.get("response") or ""
    user_message = state["user_message"]

    prompt = f"""The previous answer was too brief: {previous}

User question: {user_message}

Provide a more detailed and helpful answer."""

    reply = chat(
        model=_MODEL_NAME,
        messages=[{"role": "user", "content": prompt}],
    )
    state["response"] = reply["message"]["content"]
    state["attempt_count"] = state.get("attempt_count", 0) + 1
    return state


# Conditional edge: decide if we need refinement
def should_refine(state: AgentState) -> str:
    """Return "refine" when the response needs refinement, otherwise "end"."""
    return "refine" if state.get("needs_refinement", False) else "end"


def create_agent_graph():
    """Build and compile the LangGraph workflow.

    Flow: analyze -> generate -> validate -> (refine -> validate)* -> END

    Returns:
        The compiled graph returned by ``StateGraph.compile()``.
    """
    workflow = StateGraph(AgentState)

    workflow.add_node('analyze', analyze_query_node)
    workflow.add_node('generate', generate_response_node)
    workflow.add_node('validate', validate_response_node)
    workflow.add_node('refine', refine_response_node)

    # Linear part of the flow.
    workflow.set_entry_point('analyze')
    workflow.add_edge('analyze', 'generate')
    workflow.add_edge('generate', 'validate')

    # Conditional edge: refine or end.
    workflow.add_conditional_edges(
        'validate',
        should_refine,
        {
            'refine': 'refine',
            'end': END,
        },
    )

    # After refining, validate again.
    workflow.add_edge('refine', 'validate')

    return workflow.compile()


# Global initialization: the graph is compiled once, at import time.
agent_graph = create_agent_graph()


def run_internet_agent(user_message, memory_context, file_data=None):
    """Answer a user message by running the LangGraph workflow.

    Replaces the earlier direct implementation, which branched inline
    between file analysis and web search.

    Args:
        user_message: The user's question.
        memory_context: Prior-conversation context embedded in the prompts.
        file_data: Optional dict with ``"filename"`` and ``"content"``. When
            content is present the answer is based on the file.

    Returns:
        The final response text, or "No response generated" when the graph
        finished with an empty response.

    Note:
        Exceptions raised inside the graph nodes are not caught here and
        propagate to the caller.
    """
    # Prepare initial state
    initial_state = {
        "user_message": user_message,
        "file_data": file_data or {},
        "memory_context": memory_context,
        "decision": "",
        "response": "",
        "needs_refinement": False,
        "attempt_count": 0,
    }

    # Run the graph
    final_state = agent_graph.invoke(initial_state)

    # "response" is always present (seeded with ""), so test for emptiness
    # rather than relying on a .get() default that can never apply.
    return final_state.get("response") or "No response generated"