Spaces:
Runtime error
Runtime error
import os | |
from dotenv import load_dotenv | |
from langchain.tools import Tool | |
from langchain.utilities import SerpAPIWrapper | |
from langgraph.graph.message import add_messages | |
from langgraph.graph import START, StateGraph | |
from langgraph.prebuilt import ToolNode, tools_condition | |
from langchain_core.messages import AnyMessage, HumanMessage | |
from langchain_groq import ChatGroq | |
from typing import TypedDict, Annotated | |
# Load environment configuration, then read the two API keys this module needs.
# Either value is None when the corresponding variable is not set.
load_dotenv()
groq_api_key = os.environ.get("GROQ_API_KEY")
serpapi_api_key = os.environ.get("SERPAPI_API_KEY")
# --- Tools ---
def calculator_tool_func(query: str):
    """Evaluate a simple arithmetic expression and return the result as text.

    The expression is parsed with ``ast`` and only numeric literals combined
    with ``+ - * / // % **`` and unary ``+``/``-`` are evaluated. ``eval`` is
    deliberately avoided: the query comes from model-generated tool calls, and
    emptying ``__builtins__`` does not sandbox ``eval`` (attribute access such
    as ``().__class__`` still works).

    Args:
        query: The arithmetic expression, e.g. ``"(1 + 2) * 3"``.

    Returns:
        ``str(result)`` on success, or a fixed Arabic error message when the
        expression is empty, malformed, unsupported, or fails to compute
        (for example division by zero).
    """
    import ast
    import operator

    binary_ops = {
        ast.Add: operator.add,
        ast.Sub: operator.sub,
        ast.Mult: operator.mul,
        ast.Div: operator.truediv,
        ast.FloorDiv: operator.floordiv,
        ast.Mod: operator.mod,
        ast.Pow: operator.pow,
    }
    unary_ops = {
        ast.UAdd: operator.pos,
        ast.USub: operator.neg,
    }
    # Upper bound on exponents so a short query cannot request an enormous power.
    max_exponent = 1000

    def evaluate(node):
        """Recursively compute the value of a whitelisted AST node."""
        if isinstance(node, ast.Expression):
            return evaluate(node.body)
        if isinstance(node, ast.Constant):
            # Exact type check: rejects bool, str, bytes, complex, None, ...
            if type(node.value) in (int, float):
                return node.value
            raise ValueError("unsupported literal")
        if isinstance(node, ast.BinOp) and type(node.op) in binary_ops:
            left = evaluate(node.left)
            right = evaluate(node.right)
            if isinstance(node.op, ast.Pow) and abs(right) > max_exponent:
                raise ValueError("exponent too large")
            return binary_ops[type(node.op)](left, right)
        if isinstance(node, ast.UnaryOp) and type(node.op) in unary_ops:
            return unary_ops[type(node.op)](evaluate(node.operand))
        raise ValueError("unsupported expression")

    try:
        # strip(): unlike eval(), ast.parse rejects leading indentation.
        tree = ast.parse(query.strip(), mode="eval")
        return str(evaluate(tree))
    except Exception:
        # Tool boundary: any failure is reported to the agent as text.
        return "تعذر حساب التعبير المدخل."
# Expose the calculator function to the agent as a tool named "Calculator".
calculator_tool = Tool(
    func=calculator_tool_func,
    name="Calculator",
    description="Performs simple arithmetic calculations.",
)
def weather_tool_func(location: str):
    """Return a weather report sentence (in Arabic) for ``location``.

    This is a placeholder: the report is always "sunny, 25 degrees Celsius";
    only the location name is substituted into the text.
    """
    report_template = "حالة الطقس في {location}: مشمس، درجة الحرارة 25 درجة مئوية."
    return report_template.format(location=location)
# Expose the (placeholder) weather function to the agent as a tool named "Weather".
weather_tool = Tool(
    func=weather_tool_func,
    name="Weather",
    description="Provides weather information for a given location.",
)
# SerpAPI client configured with the key read from the environment.
serpapi = SerpAPIWrapper(serpapi_api_key=serpapi_api_key)

# Expose the client's ``run`` method to the agent as a tool named "WebSearch".
SerpAPI_tool = Tool(
    func=serpapi.run,
    name="WebSearch",
    description="Searches the web for recent information.",
)
# --- Model and agent definitions ---
# The same tool list is bound to the model here and later given to the graph's ToolNode.
tools = [SerpAPI_tool, calculator_tool, weather_tool]
llm = ChatGroq(
    model="deepseek-r1-distill-llama-70b",
    groq_api_key=groq_api_key,
)
llm_with_tools = llm.bind_tools(tools)
class AgentState(TypedDict):
    """State carried through the agent graph: the conversation messages."""

    # ``add_messages`` is attached as annotation metadata for the graph
    # (presumably the reducer that merges new messages into the list —
    # confirm against the LangGraph docs).
    messages: Annotated[list[AnyMessage], add_messages]
def assistant(state: AgentState):
    """Graph node: send the conversation so far to the tool-bound model.

    Args:
        state: Current graph state; only ``state["messages"]`` is read.

    Returns:
        A state update whose ``"messages"`` list holds the model's single reply.
    """
    history = state["messages"]
    reply = llm_with_tools.invoke(history)
    return {"messages": [reply]}
# Assemble the agent graph: two nodes ("assistant" and "tools") wired in a loop.
builder = StateGraph(AgentState)

# Nodes.
builder.add_node("assistant", assistant)
builder.add_node("tools", ToolNode(tools))

# Edges: execution starts at the assistant; after the assistant runs, the
# prebuilt ``tools_condition`` chooses the next step (presumably "tools" when
# the reply requests a tool call, otherwise the end of the graph — confirm
# against the LangGraph docs); tool output always flows back to the assistant.
builder.add_edge(START, "assistant")
builder.add_conditional_edges("assistant", tools_condition)
builder.add_edge("tools", "assistant")

# Compiled agent object (must be used with .invoke({...})).
ninu = builder.compile()
# --- Standalone runner function (optional) ---
def run_ninu(query: str):
    """Run the compiled ``ninu`` agent on one query and extract its final answer.

    The conversation sent to the agent consists of an instruction prompt
    (asking the model to finish with ``FINAL ANSWER: ...``) followed by the
    user's query, both as human messages.

    Args:
        query: The user's question.

    Returns:
        The text following the last ``FINAL ANSWER:`` marker in the most recent
        model reply that contains one, stripped of surrounding whitespace; or a
        fixed Arabic fallback message when no model reply contains the marker.
    """
    marker = "FINAL ANSWER:"
    intro_prompt = """
You are a helpful assistant. When you have the final answer, respond exactly with: FINAL ANSWER: [your answer here].
If you cannot answer, respond: FINAL ANSWER: I do not know.
"""
    conversation = [
        HumanMessage(content=intro_prompt),
        HumanMessage(content=query),
    ]
    response = ninu.invoke({"messages": conversation})

    for message in reversed(response["messages"]):
        # Only model replies may supply the answer. The instruction prompt (and
        # possibly the user's query) also contains the marker; scanning those
        # would return "I do not know." from the prompt itself and make the
        # fallback below unreachable.
        if getattr(message, "type", None) not in ("ai", "AIMessageChunk"):
            continue
        content = message.content
        # Non-string content (e.g. a list of content parts) is skipped.
        if isinstance(content, str) and marker in content:
            return content.split(marker)[-1].strip()

    return "لم أتمكن من إيجاد إجابة نهائية."