import os
import re
from typing import List, Union

import requests
import streamlit as st
from dotenv import load_dotenv
from transformers import pipeline

from langchain.agents import Tool, AgentExecutor, LLMSingleActionAgent, AgentOutputParser
from langchain.chains import LLMChain
from langchain.llms import HuggingFacePipeline
from langchain.memory import ConversationBufferWindowMemory
from langchain.prompts import BaseChatPromptTemplate
from langchain.schema import AgentAction, AgentFinish, HumanMessage

# Load environment variables from .env
load_dotenv()

# Job API keys and endpoints
JOB_API_KEY = os.getenv("JOB_API_KEY")  # Add your job API key here if required
JOBS_API_URL = "https://jobs.github.com/positions.json"  # Example endpoint; the GitHub Jobs API has been retired, so replace this with a live job-board API
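# NOTE (assumption): the finder functions below expect the endpoint to return a JSON
# array of job objects with "title", "company", "location", and "url" keys, as the old
# GitHub Jobs API did; a different job board will likely need its fields mapped.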

# Function to find global job openings
def find_global_jobs(_query: str = ""):
    # The agent passes its Action Input string to every tool, so accept (and ignore) it.
    try:
        response = requests.get(JOBS_API_URL)
        if response.status_code == 200:
            jobs = response.json()
            return [
                {
                    "title": job["title"],
                    "company": job["company"],
                    "location": job["location"],
                    "url": job["url"]
                } for job in jobs
            ]
        else:
            return {"error": "Unable to fetch job data."}
    except Exception as e:
        return {"error": str(e)}

# Function to find remote jobs
def find_remote_jobs(_query: str = ""):
    # The Action Input string is accepted only for compatibility with the Tool interface.
    try:
        response = requests.get(f"{JOBS_API_URL}?location=remote")
        if response.status_code == 200:
            jobs = response.json()
            return [
                {
                    "title": job["title"],
                    "company": job["company"],
                    "url": job["url"]
                } for job in jobs
            ]
        else:
            return {"error": "Unable to fetch remote job data."}
    except Exception as e:
        return {"error": str(e)}

# Function to find jobs near a location
def find_jobs_near_location(location):
    try:
        response = requests.get(f"{JOBS_API_URL}?location={location}")
        if response.status_code == 200:
            jobs = response.json()
            return [
                {
                    "title": job["title"],
                    "company": job["company"],
                    "location": job["location"],
                    "url": job["url"]
                } for job in jobs
            ]
        else:
            return {"error": "Unable to fetch job data for location."}
    except Exception as e:
        return {"error": str(e)}

# Define LangChain tools
global_jobs_tool = Tool(
    name="Global Job Finder",
    func=find_global_jobs,
    description="Find all job openings around the world."
)

remote_jobs_tool = Tool(
    name="Remote Job Finder",
    func=find_remote_jobs,
    description="Find remote job openings."
)

local_jobs_tool = Tool(
    name="Local Job Finder",
    func=find_jobs_near_location,
    description="Find job openings near a specified location. Input should be a city or region name."
)

# Set up the tools
tools = [
    global_jobs_tool,
    remote_jobs_tool,
    local_jobs_tool
]

# Set up a prompt template with history
template_with_history = """You are JobSearchGPT, an AI assistant specialized in finding job openings. Answer the following questions as best you can. You have access to the following tools:

{tools}

Use the following format:

Question: the input question you must answer
Thought: you should always think about what to do
Action: the action to take, should be one of [{tool_names}]
Action Input: the input to the action
Observation: the result of the action
(this Thought/Action/Action Input/Observation can repeat N times)
Thought: I now know the final answer
Final Answer: the final answer to the original input question

Begin! Remember to give detailed, informative answers.

Previous conversation history:
{history}

New question: {input}
{agent_scratchpad}"""

# Set up the prompt template
class CustomPromptTemplate(BaseChatPromptTemplate):
    template: str
    tools: List[Tool]

    def format_messages(self, **kwargs) -> List[HumanMessage]:
        # Build the agent scratchpad from the intermediate (action, observation) steps
        intermediate_steps = kwargs.pop("intermediate_steps")
        thoughts = ""
        for action, observation in intermediate_steps:
            thoughts += action.log
            thoughts += f"\nObservation: {observation}\nThought: "
        kwargs["agent_scratchpad"] = thoughts
        # Render the tool descriptions and names into the template
        kwargs["tools"] = "\n".join([f"{tool.name}: {tool.description}" for tool in self.tools])
        kwargs["tool_names"] = ", ".join([tool.name for tool in self.tools])
        formatted = self.template.format(**kwargs)
        return [HumanMessage(content=formatted)]

prompt_with_history = CustomPromptTemplate(
    template=template_with_history,
    tools=tools,
    input_variables=["input", "intermediate_steps", "history"]
)

# Custom output parser
class CustomOutputParser(AgentOutputParser):
    def parse(self, llm_output: str) -> Union[AgentAction, AgentFinish]:
        # If the model has produced a final answer, finish the run
        if "Final Answer:" in llm_output:
            return AgentFinish(
                return_values={"output": llm_output.split("Final Answer:")[-1].strip()},
                log=llm_output,
            )
        # Otherwise extract the next tool call from the "Action"/"Action Input" lines
        regex = r"Action: (.*?)[\n]*Action Input:[\s]*(.*)"
        match = re.search(regex, llm_output, re.DOTALL)
        if not match:
            raise ValueError(f"Could not parse LLM output: `{llm_output}`")
        action = match.group(1).strip()
        action_input = match.group(2)
        return AgentAction(tool=action, tool_input=action_input.strip(" ").strip('"'), log=llm_output)

output_parser = CustomOutputParser()
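
# Illustrative model output the parser above handles (format defined in the prompt):
#   Thought: I should look for remote roles
#   Action: Remote Job Finder
#   Action Input: python developer
# A "Final Answer:" line instead ends the run with an AgentFinish.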

# Initialize a Hugging Face text-generation pipeline and wrap it as a LangChain LLM
hf_pipe = pipeline("text-generation", model="EleutherAI/gpt-neo-2.7B")  # Replace with a suitable model
llm = HuggingFacePipeline(pipeline=hf_pipe)

# LLM chain
llm_chain = LLMChain(llm=llm, prompt=prompt_with_history)

tool_names = [tool.name for tool in tools]
agent = LLMSingleActionAgent(
    llm_chain=llm_chain,
    output_parser=output_parser,
    stop=["\nObservation:"],
    allowed_tools=tool_names
)

memory = ConversationBufferWindowMemory(k=2)
agent_executor = AgentExecutor.from_agent_and_tools(agent=agent, tools=tools, verbose=True, memory=memory)
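
# Optional smoke test outside Streamlit (assumption: this file is run directly;
# the query is illustrative only):
#   print(agent_executor.run("Find remote data science jobs"))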

# Streamlit app
st.title("Job Search Helper Agent")

query = st.text_input("Enter your query:")

if st.button("Submit"):
    if query:
        st.write("Debug: User Query ->", query)
        with st.spinner("Processing..."):
            try:
                # Run the agent and get the response
                response = agent_executor.run(query)
                st.success("Response:")
                st.write(response)
            except Exception as e:
                st.error(f"An error occurred: {e}")