urld / app2.py
acecalisto3's picture
Update app2.py
97adf15 verified
raw
history blame
9.95 kB
import gradio as gr
import requests
import zipfile
import uuid
import bs4
import lxml
import os
from huggingface_hub import InferenceClient, HfApi
import random
import json
import datetime
from pypdf import PdfReader
from agent import (
PREFIX,
COMPRESS_DATA_PROMPT,
COMPRESS_DATA_PROMPT_SMALL,
LOG_PROMPT,
LOG_RESPONSE,
)
# Initialize Hugging Face client
# Mixtral 8x7B Instruct is used for all text generation (see run_gpt below).
client = InferenceClient("mistralai/Mixtral-8x7B-Instruct-v0.1")
reponame = "acecalisto3/tmp"
# Base URL for raw files in the dataset repo (used as a save target).
save_data = f'https://huggingface.co/datasets/{reponame}/raw/main/'
# Get HF token from environment or use demo mode
token_self = os.environ.get('HF_TOKEN', 'dummy_token')  # Use dummy token for demo
if token_self == 'dummy_token':
    print("Warning: Running in demo mode without HuggingFace token. Some features may be limited.")
api = HfApi(token=token_self)
# Constants
VERBOSE = True          # when True, run_gpt prints prompts and responses
MAX_HISTORY = 100       # NOTE(review): defined but not referenced in this file — confirm use elsewhere
MAX_DATA = 20000        # chunk-size budget used by compress_data
def find_all(purpose, task, history, url, result, steps):
    """Breadth-first crawl starting at `url`, collecting page text.

    Visits `url` and follows absolute http(s) links up to `steps` levels deep,
    appending each fetched page's raw text to the result list.

    Args:
        purpose, task, history, result: unused here; kept for caller-signature
            compatibility with the agent-style interface.
        url: starting URL.
        steps: crawl depth; pages at depth >= steps are queued but not fetched.

    Returns:
        (True, list_of_page_texts) — the boolean is always True; per-URL
        failures are printed and skipped rather than raised.
    """
    return_list = []
    visited_links = set()
    # FIFO queue of (url, depth) pairs; depth 0 is the start page.
    links_to_visit = [(url, 0)]
    while links_to_visit:
        current_url, current_depth = links_to_visit.pop(0)
        if current_depth < steps and current_url not in visited_links:
            visited_links.add(current_url)
            try:
                # Bug fix: the original had no timeout, so a single stalled
                # server could hang the whole app indefinitely.
                source = requests.get(current_url, timeout=30)
                if source.status_code == 200:
                    soup = bs4.BeautifulSoup(source.content, 'lxml')
                    return_list.append(f'RAW TEXT RETURNED: {soup.text}')
                    # Queue absolute links one level deeper.
                    for link in soup.find_all("a"):
                        href = link.get('href')
                        if href and href.startswith('http'):
                            links_to_visit.append((href, current_depth + 1))
            except Exception as e:
                # Best-effort crawl: log and continue with the next URL.
                print(f"Error fetching {current_url}: {e}")
    return True, return_list
def read_txt(txt_path):
    """Return the full contents of a text file as a single string.

    Bug fix: the original opened the file without an explicit encoding, so
    reads depended on the platform default (e.g. cp1252 on Windows) and could
    raise UnicodeDecodeError on UTF-8 input.
    """
    with open(txt_path, "r", encoding="utf-8") as f:
        return f.read()
def read_pdf(pdf_path):
    """Extract text from every page of a local PDF file.

    Returns one string with each page's text preceded by a newline
    (matching the original output format exactly).
    """
    reader = PdfReader(pdf_path)
    # Idiom fix: join instead of repeated `text = f'{text}\n...'`, which is
    # quadratic in the number of pages.
    return "".join(f'\n{page.extract_text()}' for page in reader.pages)
# Module-level collector of URLs whose PDF download failed (appended to by
# read_pdf_online); never cleared during the process lifetime.
error_box = []
def read_pdf_online(url):
    """Download a PDF from `url` and return its extracted text.

    On a non-200 response, records the URL in the module-level `error_box`
    and returns the status code as a string (original contract preserved).

    NOTE(review): the download is written to a fixed path "test.pdf" in the
    working directory, so concurrent calls would clobber each other — confirm
    whether this app ever serves concurrent requests through this path.
    """
    print(f"reading {url}")
    # Bug fix: added a timeout; the original could block forever on a
    # stalled server.
    response = requests.get(url, stream=True, timeout=60)
    if response.status_code == 200:
        with open("test.pdf", "wb") as f:
            f.write(response.content)
        reader = PdfReader("test.pdf")
        # Idiom fix: join instead of quadratic string concatenation.
        return "".join(f'\n{page.extract_text()}' for page in reader.pages)
    else:
        error_box.append(url)
        return str(response.status_code)
def format_prompt(message, history):
    """Render the chat history plus the new message in Mixtral's [INST] format.

    `history` is a sequence of (user_prompt, bot_response) pairs; the result
    begins with "<s>" and ends with the new message wrapped in [INST] tags.
    """
    turns = [
        f"[INST] {user_turn} [/INST] {bot_turn}</s> "
        for user_turn, bot_turn in history
    ]
    return "<s>" + "".join(turns) + f"[INST] {message} [/INST]"
def run_gpt(prompt_template, stop_tokens, max_tokens, seed, **prompt_kwargs):
    """Stream a completion from the Mixtral client for the given template.

    Builds the prompt from the module-level PREFIX (stamped with the current
    time and a fixed purpose) plus `prompt_template` formatted with
    `prompt_kwargs`, then streams tokens and returns the concatenated text.

    NOTE(review): `stop_tokens` is accepted but not forwarded to the client —
    confirm whether stop sequences were ever intended to take effect here.
    """
    now = datetime.datetime.now()
    generation_args = {
        "temperature": 0.9,
        "max_new_tokens": max_tokens,
        "top_p": 0.95,
        "repetition_penalty": 1.0,
        "do_sample": True,
        "seed": seed,
    }
    content = PREFIX.format(
        timestamp=now,
        purpose="Compile the provided data and complete the users task"
    )
    content += prompt_template.format(**prompt_kwargs)
    if VERBOSE:
        print(LOG_PROMPT.format(content))
    resp = ""
    token_stream = client.text_generation(
        content, **generation_args, stream=True, details=True, return_full_text=False
    )
    for chunk in token_stream:
        resp += chunk.token.text
    if VERBOSE:
        print(LOG_RESPONSE.format(resp))
    return resp
def compress_data(c, instruct, history):
    """Split `history` into roughly equal chunks and summarize each via the LLM.

    Args:
        c: size of `history` in the caller's units (the caller passes a word
           count); used only to decide how many chunks to make.
        instruct: the summarization instruction passed to the model.
        history: a sliceable sequence (the caller passes a string).

    Returns:
        A list with one model response per chunk.

    Bug fix: the original computed `chunk = int(int(c)/divr)` where
    `divr = c/MAX_DATA`, which raised ZeroDivisionError whenever c == 0.
    Integer ceil-division replaces the float round-trip entirely.
    """
    seed = random.randint(1, 1000000000)
    count = int(c)
    # ceil(count / MAX_DATA) chunks, at least 1 so empty input still yields
    # one (empty-slice) call instead of crashing.
    num_chunks = max(1, -(-count // MAX_DATA))
    # ceil so the final slice covers any remainder.
    chunk_len = max(1, -(-count // num_chunks))
    out = []
    start = 0
    for _ in range(num_chunks):
        hist = history[start:start + chunk_len]
        resp = run_gpt(
            COMPRESS_DATA_PROMPT_SMALL,
            stop_tokens=["observation:", "task:", "action:", "thought:"],
            max_tokens=8192,
            seed=seed,
            direction=instruct,
            knowledge="",
            history=hist,
        )
        out.append(resp)
        start += chunk_len
    return out
def create_zip_file(output_data, zip_name):
    """Write each element of `output_data` into `zip_name` as data_<i>.txt.

    Returns the archive path so callers can chain on it.
    """
    with zipfile.ZipFile(zip_name, 'w') as archive:
        for index, payload in enumerate(output_data):
            archive.writestr(f'data_{index}.txt', payload)
    return zip_name
def process_and_format_response(instructions, chat_history, report, summary_memory,
                                input_data, uploaded_files, input_url, pdf_input_url):
    """Gradio click handler: gather input (URL, files, or raw text), summarize it.

    Returns a 4-tuple matching the click outputs:
        (cleared_prompt, chat_messages, status_text, json_payload)
    `chat_history`, `report`, `summary_memory`, and `pdf_input_url` are
    accepted to match the UI wiring but are not used by the current logic.

    Bug fix: when uploaded files contained no .pdf/.txt content,
    `processed_text` stayed empty and the original fell off the end of the
    function, implicitly returning None instead of the expected 4-tuple
    (which breaks the Gradio outputs). An explicit error return now covers
    that path.
    """
    try:
        # Process URL if provided (crawl depth fixed at 1).
        if input_url:
            success, content = find_all("Extract content", "", [], input_url, "", 1)
            if success and content:
                processed_text = "\n".join(content)
            else:
                return "", [["Error", "Failed to fetch URL content"]], "URL processing failed", None
        # Process uploaded files
        elif uploaded_files:
            processed_text = ""
            for file in uploaded_files:
                if file.name.endswith('.pdf'):
                    processed_text += read_pdf(file.name) + "\n\n"
                elif file.name.endswith('.txt'):
                    processed_text += read_txt(file.name) + "\n\n"
        # Process direct text input
        elif input_data:
            processed_text = input_data
        else:
            return "", [["Error", "No input provided"]], "No input data", None
        # Guard against the empty-content path (see docstring bug fix).
        if not processed_text:
            return "", [["Error", "No readable content in input"]], "No readable content", None
        # Generate summary using compress_data (sized by word count).
        c = len(processed_text.split())
        summary = compress_data(c, instructions or "Summarize this text", processed_text)
        summary_text = "\n".join(summary) if isinstance(summary, list) else str(summary)
        # Create chat messages
        messages = [
            ["Input", processed_text[:500] + "..."],  # Show first 500 chars of input
            ["Summary", summary_text]
        ]
        # Create JSON output
        json_output = {
            "input_length": len(processed_text),
            "summary_length": len(summary_text),
            "summary": summary_text
        }
        return "", messages, "Processing completed successfully", json_output
    except Exception as e:
        # Top-level UI boundary: surface the error in the chat and status box.
        error_msg = f"Error: {str(e)}"
        return "", [["Error", error_msg]], error_msg, None
def clear_fn():
    """Reset the prompt textbox and the chat history to empty defaults.

    NOTE(review): defined but not wired to any UI event in this file —
    confirm whether a Clear button was intended.
    """
    empty_prompt = ""
    empty_history = []
    return empty_prompt, empty_history
# Create Gradio interface. This runs at import time: building the Blocks
# layout, wiring the single click handler, then launching the server.
with gr.Blocks() as app:
    gr.HTML("""<center><h1>Mixtral 8x7B TLDR Summarizer + Web</h1><h3>Summarize Data of unlimited length</h3></center>""")
    # Main chat interface
    with gr.Row():
        chatbot = gr.Chatbot(
            label="Mixtral 8x7B Chatbot",
            show_copy_button=True,
            height=400
        )
    # Control Panel
    with gr.Row():
        with gr.Column(scale=3):
            prompt = gr.Textbox(
                label="Instructions",
                placeholder="Enter processing instructions here..."
            )
            # NOTE(review): this slider is not included in process_btn.click
            # inputs below, so the chosen crawl depth is never used (the
            # handler crawls with a fixed depth) — confirm intent.
            steps = gr.Slider(
                label="Crawl Steps",
                minimum=1,
                maximum=5,
                value=1,
                info="Number of levels to crawl for web content"
            )
        with gr.Column(scale=1):
            report_check = gr.Checkbox(
                label="Return Report",
                value=True,
                info="Generate detailed analysis report"
            )
            sum_mem_check = gr.Radio(
                label="Output Type",
                choices=["Summary", "Memory"],
                value="Summary",
                info="Choose between summarized or memory-based output"
            )
            process_btn = gr.Button("Process", variant="primary")
    # Input Tabs: exactly one of these sources is consumed per click,
    # in the priority order URL > files > text (see the handler).
    with gr.Tabs() as input_tabs:
        with gr.Tab("📝 Text"):
            text_input = gr.Textbox(
                label="Input Text",
                lines=6,
                placeholder="Paste your text here..."
            )
        with gr.Tab("📁 File"):
            file_input = gr.File(
                label="Upload Files",
                file_types=[".pdf", ".txt"],
                file_count="multiple"
            )
        with gr.Tab("🌐 Web URL"):
            url_input = gr.Textbox(
                label="Website URL",
                placeholder="https://example.com"
            )
        with gr.Tab("📄 PDF URL"):
            # NOTE(review): this value is passed to the handler but the
            # handler never reads it — confirm whether PDF-URL ingestion
            # was meant to be implemented.
            pdf_url_input = gr.Textbox(
                label="PDF URL",
                placeholder="https://example.com/document.pdf"
            )
    # Output Section
    with gr.Row():
        with gr.Column():
            json_output = gr.JSON(
                label="Structured Output",
                show_label=True
            )
        with gr.Column():
            error_output = gr.Textbox(
                label="Status & Errors",
                interactive=False
            )
    # Event handlers: the handler returns (prompt, chatbot, status, json)
    # matching the outputs list below, in order.
    process_btn.click(
        process_and_format_response,
        inputs=[
            prompt,
            chatbot,
            report_check,
            sum_mem_check,
            text_input,
            file_input,
            url_input,
            pdf_url_input
        ],
        outputs=[
            prompt,
            chatbot,
            error_output,
            json_output
        ]
    )
# Launch the app on all interfaces, port 8000, with a shared request queue.
app.queue(default_concurrency_limit=20).launch(
    show_api=False,
    share=False,
    server_name="0.0.0.0",
    server_port=8000
)