Agent_Jake / app.py
jisaacso219's picture
Update app.py
062506d verified
raw
history blame
5.91 kB
from dotenv import load_dotenv
from openai import OpenAI
import json
import os
import requests
from PyPDF2 import PdfReader
import gradio as gr
import gdown
from datetime import datetime
from pathlib import Path
import zipfile
# Load settings (OPENAI_API_KEY, PUSHOVER_TOKEN, PUSHOVER_USER are read below
# via os.getenv) from a local .env file. override=True asks python-dotenv to
# let values from .env replace variables already present in the environment.
load_dotenv(override=True)
def push(text):
    """Send *text* as a Pushover notification (best effort).

    Credentials are read from the PUSHOVER_TOKEN and PUSHOVER_USER
    environment variables. Any failure (network error, timeout, or a
    non-2xx response from Pushover) is printed and swallowed so that a
    notification problem never breaks the chat flow.

    Args:
        text: Message body to deliver.
    """
    try:
        response = requests.post(
            "https://api.pushover.net/1/messages.json",
            data={
                "token": os.getenv("PUSHOVER_TOKEN"),
                "user": os.getenv("PUSHOVER_USER"),
                "message": text,
            },
            # Without a timeout, requests waits indefinitely and would stall
            # the chat handler if Pushover is unreachable.
            timeout=10,
        )
        # Surface rejected requests (bad token, rate limit, ...) instead of
        # silently treating them as delivered.
        response.raise_for_status()
    except Exception as e:
        print(f"Pushover error: {e}")
def record_user_details(email, name="Name not provided", notes="not provided"):
    """Notify via push() that a visitor left contact details.

    Args:
        email: Address the visitor supplied.
        name: Visitor's name, if given.
        notes: Free-form context about the conversation, if given.

    Returns:
        A small acknowledgement dict, ``{"recorded": "ok"}``.
    """
    notification = f"Recording {name} with email {email} and notes {notes}"
    push(notification)
    acknowledgement = {"recorded": "ok"}
    return acknowledgement
def record_unknown_question(question):
    """Notify via push() about a question the assistant could not answer.

    Args:
        question: The visitor's question text.

    Returns:
        A small acknowledgement dict, ``{"recorded": "ok"}``.
    """
    notification = f"Recording {question}"
    push(notification)
    acknowledgement = {"recorded": "ok"}
    return acknowledgement
# Function schema advertised to the model for record_user_details: three
# string fields, of which only "email" is mandatory.
record_user_details_json = dict(
    name="record_user_details",
    description="Use this tool to record that a user is interested in being in touch and provided an email address",
    parameters={
        "type": "object",
        "properties": {
            field: {"type": "string"} for field in ("email", "name", "notes")
        },
        "required": ["email"],
        "additionalProperties": False,
    },
)
# Function schema advertised to the model for record_unknown_question: a
# single mandatory string field, "question".
record_unknown_question_json = dict(
    name="record_unknown_question",
    description="Record a question that couldn't be answered",
    parameters={
        "type": "object",
        "properties": {"question": {"type": "string"}},
        "required": ["question"],
        "additionalProperties": False,
    },
)
# Tool list handed to the chat-completions call: each schema is wrapped in
# the {"type": "function", "function": ...} envelope.
tools = [
    {"type": "function", "function": schema}
    for schema in (record_user_details_json, record_unknown_question_json)
]
class Me:
    """Chat persona that answers questions as Jacob Isaacson.

    On construction it downloads a LinkedIn PDF and a summary text file,
    extracts their text for the system prompt, and archives existing chat
    logs. ``chat_stream`` is the Gradio chat handler.

    Note: ``session_log`` lives on the instance, and the module creates a
    single instance, so every chat turn handled by that instance is appended
    to the same log.
    """

    MODEL = "gpt-4o"
    # Upper bound on model -> tool -> model round trips within one turn, so a
    # model that keeps requesting tools cannot loop forever.
    MAX_TOOL_ROUNDS = 3
    # Only these module-level functions may be invoked on the model's behalf.
    _ALLOWED_TOOLS = frozenset({"record_user_details", "record_unknown_question"})

    def __init__(self):
        self.openai = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))
        self.name = "Jacob Isaacson"
        self.session_log = []
        Path("chat_logs").mkdir(exist_ok=True)

        # Download LinkedIn PDF
        gdown.download("https://drive.google.com/uc?id=1xz2RowkImpI8odYv8zvKdlRHaKfILn40", "linkedin.pdf", quiet=False)
        reader = PdfReader("linkedin.pdf")
        # extract_text() may return None for a page; treat that as empty.
        self.linkedin = "".join(page.extract_text() or "" for page in reader.pages)

        # Download summary.txt
        gdown.download("https://drive.google.com/uc?id=1hjJz082YFSVjFtpO0pwT6Tyy3eLYYj6-", "summary.txt", quiet=False)
        with open("summary.txt", "r", encoding="utf-8") as f:
            self.summary = f.read()

        self.archive_logs()

    def system_prompt(self):
        """Return the system prompt embedding the summary and LinkedIn text."""
        # The prompt body is kept flush-left so no indentation leaks into the
        # text that is sent to the model.
        return f"""You are acting as {self.name}. You're answering questions on {self.name}'s website about his career, experience, and skills.
Be professional and conversational, as if talking to a potential employer or client.
If you can't answer something, call `record_unknown_question`. If a user seems interested, ask for their email and use `record_user_details`.
## Summary:
{self.summary}
## LinkedIn Profile:
{self.linkedin}
"""

    def handle_tool_call(self, tool_calls):
        """Execute the tool calls requested by the model.

        Args:
            tool_calls: Iterable of objects exposing ``id``,
                ``function.name`` and ``function.arguments`` (a JSON string).

        Returns:
            A list of ``{"role": "tool", ...}`` messages, one per call, whose
            ``content`` is the JSON-encoded tool result. Unknown tools yield
            ``{}``; malformed arguments yield an ``{"error": ...}`` payload
            instead of raising.
        """
        results = []
        for tool_call in tool_calls:
            tool_name = tool_call.function.name
            # The name comes from the model: resolve it only if it is one of
            # the advertised tools, never an arbitrary module global.
            tool = globals().get(tool_name) if tool_name in self._ALLOWED_TOOLS else None
            result = {}
            if callable(tool):
                try:
                    # An empty argument string means "no arguments".
                    arguments = json.loads(tool_call.function.arguments or "{}")
                    result = tool(**arguments)
                except (json.JSONDecodeError, TypeError) as exc:
                    result = {"error": f"invalid arguments for {tool_name}: {exc}"}
            results.append({"role": "tool", "tool_call_id": tool_call.id, "content": json.dumps(result)})
        return results

    @staticmethod
    def _history_to_messages(history):
        """Convert Gradio chat history into OpenAI chat messages.

        Accepts both history layouts: a list of ``{"role", "content"}`` dicts
        (``type="messages"``) and a list of ``(user, assistant)`` pairs.
        Entries without plain-text content are skipped.
        """
        messages = []
        for entry in history or []:
            if isinstance(entry, dict):
                role = entry.get("role")
                content = entry.get("content")
                if role in ("user", "assistant") and isinstance(content, str):
                    messages.append({"role": role, "content": content})
                continue
            user, assistant = entry
            if isinstance(user, str):
                messages.append({"role": "user", "content": user})
            if isinstance(assistant, str):
                messages.append({"role": "assistant", "content": assistant})
        return messages

    def chat_stream(self, message, history):
        """Stream the assistant's reply to *message*.

        Args:
            message: The visitor's new message.
            history: Prior conversation as supplied by Gradio.

        Yields:
            The reply accumulated so far, growing with every streamed chunk;
            the final value also carries the follow-up footer.

        Side effects:
            Runs any tools the model requests, appends the user message and
            the final reply to ``self.session_log``, and writes the log to
            disk.
        """
        # Local import keeps this block self-contained.
        from types import SimpleNamespace

        messages = [{"role": "system", "content": self.system_prompt()}]
        messages.extend(self._history_to_messages(history))
        messages.append({"role": "user", "content": message})
        self.session_log.append({"role": "user", "content": message})

        full_response = ""
        for _ in range(self.MAX_TOOL_ROUNDS):
            stream = self.openai.chat.completions.create(
                model=self.MODEL,
                messages=messages,
                tools=tools,
                stream=True,
            )
            round_text = ""
            pending = {}  # tool-call index -> {"id", "name", "arguments"}
            for chunk in stream:
                if not chunk.choices:
                    # Some chunks carry no choices; nothing to render.
                    continue
                delta = chunk.choices[0].delta
                text = getattr(delta, "content", None)
                if text:
                    round_text += text
                    full_response += text
                    yield full_response
                # Tool calls arrive as fragments keyed by index; the argument
                # JSON is split across several chunks and must be joined.
                for fragment in getattr(delta, "tool_calls", None) or []:
                    slot = pending.setdefault(
                        fragment.index, {"id": "", "name": "", "arguments": ""}
                    )
                    if fragment.id:
                        slot["id"] = fragment.id
                    function = getattr(fragment, "function", None)
                    if function is not None:
                        if function.name:
                            slot["name"] = function.name
                        if function.arguments:
                            slot["arguments"] += function.arguments

            if not pending:
                break

            calls = [pending[index] for index in sorted(pending)]
            # Echo the assistant's tool request back, followed by the tool
            # results, so the next round can produce the actual answer.
            messages.append({
                "role": "assistant",
                "content": round_text or None,
                "tool_calls": [
                    {
                        "id": call["id"],
                        "type": "function",
                        "function": {"name": call["name"], "arguments": call["arguments"]},
                    }
                    for call in calls
                ],
            })
            messages.extend(self.handle_tool_call([
                SimpleNamespace(
                    id=call["id"],
                    function=SimpleNamespace(name=call["name"], arguments=call["arguments"]),
                )
                for call in calls
            ]))

        # Append a helpful follow-up message
        full_response += "\n\n💬 Let me know if you’d like to follow up or need help connecting with Jacob."
        # Yield the final text so what the visitor sees matches what is logged.
        yield full_response
        self.session_log.append({"role": "assistant", "content": full_response})
        self.save_session_log()

    def save_session_log(self):
        """Write the whole session log to a timestamped JSON file in chat_logs/."""
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        filename = f"chat_logs/session_{timestamp}.json"
        with open(filename, "w", encoding="utf-8") as f:
            json.dump(self.session_log, f, indent=2)

    def archive_logs(self):
        """Create a zip archive of all previous logs — replaceable for weekly automation"""
        zip_path = "chat_logs/weekly_archive.zip"
        # Mode "w" rebuilds the archive from scratch on every call.
        with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as archive:
            for log_file in Path("chat_logs").glob("session_*.json"):
                archive.write(log_file, arcname=log_file.name)
# Instantiate assistant (runs the downloads and log archiving in Me.__init__).
me = Me()

# Build UI
# NOTE(review): the nesting below is reconstructed; it assumes the image and
# intro text share one row and the chat interface sits beneath it — confirm.
with gr.Blocks(title="Jacob Isaacson Chatbot") as iface:
    with gr.Row():
        gr.Image("jacob.png", width=100, show_label=False)
        gr.Markdown("### Chat with Jacob Isaacson\nAsk about Jacob's background, skills, or career. \n🛡️ *All chats are logged for improvement purposes.*")
    gr.ChatInterface(
        fn=me.chat_stream,
        # The chatbot must use the same history format as the interface;
        # otherwise the two components disagree about the message layout.
        chatbot=gr.Chatbot(type="messages", show_copy_button=True),
        examples=["What is Jacob's experience with AI?", "Tell me about his recent projects."],
        type="messages",
    )

if __name__ == "__main__":
    iface.launch()