Agent_Jake / app.py
jisaacso219's picture
Update app.py
2e03377 verified
raw
history blame
5.95 kB
from dotenv import load_dotenv
from openai import OpenAI
import json
import os
import requests
from PyPDF2 import PdfReader
import gradio as gr
import gdown
from datetime import datetime
from pathlib import Path
import zipfile
load_dotenv(override=True)
# ─── Ensure chat_logs Folder Exists ───
logs_path = Path("chat_logs")
logs_path.mkdir(exist_ok=True)
# ─── Pushover Notifications ───────────
def push(text):
try:
keep_path = logs_path / ".keep"
keep_path.touch(exist_ok=True)
requests.post(
"https://api.pushover.net/1/messages.json",
data={
"token": os.getenv("PUSHOVER_TOKEN"),
"user": os.getenv("PUSHOVER_USER"),
"message": text,
}
)
except Exception as e:
print(f"Pushover error: {e}")
# ─── Tool Functions ───────────────────
def record_user_details(email, name="Name not provided", notes="not provided"):
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
filename = logs_path / f"session_{timestamp}.json"
with open(filename, "w", encoding="utf-8") as f:
json.dump(me.session_log, f, indent=2)
msg = f"[New Contact]\nName: {name}\nEmail: {email}\nNotes: {notes}\n\nπŸ”— View log: {filename}"
push(msg)
return {"recorded": "ok"}
def record_unknown_question(question):
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
filename = logs_path / f"session_{timestamp}.json"
with open(filename, "w", encoding="utf-8") as f:
json.dump(me.session_log, f, indent=2)
msg = f"[Unknown Question]\nQ: {question}\n\nπŸ”— View log: {filename}"
push(msg)
return {"recorded": "ok"}
tools = [
{"type": "function", "function": {
"name": "record_user_details",
"description": "Record user contact information.",
"parameters": {
"type": "object",
"properties": {
"email": {"type": "string"},
"name": {"type": "string"},
"notes": {"type": "string"}
},
"required": ["email"]
}
}},
{"type": "function", "function": {
"name": "record_unknown_question",
"description": "Record questions unable to be answered.",
"parameters": {
"type": "object",
"properties": {
"question": {"type": "string"}
},
"required": ["question"]
}
}}
]
# ─── Core Chatbot Class ───────────────
class Me:
def __init__(self):
self.openai = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))
self.name = "Jacob Isaacson"
self.session_log = []
logs_path.mkdir(exist_ok=True)
gdown.download("https://drive.google.com/uc?id=1xz2RowkImpI8odYv8zvKdlRHaKfILn40", "linkedin.pdf", quiet=False)
reader = PdfReader("linkedin.pdf")
self.linkedin = "".join(page.extract_text() or "" for page in reader.pages)
gdown.download("https://drive.google.com/uc?id=1hjJz082YFSVjFtpO0pwT6Tyy3eLYYj6-", "summary.txt", quiet=False)
with open("summary.txt", "r", encoding="utf-8") as f:
self.summary = f.read()
self.archive_logs()
def system_prompt(self):
return f"""You are acting as {self.name}. You're answering questions about {self.name}'s career, experience, and skills.
Be professional and conversational.
If you can't answer something, call `record_unknown_question`. If a user seems interested, ask for their email and use `record_user_details`.
## Summary:
{self.summary}
## LinkedIn Profile:
{self.linkedin}
"""
def handle_tool_call(self, tool_calls):
results = []
for tool_call in tool_calls:
func = globals()[tool_call.function.name]
arguments = json.loads(tool_call.function.arguments)
result = func(**arguments)
results.append({
"role": "tool",
"tool_call_id": tool_call.id,
"content": json.dumps(result)
})
return results
def chat_stream(self, message, history):
messages = [{"role": "system", "content": self.system_prompt()}]
for msg in history:
messages.append(msg)
messages.append({"role": "user", "content": message})
self.session_log.append({"role": "user", "content": message})
response = self.openai.chat.completions.create(
model="gpt-4o", messages=messages, tools=tools, stream=False
)
reply = response.choices[0].message
full_response = reply.content or ""
self.session_log.append({"role": "assistant", "content": full_response})
self.save_session_log()
yield full_response + "\n\nπŸ’¬ Let me know if you’d like to follow up or need help connecting with Jacob."
def save_session_log(self):
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
filename = logs_path / f"session_{timestamp}.json"
with open(filename, "w", encoding="utf-8") as f:
json.dump(self.session_log, f, indent=2)
def archive_logs(self):
zip_path = logs_path / "weekly_archive.zip"
with zipfile.ZipFile(zip_path, "w") as archive:
for log_file in logs_path.glob("session_*.json"):
archive.write(log_file, arcname=log_file.name)
me = Me()
with gr.Blocks(title="Jacob Isaacson Chatbot") as iface:
with gr.Row():
gr.Markdown("# Chat with Jacob Isaacson")
gr.ChatInterface(
fn=me.chat_stream,
chatbot=gr.Chatbot(show_copy_button=True),
type="messages",
chatbot_initial_message={
"role": "assistant",
"content": "Hello, my name is Jacob Isaacson. Ask any questions about my professional career!"
}
)
iface.launch()