# NOTE: scraped Hugging Face Spaces page header removed ("Spaces: Running Running")
import gradio as gr
import requests
from bs4 import BeautifulSoup
import json
import time
from tqdm import tqdm
import zipfile
import io
import os
import tempfile
import mimetypes
def fetch_content(url):
    """Download the page at *url* and return its body as text.

    Returns None (after printing the error) when the request fails or
    the server answers with an HTTP error status.
    """
    try:
        response = requests.get(url, timeout=10)
        response.raise_for_status()
    except requests.RequestException as e:
        print(f"Error fetching {url}: {e}")
        return None
    return response.text
def extract_text(html):
    """Return the visible text of *html*, one non-empty chunk per line.

    Script and style elements are dropped entirely; remaining text is
    split into whitespace-trimmed chunks and joined with newlines.
    """
    soup = BeautifulSoup(html, 'html.parser')
    # Remove non-visible content before extracting text.
    for tag in soup(["script", "style"]):
        tag.decompose()
    chunks = []
    for raw_line in soup.get_text().splitlines():
        for phrase in raw_line.strip().split(" "):
            phrase = phrase.strip()
            if phrase:
                chunks.append(phrase)
    return '\n'.join(chunks)
def process_urls(urls):
    """Fetch every URL in *urls* and return a list of dataset records.

    URLs that fail to download are skipped; each successful fetch yields
    a dict with keys "source", "url", and "content".
    """
    records = []
    for url in tqdm(urls, desc="Fetching URLs"):
        html = fetch_content(url)
        if html:
            records.append({
                "source": "url",
                "url": url,
                "content": extract_text(html),
            })
        time.sleep(1)  # Be polite to the server
    return records
def process_file(file):
    """Build dataset records from an uploaded file.

    *file* is any object with a ``.name`` attribute holding a filesystem
    path (as Gradio's File component provides).  A zip archive is
    expanded and every member becomes one record; any other file becomes
    a single record.  Files whose guessed MIME type is text have their
    content read; everything else gets a placeholder string.

    Returns a list of dicts with keys "source", "filename", "content".
    """
    def _record(filepath, filename):
        # One record per file: read content only when the MIME type says text.
        mime_type, _ = mimetypes.guess_type(filepath)
        if mime_type and mime_type.startswith('text'):
            # errors='ignore' keeps going past undecodable bytes.
            with open(filepath, 'r', encoding='utf-8', errors='ignore') as f:
                content = f.read()
        else:
            # For non-text files, just store the filename
            content = "Binary file - content not extracted"
        return {"source": "file", "filename": filename, "content": content}

    dataset = []
    if zipfile.is_zipfile(file.name):
        with tempfile.TemporaryDirectory() as temp_dir:
            with zipfile.ZipFile(file.name, 'r') as zip_ref:
                zip_ref.extractall(temp_dir)
            for root, _, files in os.walk(temp_dir):
                for filename in files:
                    dataset.append(_record(os.path.join(root, filename), filename))
    else:
        # Read via the path, not the passed object: the upload handle may
        # already be consumed/closed or be a plain path-like wrapper.
        dataset.append(_record(file.name, os.path.basename(file.name)))
    return dataset
def process_text(text):
    """Wrap raw pasted text as a one-element list of dataset records."""
    record = {
        "source": "text_input",
        "content": text,
    }
    return [record]
def create_dataset(urls, file, text_input):
    """Combine URL, file, and pasted-text sources into one JSON dataset.

    Parameters mirror the Gradio inputs: *urls* is a comma-separated
    string (may be empty), *file* an uploaded file object or None,
    *text_input* raw pasted text (may be empty).  Writes the combined
    records to 'combined_dataset.json' and returns that path for the
    gr.File output component.
    """
    dataset = []
    if urls:
        # Strip whitespace around each URL and drop empty entries so
        # inputs like "a.com, b.com," don't produce bogus requests.
        url_list = [u.strip() for u in urls.split(',') if u.strip()]
        dataset.extend(process_urls(url_list))
    if file:
        dataset.extend(process_file(file))
    if text_input:
        dataset.extend(process_text(text_input))
    # Save the dataset as JSON; ensure_ascii=False keeps any scraped
    # non-ASCII text readable instead of \uXXXX-escaped.
    with open('combined_dataset.json', 'w', encoding='utf-8') as f:
        json.dump(dataset, f, indent=2, ensure_ascii=False)
    return 'combined_dataset.json'
# Gradio Interface
iface = gr.Interface(
    fn=create_dataset,
    inputs=[
        gr.Textbox(lines=5, label="Enter comma-separated URLs"),
        gr.File(label="Upload file (including zip files)"),
        gr.Textbox(lines=10, label="Enter or paste large text"),
    ],
    outputs=gr.File(label="Download Combined Dataset"),
    title="URL, File, and Text to Dataset Converter",
    description="Enter URLs, upload files (including zip files), and/or paste text to create a combined dataset for AI training.",
)

# Launch the interface
iface.launch()