|
import streamlit as st |
|
import streamlit.components.v1 as components |
|
import os |
|
import base64 |
|
import glob |
|
import io |
|
import json |
|
import mistune |
|
import pytz |
|
import math |
|
import requests |
|
import sys |
|
import time |
|
import re |
|
import textract |
|
import zipfile |
|
import random |
|
import httpx |
|
import asyncio |
|
from openai import OpenAI |
|
|
|
from datetime import datetime |
|
from xml.etree import ElementTree as ET |
|
from bs4 import BeautifulSoup |
|
from collections import deque |
|
from audio_recorder_streamlit import audio_recorder |
|
from dotenv import load_dotenv |
|
from PyPDF2 import PdfReader |
|
from langchain.text_splitter import CharacterTextSplitter |
|
from langchain.embeddings import OpenAIEmbeddings |
|
from langchain.vectorstores import FAISS |
|
from langchain.chat_models import ChatOpenAI |
|
from langchain.memory import ConversationBufferMemory |
|
from langchain.chains import ConversationalRetrievalChain |
|
from templates import css, bot_template, user_template |
|
from io import BytesIO |
|
from contextlib import redirect_stdout |
|
|
|
import seaborn |
|
import plotly |
|
import vega_datasets |
|
import bokeh |
|
import holoviews |
|
import plotnine |
|
import graphviz |
|
import tensorflow |
|
import torch |
|
|
|
|
|
st.set_page_config(page_title="Python AI Pair Programmer", layout="wide")

# Sidebar toggle; its value is handed to create_file() as `should_save`.
should_save = st.sidebar.checkbox("πΎ Save", value=True)

col1, col2, col3, col4 = st.columns(4)
with col1, st.expander("Settings π§ πΎ", expanded=True):
    # Output file type and model selection (both widgets live in the sidebar).
    menu = ["txt", "htm", "xlsx", "csv", "md", "py"]
    choice = st.sidebar.selectbox("Output File Type:", menu)
    model_choice = st.sidebar.radio("Select Model:", ('gpt-3.5-turbo', 'gpt-3.5-turbo-0301'))

# Globals namespace passed to exec() inside create_file().
context = {}
|
|
|
def create_file(filename, prompt, response, should_save=True):
    """Save a prompt/response pair as Markdown and render a download link.

    The report is always written to ``<stem of filename>.md``. Fenced blocks
    in ``response`` whose info string is ``python`` (optionally with a
    version suffix) are executed and their captured stdout is appended to the
    report; every other fenced block is copied into a "Resource" section.

    Args:
        filename: Requested file name. Only its stem is used for the saved
            ``.md`` file; the full name is used as the suggested download name.
        prompt: Prompt text recorded in the report.
        response: Model response text recorded in the report.
        should_save: When false the function returns immediately without
            executing code, writing a file, or rendering anything.

    Returns:
        None.
    """
    if not should_save:
        return

    base_filename, _ = os.path.splitext(filename)

    sections = [
        "# Prompt π\n" + prompt + "\n\n",
        "# Response π¬\n" + response + "\n\n",
    ]

    for resource in re.findall(r"```([\s\S]*?)```", response):
        # Only treat the block as Python when the fence's language tag says
        # so; a mere mention of "python" inside the block is not enough.
        tag = re.match(r"\s*python[\d.]*(?=\s|\Z)", resource, flags=re.IGNORECASE)
        if tag is None:
            sections.append("# Resource π οΈ\n" + "```" + resource + "```\n\n")
            continue

        # Strip only the language tag, never text inside the code itself.
        cleaned_code = resource[tag.end():]
        sections.append("# Code Results π\n")

        captured = io.StringIO()
        try:
            # SECURITY: this executes model-generated code in-process with
            # full privileges. Only run this app in a sandboxed/trusted
            # environment.
            # redirect_stdout restores sys.stdout even when the executed code
            # raises something that is not an Exception (e.g. SystemExit).
            with redirect_stdout(captured):
                exec(cleaned_code, context)
        except Exception as e:
            sections.append(f"```python\nError executing Python code: {e}\n```\n\n")
        else:
            code_output = captured.getvalue()
            sections.append(f"```\n{code_output}\n```\n\n")
            realtimeEvalResponse = "# Code Results π\n" + "```" + code_output + "```\n\n"
            st.code(realtimeEvalResponse)

    combined_content = "".join(sections)

    # The report contains non-ASCII text, so pin the encoding instead of
    # relying on the platform default.
    with open(f"{base_filename}.md", 'w', encoding='utf-8') as file:
        file.write(combined_content)
    st.code(combined_content)

    encoded_file = base64.b64encode(combined_content.encode('utf-8')).decode()
    href = f'<a href="data:file/markdown;base64,{encoded_file}" download="{filename}">Download File π</a>'
    st.markdown(href, unsafe_allow_html=True)
|
|
|
|
|
|
|
def readitaloud(result):
    """Render an HTML component that reads ``result`` aloud in the browser.

    The text is placed in a ``<textarea>`` and a button passes its value to
    ``window.speechSynthesis.speak``.

    Args:
        result: Text to show in the textarea. It is HTML-escaped before being
            embedded so markup inside it (for example ``</textarea>`` or a
            ``<script>`` tag) cannot break out of the textarea.
    """
    import html  # local import: the file's import block does not provide it

    documentHTML5 = '''
<!DOCTYPE html>
<html>
<head>
<title>Read It Aloud</title>
<script type="text/javascript">
function readAloud() {
const text = document.getElementById("textArea").value;
const speech = new SpeechSynthesisUtterance(text);
window.speechSynthesis.speak(speech);
}
</script>
</head>
<body>
<h1>π Read It Aloud</h1>
<textarea id="textArea" rows="10" cols="80">
'''
    documentHTML5 = documentHTML5 + html.escape(result)
    documentHTML5 = documentHTML5 + '''
</textarea>
<br>
<button onclick="readAloud()">π Read Aloud</button>
</body>
</html>
'''

    components.html(documentHTML5, width=800, height=300)
|
|
|
|
|
def generate_filename(prompt, file_type):
    """Build a timestamped file name from the start of ``prompt``.

    The name is ``<MMDD_HHMM in US/Central>_<sanitised prompt>.<file_type>``.
    Spaces and newlines in the prompt become underscores, every other
    character that is neither alphanumeric nor an underscore is dropped, and
    the result is cut to 90 characters.
    """
    stamp = datetime.now(pytz.timezone('US/Central')).strftime("%m%d_%H%M")
    underscored = prompt.replace(" ", "_").replace("\n", "_")
    kept = [ch for ch in underscored if ch == "_" or ch.isalnum()]
    safe_prompt = "".join(kept)[:90]
    return f"{stamp}_{safe_prompt}.{file_type}"
|
|
|
|
|
def chat_with_model(prompt, document_section, model_choice='gpt-3.5-turbo'):
    """Stream a chat completion into the page and persist the reply.

    Args:
        prompt: User message sent to the model.
        document_section: Optional extra text; when non-empty it is appended
            to the conversation as an ``assistant`` message.
        model_choice: Name of the chat model to request.

    Returns:
        The full reply text assembled from the streamed chunks.

    Side effects:
        Updates a Streamlit placeholder while streaming, writes the elapsed
        time, then calls ``create_file`` (using the module-level ``choice``
        and ``should_save``) and ``readitaloud`` with the reply.
    """
    conversation = [
        {'role': 'system', 'content': 'You are a python script writer.'},
        {'role': 'user', 'content': prompt},
    ]
    if document_section:
        conversation.append({'role': 'assistant', 'content': document_section})

    start_time = time.time()
    res_box = st.empty()

    client = OpenAI(api_key=os.getenv('OPENAI_API_KEY'))
    stream = client.chat.completions.create(
        model=model_choice,  # honour the caller's selection instead of a fixed model
        messages=conversation,
        stream=True,
    )

    pieces = []
    for part in stream:
        # Some chunks carry no choices or an empty/None delta (e.g. the final
        # chunk); skip them explicitly instead of relying on an exception.
        if not part.choices:
            continue
        content = part.choices[0].delta.content
        if not content:
            continue
        pieces.append(content)
        result = "".join(pieces).strip()
        res_box.markdown(f'*{result}*')

    full_reply_content = "".join(pieces)
    st.write("Elapsed time:")
    st.write(time.time() - start_time)
    filename = generate_filename(full_reply_content, choice)
    create_file(filename, prompt, full_reply_content, should_save)
    readitaloud(full_reply_content)
    return full_reply_content
|
|
|
def chat_with_file_contents(prompt, file_content, model_choice='gpt-3.5-turbo'):
    """Request a single (non-streamed) chat completion and return its text.

    Args:
        prompt: User message sent to the model.
        file_content: Optional extra text; when non-empty it is appended to
            the conversation as an ``assistant`` message.
        model_choice: Name of the chat model to request.

    Returns:
        The content of the first choice's message.
    """
    conversation = [
        {'role': 'system', 'content': 'You are a helpful assistant.'},
        {'role': 'user', 'content': prompt},
    ]
    if file_content:
        conversation.append({'role': 'assistant', 'content': file_content})

    client = OpenAI(api_key=os.getenv('OPENAI_API_KEY'))
    response = client.chat.completions.create(model=model_choice, messages=conversation)
    # The client returns a typed response object, not a dict, so the reply is
    # reached through attributes rather than subscripts.
    return response.choices[0].message.content
|
|
|
def link_button_with_emoji(url, title, emoji_summary):
    """Render a Markdown link to ``url`` prefixed with a randomly picked emoji.

    The link text is ``<emoji> <emoji_summary> - <title>``.
    """
    emojis = ["π", "π₯", "π‘οΈ", "π©Ί", "π¬", "π", "π§ͺ", "π¨ββοΈ", "π©ββοΈ"]
    picked = random.choice(emojis)
    link_text = f"{picked} {emoji_summary} - {title}"
    st.markdown(f"[{link_text}]({url})")
|
|
|
# One row per learning topic: (title, emoji, details). The dict built below
# keeps this order, which is the order the expanders are rendered in.
_PYTHON_PART_ROWS = (
    ("Syntax", "βοΈ", "Variables, Comments, Printing"),
    ("Data Types", "π", "Numbers, Strings, Lists, Tuples, Sets, Dictionaries"),
    ("Control Structures", "π", "If, Elif, Else, Loops, Break, Continue"),
    ("Functions", "π§", "Defining, Calling, Parameters, Return Values"),
    ("Classes", "ποΈ", "Creating, Inheritance, Methods, Properties"),
    ("API Interaction", "π", "Requests, JSON Parsing, HTTP Methods"),
    ("Data Visualization Libraries1", "π", "matplotlib"),
    ("Data Visualization Libraries2", "π", "seaborn"),
    ("Data Visualization Libraries3", "π", "plotly"),
    ("Data Visualization Libraries4", "π", "altair"),
    ("Data Visualization Libraries5", "π", "bokeh"),
    ("Data Visualization Libraries6", "π", "pydeck"),
    ("Data Visualization Libraries7", "π", "holoviews"),
    ("Data Visualization Libraries8", "π", "plotnine"),
    ("Data Visualization Libraries9", "π", "graphviz"),
    ("Error Handling", "β οΈ", "Try, Except, Finally, Raising"),
    ("Scientific & Data Analysis Libraries", "π§ͺ", "Numpy, Pandas, Scikit-Learn, TensorFlow, SciPy, Pillow"),
    ("Advanced Concepts", "π§ ", "Decorators, Generators, Context Managers, Metaclasses, Asynchronous Programming"),
    ("Web & Network Libraries", "πΈοΈ", "Flask, Django, Requests, BeautifulSoup, HTTPX, Asyncio"),
    ("Streamlit & Extensions1", "π‘", "Streamlit"),
    ("Streamlit & Extensions2", "π‘", "Streamlit-AgGrid"),
    ("Streamlit & Extensions3", "π‘", "Streamlit-Folium"),
    ("Streamlit & Extensions4", "π‘", "Streamlit-Pandas-Profiling"),
    ("Streamlit & Extensions5", "π‘", "Streamlit-Vega-Lite, Gradio"),
    ("Gradio", "π‘", "gradio"),
    ("File Handling & Serialization", "π", "PyPDF2, Pytz, Json, Base64, Zipfile, Random, Glob, IO"),
    ("Machine Learning & AI", "π€", "OpenAI, LangChain, HuggingFace"),
    ("Text & Data Extraction", "π", "TikToken, Textract, SQLAlchemy, Pillow"),
    ("XML & Collections Libraries", "π", "XML, Collections"),
    ("Top PyPI Libraries1", "π", "Requests, Pillow, SQLAlchemy, Flask, Django, SciPy, Beautiful Soup, PyTest, PyGame, Twisted"),
    ("Top PyPI Libraries2", "π", "numpy, pandas, matplotlib, requests, beautifulsoup4"),
    ("Top PyPI Libraries3", "π", "langchain, openai, PyPDF2, pytz"),
    ("Top PyPI Libraries4", "π", "streamlit, audio_recorder_streamlit, gradio"),
    ("Top PyPI Libraries5", "π", "tiktoken, textract, glob, io"),
    ("Top PyPI Libraries6", "π", "matplotlib, seaborn, plotly, altair, bokeh, pydeck"),
    ("Top PyPI Libraries7", "π", "streamlit, streamlit-aggrid, streamlit-folium, streamlit-pandas-profiling, streamlit-vega-lite"),
    ("Top PyPI Libraries8", "π", "holoviews, plotnine, graphviz"),
    ("Top PyPI Libraries9", "π", "json, base64, zipfile, random"),
    ("Top PyPI Libraries10", "π", "httpx, asyncio, xml, collections, huggingface "),
)

# Topic title -> {"emoji": ..., "details": ...}
python_parts = {
    title: {"emoji": emoji, "details": details}
    for title, emoji, details in _PYTHON_PART_ROWS
}

# Per-topic caches filled by the display_python_parts* functions.
response_placeholders = {}
example_placeholders = {}
|
|
|
def display_python_parts_old2():
    """Older topic browser: one expander per topic with three action buttons.

    Each button sends a prompt through ``chat_with_model``, stores the reply
    in ``response_placeholders`` and shows it under a heading as code.
    NOTE(review): nothing in the visible file calls this function.
    """
    st.title("Python Interactive Learning Platform")

    def _store_and_show(topic, emoji, reply, heading_suffix):
        # Cache the reply for the topic, then render heading + reply.
        response_placeholders[topic] = reply
        st.write(f"#### {emoji} {topic} {heading_suffix}")
        st.code(response_placeholders[topic], language="python")

    for part, content in python_parts.items():
        emoji, details = content['emoji'], content['details']
        with st.expander(f"{emoji} {part} - {details}", expanded=False):
            if st.button(f"Show Example for {part}", key=f"example_{part}"):
                example_placeholders[part] = (
                    "Write short python script examples with mock data in python list dictionary for inputs for " + part
                )
                st.code(example_placeholders[part], language="python")
                reply = chat_with_model(f'Write python script with short code examples for: {details}', part)
                _store_and_show(part, emoji, reply, "Example")

            if st.button(f"Take Quiz on {part}", key=f"quiz_{part}"):
                quiz = "Write Python script quiz examples with mock static data inputs for " + part
                reply = chat_with_model(f'Write python code blocks for quiz program: {quiz}', part)
                _store_and_show(part, emoji, reply, "Quiz")

            prompt = f"Write python script with a few advanced coding examples using mock data input for {details}"
            if st.button(f"Explore {part}", key=part):
                reply = chat_with_model(prompt, part)
                _store_and_show(part, emoji, reply, "Details")
|
|
|
|
|
def display_python_parts():
    """Render one expander per ``python_parts`` topic with three actions.

    "Show Example" and "Take Quiz" send a prompt through ``chat_with_model``.
    "Explore" additionally caches the reply in ``response_placeholders``, and
    any cached reply for the topic is rendered at the bottom of its expander.
    """
    st.title("Python Interactive Learning Platform")
    for part, content in python_parts.items():
        expander_title = f"{content['emoji']} {part} - {content['details']}"
        with st.expander(expander_title, expanded=False):
            example_clicked = st.button(f"Show Example for {part}", key=f"example_{part}")
            if example_clicked:
                example_placeholders[part] = "Python script example with mock example inputs for " + part
                st.code(example_placeholders[part], language="python")
                chat_with_model(
                    'Create detailed advanced python script code examples for:' + example_placeholders[part],
                    part,
                )

            quiz_clicked = st.button(f"Take Quiz on {part}", key=f"quiz_{part}")
            if quiz_clicked:
                chat_with_model("Python script quiz example with mock example inputs for " + part, part)

            prompt = f"Learn about advanced coding examples using mock example inputs for {content['details']}"
            explore_clicked = st.button(f"Explore {part}", key=part)
            if explore_clicked:
                response_placeholders[part] = chat_with_model(prompt, part)

            if part in response_placeholders:
                st.markdown(f"**Response:** {response_placeholders[part]}")
|
|
|
def add_paper_buttons_and_links():
    """Render the page selector, the topic browser and four paper panels.

    Each paper panel sits in its own column and holds a link plus a button
    that sends the paper's feature outline through ``chat_with_model``.
    """
    page = st.sidebar.radio("Choose a page:", ["Python Pair Programmer"])
    if page == "Python Pair Programmer":
        display_python_parts()

    # (expander title, url, paper name, link summary, outline, button label, prompt prefix)
    papers = (
        (
            "MemGPT π§ πΎ",
            "https://arxiv.org/abs/2310.08560",
            "MemGPT",
            "π§ πΎ Memory OS",
            "Memory Hierarchy, Context Paging, Self-directed Memory Updates, Memory Editing, Memory Retrieval, Preprompt Instructions, Semantic Memory, Episodic Memory, Emotional Contextual Understanding",
            "Discuss MemGPT Features",
            "Discuss the key features of MemGPT: ",
        ),
        (
            "AutoGen π€π",
            "https://arxiv.org/abs/2308.08155",
            "AutoGen",
            "π€π Multi-Agent LLM",
            "Cooperative Conversations, Combining Capabilities, Complex Task Solving, Divergent Thinking, Factuality, Highly Capable Agents, Generic Abstraction, Effective Implementation",
            "Explore AutoGen Multi-Agent LLM",
            "Explore the key features of AutoGen: ",
        ),
        (
            "Whisper ππ§βπ",
            "https://arxiv.org/abs/2212.04356",
            "Whisper",
            "ππ§βπ Robust STT",
            "Scaling, Deep Learning Approaches, Weak Supervision, Zero-shot Transfer Learning, Accuracy & Robustness, Pre-training Techniques, Broad Range of Environments, Combining Multiple Datasets",
            "Learn About Whisper STT",
            "Learn about the key features of Whisper: ",
        ),
        (
            "ChatDev π¬π»",
            "https://arxiv.org/pdf/2307.07924.pdf",
            "ChatDev",
            "π¬π» Comm. Agents",
            "Effective Communication, Comprehensive Software Solutions, Diverse Social Identities, Tailored Codes, Environment Dependencies, User Manuals",
            "Deep Dive into ChatDev",
            "Deep dive into the features of ChatDev: ",
        ),
    )

    columns = st.columns(4)
    for column, paper in zip(columns, papers):
        expander_title, url, name, summary, outline, button_label, prompt_prefix = paper
        with column:
            with st.expander(expander_title, expanded=False):
                link_button_with_emoji(url, name, summary)
                if st.button(button_label):
                    chat_with_model(prompt_prefix + outline, name)


add_paper_buttons_and_links()
|
|
|
|
|
|
|
def process_user_input(user_question):
    """Answer ``user_question`` with the stored retrieval chain and render the chat.

    The chain is read from ``st.session_state.conversation``. Every message in
    the returned chat history is rendered with the user/bot HTML templates
    (even index = user, odd index = bot), saved through ``create_file`` and
    turned into expanders by ``create_expanders_and_buttons``.

    Args:
        user_question: Question typed by the user.
    """
    chain = st.session_state.get('conversation')
    if not callable(chain):
        # No chain has been stored yet, so there is nothing to query. Calling
        # a placeholder value here would raise TypeError.
        st.warning("Upload documents in the sidebar first so there is an index to search.")
        return

    response = chain({'question': user_question})
    st.session_state.chat_history = response['chat_history']

    for i, message in enumerate(st.session_state.chat_history):
        template = user_template if i % 2 == 0 else bot_template
        st.write(template.replace("{{MSG}}", message.content), unsafe_allow_html=True)

        # Save file output from the document query results.
        filename = generate_filename(user_question, 'txt')
        create_file(filename, user_question, message.content, should_save)

        # Offer follow-up expanders/buttons derived from the message text.
        create_expanders_and_buttons(message.content)
|
|
|
def create_expanders_and_buttons(content):
    """Turn ``"Header: detail"`` paragraphs of ``content`` into expanders.

    Paragraphs are separated by blank lines. For each paragraph that yields a
    non-empty header and detail, an expander with an "Explore" button is
    rendered; pressing it asks the model to expand on the detail.
    """
    for paragraph in content.split("\n\n"):
        header, detail = extract_feature_and_detail(paragraph)
        if not (header and detail):
            continue
        with st.expander(header, expanded=False):
            if st.button(f"Explore {header}"):
                chat_with_model("Expand on the feature: " + detail, header)
|
|
|
def extract_feature_and_detail(paragraph):
    """Split the first line of ``paragraph`` at its first colon.

    Returns:
        ``(header, detail)`` with surrounding whitespace stripped, or
        ``(None, None)`` when the first line contains no colon.
    """
    first_line = paragraph.partition("\n")[0]
    header, colon, detail = first_line.partition(":")
    if not colon:
        return None, None
    return header.strip(), detail.strip()
|
|
|
def transcribe_audio(file_path, model, timeout=120):
    """Transcribe an audio file via the OpenAI transcription endpoint.

    On success the transcript is sent to ``chat_with_model`` and the
    transcript/reply pair is saved with ``create_file``.

    Args:
        file_path: Path of the audio file to upload.
        model: Transcription model name sent in the request form data.
        timeout: Seconds to wait for the HTTP request before giving up.

    Returns:
        The transcript text, or ``None`` when the API call fails or the
        response carries no text.
    """
    key = os.getenv('OPENAI_API_KEY')
    headers = {"Authorization": f"Bearer {key}"}
    OPENAI_API_URL = "https://api.openai.com/v1/audio/transcriptions"

    with open(file_path, 'rb') as f:
        st.write(f"Read file {file_path}", file_path)
        response = requests.post(
            OPENAI_API_URL,
            headers=headers,
            files={'file': f},
            data={'model': model},
            timeout=timeout,  # requests waits indefinitely without one
        )

    # Parse the body once; an error response is not guaranteed to be JSON.
    try:
        payload = response.json()
    except ValueError:
        payload = {'raw': response.text}
    st.write(payload)

    if response.status_code != 200:
        st.error("Error in API call.")
        return None

    transcript = payload.get('text')
    if not transcript:
        st.error("Error in API call.")
        return None

    chat_response = chat_with_model(transcript, '')
    filename = generate_filename(transcript, 'txt')
    create_file(filename, transcript, chat_response, should_save)
    return transcript
|
|
|
def save_and_play_audio(audio_recorder):
    """Record audio, save it as a WAV file and play it back.

    Args:
        audio_recorder: Callable that returns the recorded bytes (or a falsy
            value when nothing was recorded).

    Returns:
        The generated file name, or ``None`` when nothing was recorded.
    """
    recording = audio_recorder()
    if not recording:
        return None

    wav_name = generate_filename("Recording", "wav")
    with open(wav_name, 'wb') as sink:
        sink.write(recording)
    st.audio(recording, format="audio/wav")
    return wav_name
|
|
|
|
|
|
|
def truncate_document(document, length):
    """Return the leading ``length`` items of ``document`` (slice semantics)."""
    head = slice(None, length)
    return document[head]
|
|
|
def divide_document(document, max_length):
    """Split ``document`` into consecutive slices of at most ``max_length`` items."""
    sections = []
    for start in range(0, len(document), max_length):
        sections.append(document[start:start + max_length])
    return sections
|
|
|
def get_table_download_link(file_path):
    """Return an HTML ``<a>`` tag that downloads ``file_path`` as a data URI.

    The file is read as raw bytes, so binary files get a working link too.
    The MIME type is chosen from the file extension and falls back to
    ``application/octet-stream``.

    Args:
        file_path: Path of the file to embed.

    Returns:
        The anchor markup, or ``file_path`` unchanged when the file cannot be
        read.
    """
    import html  # local import: the file's import block does not provide it

    try:
        with open(file_path, 'rb') as file:
            data = file.read()
    except OSError:
        # Best effort: show the plain path instead of failing the page.
        return file_path

    mime_types = {
        '.txt': 'text/plain',
        '.py': 'text/plain',
        '.xlsx': 'text/plain',
        '.csv': 'text/plain',
        '.htm': 'text/html',
        '.md': 'text/markdown',
    }
    file_name = os.path.basename(file_path)
    ext = os.path.splitext(file_name)[1]
    mime_type = mime_types.get(ext, 'application/octet-stream')

    b64 = base64.b64encode(data).decode()
    # The name lands in an attribute and in element text, so escape it.
    safe_name = html.escape(file_name, quote=True)
    href = f'<a href="data:{mime_type};base64,{b64}" target="_blank" download="{safe_name}">{safe_name}</a>'
    return href
|
|
|
def CompressXML(xml_text):
    """Return ``xml_text`` re-serialised without elements tagged ``*Comment*``.

    Every non-root element whose tag name contains ``Comment`` is removed
    together with its subtree.

    Args:
        xml_text: XML document as a string.

    Returns:
        The remaining document serialised as a unicode string.
    """
    root = ET.fromstring(xml_text)
    # ElementTree elements keep no reference to their parent, so build the
    # child -> parent lookup up front.
    parent_of = {child: parent for parent in root.iter() for child in parent}
    for elem in list(root.iter()):
        if isinstance(elem.tag, str) and 'Comment' in elem.tag:
            parent = parent_of.get(elem)
            if parent is not None:  # the root has no parent and is kept
                parent.remove(elem)
    return ET.tostring(root, encoding='unicode', method="xml")
|
|
|
def read_file_content(file, max_length):
    """Return the text content of an uploaded file based on its MIME type.

    Handled types: JSON (returned as the ``str`` of the parsed object), HTML
    (text extracted with BeautifulSoup), XML (passed through ``CompressXML``),
    Markdown (rendered with mistune), plain text and CSV (decoded as-is).

    Args:
        file: Uploaded file object exposing ``type`` plus file-like reads.
        max_length: Accepted for interface compatibility; not used here.

    Returns:
        The extracted text, or ``""`` for any other MIME type.
    """
    mime = file.type
    if mime == "application/json":
        return str(json.load(file))
    if mime in ("text/html", "text/htm"):
        return BeautifulSoup(file, "html.parser").text
    if mime in ("application/xml", "text/xml"):
        root = ET.parse(file).getroot()
        return CompressXML(ET.tostring(root, encoding='unicode'))
    if mime in ("text/markdown", "text/md"):
        render = mistune.create_markdown()
        return render(file.read().decode())
    # CSV is plain text too; without this branch CSV uploads produced "".
    if mime in ("text/plain", "text/csv"):
        return file.getvalue().decode()
    return ""
|
|
|
def extract_mime_type(file):
    """Return the MIME type of ``file``.

    Args:
        file: Either a string containing ``type='<mime>'`` (such as the repr
            of an uploaded file) or an object exposing a ``type`` attribute.

    Returns:
        The MIME type string.

    Raises:
        ValueError: If a string input contains no ``type='...'`` fragment.
        TypeError: If the input is neither a string nor has a ``type``
            attribute.
    """
    if isinstance(file, str):
        match = re.search(r"type='(.*?)'", file)
        if match:
            return match.group(1)
        raise ValueError(f"Unable to extract MIME type from {file}")

    # Duck-type the uploaded file: the module is imported as `st`, so the
    # name `streamlit` is not available for an isinstance check.
    if hasattr(file, "type"):
        return file.type

    raise TypeError("Input should be a string or a streamlit.UploadedFile object")
|
|
|
|
|
|
|
def extract_file_extension(file):
    """Return the extension of ``file.name`` (text after the last dot).

    Args:
        file: Object exposing a ``name`` attribute.

    Returns:
        The extension without the dot, e.g. ``"pdf"`` for ``"my.report.pdf"``.

    Raises:
        ValueError: If the name contains no dot.
    """
    file_name = file.name
    # Split at the LAST dot so names with several dots still yield the real
    # extension rather than everything after the first dot.
    _, dot, extension = file_name.rpartition(".")
    if not dot:
        raise ValueError(f"Unable to extract file extension from {file_name}")
    return extension
|
|
|
def pdf2txt(docs):
    """Concatenate the text of the uploaded documents.

    Text-like files (py, txt, md, csv, html, htm, xml, json) are decoded as
    UTF-8; PDFs are read page by page with PyPDF2. Files with any other
    extension contribute nothing. A failure on one file is reported on the
    page and does not stop the remaining files.

    Args:
        docs: Iterable of uploaded files (``None`` is treated as empty).

    Returns:
        The combined text of all readable documents.
    """
    text_extensions = {'py', 'txt', 'md', 'csv', 'html', 'htm', 'xml', 'json'}
    collected = []
    for file in docs or []:
        try:
            # Inside the try so a name without an extension is reported like
            # any other per-file failure.
            file_extension = extract_file_extension(file)
            st.write(f"File type extension: {file_extension}")

            kind = file_extension.lower()
            if kind in text_extensions:
                collected.append(file.getvalue().decode('utf-8'))
            elif kind == 'pdf':
                from PyPDF2 import PdfReader
                pdf = PdfReader(BytesIO(file.getvalue()))
                for page in pdf.pages:
                    # Guard against pages that yield no text object.
                    collected.append(page.extract_text() or "")
        except Exception as e:
            st.write(f"Error processing file {file.name}: {e}")
    return "".join(collected)
|
|
|
def txt2chunks(text):
    """Split ``text`` on newlines into chunks of about 1000 characters with 200 overlap."""
    splitter_options = {
        "separator": "\n",
        "chunk_size": 1000,
        "chunk_overlap": 200,
        "length_function": len,
    }
    splitter = CharacterTextSplitter(**splitter_options)
    return splitter.split_text(text)
|
|
|
def vector_store(text_chunks):
    """Embed ``text_chunks`` with OpenAI embeddings and return a FAISS store."""
    embedder = OpenAIEmbeddings(openai_api_key=os.getenv('OPENAI_API_KEY'))
    store = FAISS.from_texts(texts=text_chunks, embedding=embedder)
    return store
|
|
|
def get_chain(vectorstore):
    """Build a conversational retrieval chain over ``vectorstore``.

    The chain uses a default ``ChatOpenAI`` model and a buffer memory stored
    under the ``chat_history`` key that returns message objects.
    """
    chat_memory = ConversationBufferMemory(memory_key='chat_history', return_messages=True)
    return ConversationalRetrievalChain.from_llm(
        llm=ChatOpenAI(),
        retriever=vectorstore.as_retriever(),
        memory=chat_memory,
    )
|
|
|
def divide_prompt(prompt, max_length):
    """Split ``prompt`` into space-joined word chunks of at most ``max_length`` characters.

    Words are never split, so a single word longer than ``max_length`` forms
    a chunk of its own that exceeds the limit.

    Args:
        prompt: Text to split on whitespace.
        max_length: Maximum length of each joined chunk.

    Returns:
        List of chunks; ``['']`` when the prompt contains no words.
    """
    chunks = []
    current_chunk = []
    current_length = 0  # length of the joined chunk plus one trailing space
    for word in prompt.split():
        if len(word) + current_length <= max_length:
            current_chunk.append(word)
            current_length += len(word) + 1
            continue
        # Flush only real content: an overlong first word must not emit an
        # empty leading chunk.
        if current_chunk:
            chunks.append(' '.join(current_chunk))
        current_chunk = [word]
        # Count the separator here as well, otherwise later chunks may exceed
        # max_length by one character.
        current_length = len(word) + 1
    chunks.append(' '.join(current_chunk))
    return chunks
|
|
|
def create_zip_of_files(files):
    """
    Bundle the given file paths into ``all_files.zip`` in the working
    directory and return that archive name.
    """
    archive_name = "all_files.zip"
    archive = zipfile.ZipFile(archive_name, 'w')
    with archive:
        for path in files:
            archive.write(path)
    return archive_name
|
|
|
|
|
def get_zip_download_link(zip_file):
    """
    Return an HTML anchor that embeds ``zip_file`` as a base64 data URI and
    offers it for download under the same name.
    """
    with open(zip_file, 'rb') as archive:
        payload = base64.b64encode(archive.read()).decode()
    return f'<a href="data:application/zip;base64,{payload}" download="{zip_file}">Download All</a>'
|
|
|
|
|
def _sidebar_link_for_saved(filename):
    """Show a sidebar download link for the report saved for ``filename``."""
    # create_file() always writes "<stem>.md", whatever extension was asked for.
    saved_path = os.path.splitext(filename)[0] + ".md"
    if os.path.exists(saved_path):
        st.sidebar.markdown(get_table_download_link(saved_path), unsafe_allow_html=True)


def _read_text_lossy(path):
    """Read ``path`` as UTF-8 text, replacing bytes that cannot be decoded."""
    with open(path, 'r', encoding='utf-8', errors='replace') as handle:
        return handle.read()


def main():
    """Render the main page of the app.

    In order: audio recording with transcription, the prompt box, the
    section-length slider and context-file uploader, per-section chat
    buttons, the main chat button, and the sidebar file manager (download
    all, delete all, and per-file view / open / chat / delete buttons).
    """
    # Audio, transcribe, GPT
    filename = save_and_play_audio(audio_recorder)
    if filename is not None:
        try:
            transcribe_audio(filename, "whisper-1")
        except Exception as err:
            # Report the failure instead of silently swallowing it.
            st.error(f"Transcription failed: {err}")
        st.sidebar.markdown(get_table_download_link(filename), unsafe_allow_html=True)

    # Prompt interface
    user_prompt = st.text_area("Enter prompts, instructions & questions:", '', height=100)

    # File section interface for prompts against large documents as context
    collength, colupload = st.columns([2, 3])
    with collength:
        max_length = st.slider("File section length for large files", min_value=1000, max_value=128000, value=12000, step=1000)
    with colupload:
        uploaded_file = st.file_uploader("Add a file for context:", type=["pdf", "xml", "json", "xlsx", "csv", "html", "htm", "md", "txt"])

    # Document section chat
    document_sections = deque()
    document_responses = {}
    if uploaded_file is not None:
        file_content = read_file_content(uploaded_file, max_length)
        document_sections.extend(divide_document(file_content, max_length))

    if len(document_sections) > 0:
        if st.button("ποΈ View Upload"):
            st.markdown("**Sections of the uploaded file:**")
            for i, section in enumerate(document_sections):
                st.markdown(f"**Section {i+1}**\n{section}")
        st.markdown("**Chat with the model:**")
        for i, section in enumerate(list(document_sections)):
            if i in document_responses:
                st.markdown(f"**Section {i+1}**\n{document_responses[i]}")
            elif st.button(f"Chat about Section {i+1}"):
                st.write('Reasoning with your inputs...')
                response = chat_with_model(user_prompt, section, model_choice)
                document_responses[i] = response
                filename = generate_filename(f"{user_prompt}_section_{i+1}", choice)
                create_file(filename, user_prompt, response, should_save)
                _sidebar_link_for_saved(filename)

    if st.button('π¬ Chat'):
        st.write('Reasoning with your inputs...')

        # The uploaded context is the same for every prompt section.
        document_context = ''.join(document_sections)
        replies = []
        for prompt_section in divide_prompt(user_prompt, max_length):
            replies.append(chat_with_model(prompt_section, document_context, model_choice) + '\n')
        response = ''.join(replies)
        filename = generate_filename(user_prompt, choice)
        create_file(filename, user_prompt, response, should_save)
        _sidebar_link_for_saved(filename)

    # Files in the working directory whose stem is at least 20 characters long.
    all_files = [file for file in glob.glob("*.*") if len(os.path.splitext(file)[0]) >= 20]
    all_files.sort(key=lambda x: (os.path.splitext(x)[1], x), reverse=True)

    # Sidebar buttons Download All and Delete All
    colDownloadAll, colDeleteAll = st.sidebar.columns([3, 3])
    with colDownloadAll:
        if st.button("β¬οΈ Download All"):
            zip_file = create_zip_of_files(all_files)
            st.markdown(get_zip_download_link(zip_file), unsafe_allow_html=True)
    with colDeleteAll:
        if st.button("π Delete All"):
            for file in all_files:
                os.remove(file)
            st.experimental_rerun()

    # Sidebar of files: view as markdown, download, open, chat, delete
    file_contents = ''
    next_action = ''
    for file in all_files:
        col1, col2, col3, col4, col5 = st.sidebar.columns([1, 6, 1, 1, 1])
        with col1:
            if st.button("π", key="md_" + file):
                file_contents = _read_text_lossy(file)
                next_action = 'md'
        with col2:
            st.markdown(get_table_download_link(file), unsafe_allow_html=True)
        with col3:
            if st.button("π", key="open_" + file):
                file_contents = _read_text_lossy(file)
                next_action = 'open'
        with col4:
            if st.button("π", key="read_" + file):
                file_contents = _read_text_lossy(file)
                next_action = 'search'
        with col5:
            if st.button("π", key="delete_" + file):
                os.remove(file)
                st.experimental_rerun()

    if len(file_contents) > 0:
        if next_action == 'open':
            st.text_area("File Contents:", file_contents, height=500)
        if next_action == 'md':
            st.markdown(file_contents)
        if next_action == 'search':
            st.text_area("File Contents:", file_contents, height=500)
            st.write('Reasoning with your inputs...')
            response = chat_with_model(user_prompt, file_contents, model_choice)
            filename = generate_filename(file_contents, choice)
            create_file(filename, user_prompt, response, should_save)

            st.experimental_rerun()
|
|
|
# Load .env before anything below reads OPENAI_API_KEY from the environment.
load_dotenv()

if __name__ == "__main__":
    main()

st.write(css, unsafe_allow_html=True)

st.header("Chat with documents :books:")
user_question = st.text_input("Ask a question about your documents:")
if user_question:
    process_user_input(user_question)

with st.sidebar:
    st.subheader("Your documents")
    docs = st.file_uploader("import documents", accept_multiple_files=True)

    # Rebuild the search index only when the set of uploaded documents
    # changes. Rebuilding on every rerun replaces the stored chain (and the
    # chat memory created with it) and writes another copy of the text.
    docs_signature = tuple((doc.name, len(doc.getvalue())) for doc in docs or [])
    if docs_signature != st.session_state.get("indexed_docs_signature"):
        with st.spinner("Processing"):
            raw = pdf2txt(docs)
            if len(raw) > 0:
                length = str(len(raw))
                text_chunks = txt2chunks(raw)
                vectorstore = vector_store(text_chunks)
                st.session_state.conversation = get_chain(vectorstore)
                st.session_state.indexed_docs_signature = docs_signature
                st.markdown('# AI Search Index of Length:' + length + ' Created.')
                filename = generate_filename(raw, 'txt')
                create_file(filename, raw, '', should_save)