# Mixtral 8x7B TLDR Summarizer — Gradio app (Hugging Face Space)
import gradio as gr | |
import requests | |
import zipfile | |
import uuid | |
import bs4 | |
import lxml | |
import os | |
from huggingface_hub import InferenceClient, HfApi | |
import random | |
import json | |
import datetime | |
from pypdf import PdfReader | |
from agent import ( | |
PREFIX, | |
COMPRESS_DATA_PROMPT, | |
COMPRESS_DATA_PROMPT_SMALL, | |
LOG_PROMPT, | |
LOG_RESPONSE, | |
) | |
# Initialize Hugging Face client
# Inference endpoint used by run_gpt for all text generation.
client = InferenceClient("mistralai/Mixtral-8x7B-Instruct-v0.1")
reponame = "acecalisto3/tmp"
# Base URL for raw files in the backing dataset repo (not used elsewhere in
# this file — presumably consumed by save/load helpers; verify before removing).
save_data = f'https://huggingface.co/datasets/{reponame}/raw/main/'
# Get HF token from environment or use demo mode
token_self = os.environ.get('HF_TOKEN', 'dummy_token')  # Use dummy token for demo
if token_self == 'dummy_token':
    print("Warning: Running in demo mode without HuggingFace token. Some features may be limited.")
# Hub API handle; with the dummy token, authenticated operations will fail.
api = HfApi(token=token_self)
# Constants
VERBOSE = True       # echo prompts and responses to stdout in run_gpt
MAX_HISTORY = 100    # not referenced in this file — TODO confirm it is still needed
MAX_DATA = 20000     # per-chunk size used by compress_data when splitting input
def find_all(purpose, task, history, url, result, steps):
    """Breadth-first crawl starting at `url`, collecting raw page text.

    Args:
        purpose, task, history, result: accepted for interface compatibility
            with the agent framework; not used by the crawl itself.
        url: starting URL.
        steps: maximum crawl depth; pages at depth >= steps are not fetched.

    Returns:
        Tuple `(True, texts)` where `texts` is a list of
        'RAW TEXT RETURNED: ...' strings, one per successfully fetched page.
        Fetch errors are printed and skipped, never raised.
    """
    return_list = []
    visited_links = set()
    links_to_visit = [(url, 0)]
    while links_to_visit:
        current_url, current_depth = links_to_visit.pop(0)
        # Guard clauses: respect the depth limit and never re-fetch a page.
        if current_depth >= steps or current_url in visited_links:
            continue
        visited_links.add(current_url)
        try:
            # Timeout prevents a dead host from hanging the whole app
            # (the original call could block forever).
            source = requests.get(current_url, timeout=30)
            if source.status_code == 200:
                soup = bs4.BeautifulSoup(source.content, 'lxml')
                return_list.append(f'RAW TEXT RETURNED: {soup.text}')
                for link in soup.find_all("a"):
                    href = link.get('href')
                    # Only absolute http(s) links; skip ones already seen to
                    # keep the queue from growing unboundedly with duplicates.
                    if href and href.startswith('http') and href not in visited_links:
                        links_to_visit.append((href, current_depth + 1))
        except Exception as e:
            print(f"Error fetching {current_url}: {e}")
    return True, return_list
def read_txt(txt_path):
    """Return the full contents of a text file.

    Args:
        txt_path: path to a UTF-8 text file.

    Returns:
        The file contents as a single string.
    """
    # Explicit encoding avoids platform-dependent defaults
    # (e.g. cp1252 on Windows), which corrupt non-ASCII uploads.
    with open(txt_path, "r", encoding="utf-8") as f:
        return f.read()
def read_pdf(pdf_path):
    """Extract the text of every page in a local PDF.

    Each page's text is prefixed with a newline, so the result always
    starts with '\n' when the PDF has at least one page.
    """
    reader = PdfReader(pdf_path)
    page_texts = [page.extract_text() for page in reader.pages]
    return "".join(f"\n{chunk}" for chunk in page_texts)
# URLs whose PDF download failed; shared diagnostic state read by nothing
# in this file — presumably inspected elsewhere; verify before removing.
error_box = []

def read_pdf_online(url):
    """Download a PDF from `url` and return its extracted text.

    On a non-200 response, the URL is appended to `error_box` and the
    status code is returned as a string instead of page text.
    """
    print(f"reading {url}")
    # Timeout prevents the app from hanging forever on an unresponsive host.
    response = requests.get(url, stream=True, timeout=60)
    if response.status_code == 200:
        # NOTE(review): hardcoded scratch file — concurrent requests would
        # clobber each other's download; kept for behavioral compatibility.
        with open("test.pdf", "wb") as f:
            f.write(response.content)
        reader = PdfReader("test.pdf")
        text = ""
        for page in reader.pages:
            text = f'{text}\n{page.extract_text()}'
        return text
    else:
        error_box.append(url)
        return str(response.status_code)
def format_prompt(message, history):
    """Render chat history plus the new message in Mixtral-instruct format.

    Each past (user, bot) turn becomes '[INST] user [/INST] bot</s> ';
    the new message is appended as a final open '[INST] ... [/INST]'.
    """
    turns = [f"[INST] {user_turn} [/INST] {bot_turn}</s> "
             for user_turn, bot_turn in history]
    return "<s>" + "".join(turns) + f"[INST] {message} [/INST]"
def run_gpt(prompt_template, stop_tokens, max_tokens, seed, **prompt_kwargs):
    """Stream a completion from the Mixtral endpoint and return the full text.

    Args:
        prompt_template: format string filled with `prompt_kwargs`.
        stop_tokens: accepted for interface compatibility; not forwarded.
        max_tokens: generation budget (max_new_tokens).
        seed: sampling seed for reproducibility.
        **prompt_kwargs: values substituted into `prompt_template`.

    Returns:
        The concatenated streamed response text.
    """
    now = datetime.datetime.now()
    generation_config = {
        "temperature": 0.9,
        "max_new_tokens": max_tokens,
        "top_p": 0.95,
        "repetition_penalty": 1.0,
        "do_sample": True,
        "seed": seed,
    }
    # Prepend the shared agent preamble, then the task-specific template.
    content = PREFIX.format(
        timestamp=now,
        purpose="Compile the provided data and complete the users task"
    ) + prompt_template.format(**prompt_kwargs)
    if VERBOSE:
        print(LOG_PROMPT.format(content))
    stream = client.text_generation(
        content, **generation_config, stream=True, details=True, return_full_text=False
    )
    pieces = []
    for chunk in stream:
        pieces.append(chunk.token.text)
    resp = "".join(pieces)
    if VERBOSE:
        print(LOG_RESPONSE.format(resp))
    return resp
def compress_data(c, instruct, history):
    """Summarize `history` in MAX_DATA-sized chunks via the LLM.

    Args:
        c: total size of `history` (the caller passes a word count) used
           to decide how many chunks to process.
        instruct: user direction forwarded to the compression prompt.
        history: sequence (string or list) that is sliced into chunks.

    Returns:
        List of per-chunk summary strings; empty list when `c` is 0
        (the original formula raised ZeroDivisionError for empty input).
    """
    seed = random.randint(1, 1000000000)
    total = int(c)
    # ceil(total / MAX_DATA) chunks of at most MAX_DATA units each.  The
    # original computed the chunk size as int(c / (c / MAX_DATA)), i.e.
    # MAX_DATA, via a float ratio that divided by zero on empty input.
    num_chunks = -(-total // MAX_DATA) if total > 0 else 0
    out = []
    for i in range(num_chunks):
        hist = history[i * MAX_DATA:(i + 1) * MAX_DATA]
        resp = run_gpt(
            COMPRESS_DATA_PROMPT_SMALL,
            stop_tokens=["observation:", "task:", "action:", "thought:"],
            max_tokens=8192,
            seed=seed,
            direction=instruct,
            knowledge="",
            history=hist,
        )
        out.append(resp)
    return out
def create_zip_file(output_data, zip_name):
    """Write each string in `output_data` into `zip_name` as data_<i>.txt.

    Returns `zip_name` so the caller can hand the path straight to Gradio.
    """
    with zipfile.ZipFile(zip_name, 'w') as archive:
        for index, payload in enumerate(output_data):
            archive.writestr(f'data_{index}.txt', payload)
    return zip_name
def process_and_format_response(instructions, chat_history, report, summary_memory,
                                input_data, uploaded_files, input_url, pdf_input_url):
    """Route input (URL, uploaded files, or raw text) through summarization.

    Exactly one input source is used, in priority order: URL, files, text.
    `chat_history`, `report`, `summary_memory`, and `pdf_input_url` are
    accepted to match the Gradio wiring but are not used here.

    Returns:
        A 4-tuple matching the Gradio outputs:
        (cleared_prompt, chat_messages, status_text, json_payload).
    """
    try:
        # Process URL if provided
        if input_url:
            success, content = find_all("Extract content", "", [], input_url, "", 1)
            if success and content:
                processed_text = "\n".join(content)
            else:
                return "", [["Error", "Failed to fetch URL content"]], "URL processing failed", None
        # Process uploaded files
        elif uploaded_files:
            processed_text = ""
            for file in uploaded_files:
                if file.name.endswith('.pdf'):
                    processed_text += read_pdf(file.name) + "\n\n"
                elif file.name.endswith('.txt'):
                    processed_text += read_txt(file.name) + "\n\n"
        # Process direct text input
        elif input_data:
            processed_text = input_data
        else:
            return "", [["Error", "No input provided"]], "No input data", None
        # Guard: uploads may yield no usable text.  The original fell off the
        # end of the function here and returned None, which crashed Gradio's
        # 4-output unpacking.
        if not processed_text:
            return "", [["Error", "No text could be extracted from input"]], "Empty input", None
        # Generate summary using compress_data
        word_count = len(processed_text.split())
        summary = compress_data(word_count, instructions or "Summarize this text", processed_text)
        summary_text = "\n".join(summary) if isinstance(summary, list) else str(summary)
        # Create chat messages (only a preview of the input is shown)
        messages = [
            ["Input", processed_text[:500] + "..."],
            ["Summary", summary_text],
        ]
        # Create JSON output
        json_output = {
            "input_length": len(processed_text),
            "summary_length": len(summary_text),
            "summary": summary_text,
        }
        return "", messages, "Processing completed successfully", json_output
    except Exception as e:
        error_msg = f"Error: {str(e)}"
        return "", [["Error", error_msg]], error_msg, None
def clear_fn():
    """Reset the prompt textbox and chat history to their empty states."""
    empty_prompt = ""
    empty_history = []
    return empty_prompt, empty_history
# Create Gradio interface
# Declarative UI layout; the components declared here are wired to
# process_and_format_response via the click handler at the bottom.
with gr.Blocks() as app:
    gr.HTML("""<center><h1>Mixtral 8x7B TLDR Summarizer + Web</h1><h3>Summarize Data of unlimited length</h3></center>""")
    # Main chat interface — shows the input preview and generated summary.
    with gr.Row():
        chatbot = gr.Chatbot(
            label="Mixtral 8x7B Chatbot",
            show_copy_button=True,
            height=400
        )
    # Control Panel — instructions, crawl depth, and output options.
    with gr.Row():
        with gr.Column(scale=3):
            prompt = gr.Textbox(
                label="Instructions",
                placeholder="Enter processing instructions here..."
            )
            # NOTE(review): this slider is not passed to the click handler —
            # find_all is called with a hardcoded depth of 1; confirm intent.
            steps = gr.Slider(
                label="Crawl Steps",
                minimum=1,
                maximum=5,
                value=1,
                info="Number of levels to crawl for web content"
            )
        with gr.Column(scale=1):
            report_check = gr.Checkbox(
                label="Return Report",
                value=True,
                info="Generate detailed analysis report"
            )
            sum_mem_check = gr.Radio(
                label="Output Type",
                choices=["Summary", "Memory"],
                value="Summary",
                info="Choose between summarized or memory-based output"
            )
            process_btn = gr.Button("Process", variant="primary")
    # Input Tabs — exactly one source is consumed, in the handler's
    # priority order: URL, files, then raw text.
    with gr.Tabs() as input_tabs:
        with gr.Tab("π Text"):
            text_input = gr.Textbox(
                label="Input Text",
                lines=6,
                placeholder="Paste your text here..."
            )
        with gr.Tab("π File"):
            file_input = gr.File(
                label="Upload Files",
                file_types=[".pdf", ".txt"],
                file_count="multiple"
            )
        with gr.Tab("π Web URL"):
            url_input = gr.Textbox(
                label="Website URL",
                placeholder="https://example.com"
            )
        with gr.Tab("π PDF URL"):
            # NOTE(review): this input is passed to the handler but never
            # processed there — confirm whether PDF-URL support is pending.
            pdf_url_input = gr.Textbox(
                label="PDF URL",
                placeholder="https://example.com/document.pdf"
            )
    # Output Section — structured JSON result plus status/error text.
    with gr.Row():
        with gr.Column():
            json_output = gr.JSON(
                label="Structured Output",
                show_label=True
            )
        with gr.Column():
            error_output = gr.Textbox(
                label="Status & Errors",
                interactive=False
            )
    # Event handlers — input/output order must match the signature and
    # 4-tuple return of process_and_format_response.
    process_btn.click(
        process_and_format_response,
        inputs=[
            prompt,
            chatbot,
            report_check,
            sum_mem_check,
            text_input,
            file_input,
            url_input,
            pdf_url_input
        ],
        outputs=[
            prompt,
            chatbot,
            error_output,
            json_output
        ]
    )
# Launch the app
# Queue allows up to 20 concurrent requests; binds to all interfaces on
# port 8000 (the standard setup for a containerized Space deployment).
app.queue(default_concurrency_limit=20).launch(
    show_api=False,
    share=False,
    server_name="0.0.0.0",
    server_port=8000
)