import json
import os
import re
import logging
import mimetypes
import time
import io
import zipfile
import tempfile
from datetime import datetime
from typing import List, Dict, Optional, Union
from pathlib import Path

import requests
import validators
import qrcode
import zxing
import gradio as gr
from PIL import Image
from bs4 import BeautifulSoup
from fake_useragent import UserAgent
from cleantext import clean
# Setup logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - [%(filename)s:%(lineno)d] - %(message)s',
    handlers=[
        logging.StreamHandler(),
        logging.FileHandler('app.log', encoding='utf-8')
    ]
)
logger = logging.getLogger(__name__)

# Ensure output directories exist
Path('output/qr_codes').mkdir(parents=True, exist_ok=True)
class URLProcessor:
    def __init__(self):
        self.session = requests.Session()
        self.timeout = 10
        self.max_retries = 3
        self.request_delay = 1.0
        self.respect_robots = True
        self.use_proxy = False
        self.proxy_url = None

        # Update session headers
        self.session.headers.update({
            'User-Agent': UserAgent().random,
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
            'Accept-Language': 'en-US,en;q=0.5',
            'Accept-Encoding': 'gzip, deflate, br',
            'Connection': 'keep-alive',
            'Upgrade-Insecure-Requests': '1'
        })
        if self.use_proxy and self.proxy_url:
            self.session.proxies = {
                'http': self.proxy_url,
                'https': self.proxy_url
            }
    def check_robots_txt(self, url: str) -> bool:
        """Check if URL is allowed by robots.txt"""
        if not self.respect_robots:
            return True
        try:
            from urllib.parse import urlparse
            from urllib.robotparser import RobotFileParser

            parsed_url = urlparse(url)
            robots_url = f"{parsed_url.scheme}://{parsed_url.netloc}/robots.txt"
            rp = RobotFileParser()
            rp.set_url(robots_url)
            rp.read()
            return rp.can_fetch(self.session.headers['User-Agent'], url)
        except Exception as e:
            logger.warning(f"Error checking robots.txt: {e}")
            return True
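    # A minimal sketch of the robots.txt gate (hypothetical URL):
    #
    #   processor = URLProcessor()
    #   processor.check_robots_txt("https://example.com/page")  # True when allowed
    #   processor.respect_robots = False
    #   processor.check_robots_txt("https://example.com/page")  # always True, check skipped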
    def fetch_content(self, url: str) -> Optional[Dict]:
        """Fetch content with rate limiting, robots.txt checking, and retries"""
        if not self.check_robots_txt(url):
            logger.warning(f"URL {url} is disallowed by robots.txt")
            return None
        time.sleep(self.request_delay)  # Basic rate limiting
        # Note: a retry only fires if a handler raises; the handlers below catch
        # their own exceptions and return None, so most failures are not retried
        for attempt in range(self.max_retries):
            try:
                if 'drive.google.com' in url:
                    return self._handle_google_drive(url)
                if 'calendar.google.com' in url and 'ical' in url:
                    return self._handle_google_calendar(url)
                return self._fetch_html_content(url)
            except Exception as e:
                logger.error(f"Attempt {attempt + 1} failed: {e}")
                if attempt < self.max_retries - 1:
                    time.sleep(self.request_delay * (attempt + 1))  # Linear backoff
        return None
    def advanced_text_cleaning(self, text: str) -> str:
        """Robust text cleaning with version compatibility"""
        try:
            cleaned_text = clean(
                text,
                fix_unicode=True,
                to_ascii=True,
                lower=True,
                no_line_breaks=True,
                no_urls=True,
                no_emails=True,
                no_phone_numbers=True,
                no_numbers=False,
                no_digits=False,
                no_currency_symbols=True,
                no_punct=False
            ).strip()
            return cleaned_text
        except Exception as e:
            logger.warning(f"Text cleaning error: {e}. Using fallback method.")
            # Fallback: strip control characters, non-ASCII, and excess whitespace
            text = re.sub(r'[\x00-\x1F\x7F-\x9F]', '', text)
            text = text.encode('ascii', 'ignore').decode('ascii')
            text = re.sub(r'\s+', ' ', text)
            return text.strip()
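    # Indicative behavior of the cleaning pipeline (exact output depends on the
    # installed clean-text version; its defaults replace stripped URLs/emails
    # with placeholder tokens):
    #
    #   p = URLProcessor()
    #   p.advanced_text_cleaning("Visit https://example.com or mail a@b.com!")
    #   # -> roughly "visit <url> or mail <email>!"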
    def validate_url(self, url: str) -> Dict:
        """Validate URL format and accessibility"""
        try:
            if not validators.url(url):
                return {'is_valid': False, 'message': 'Invalid URL format'}
            response = self.session.head(url, timeout=self.timeout)
            response.raise_for_status()
            return {'is_valid': True, 'message': 'URL is valid and accessible'}
        except Exception as e:
            return {'is_valid': False, 'message': f'URL validation failed: {str(e)}'}
    def _handle_google_drive(self, url: str) -> Optional[Dict]:
        """Process Google Drive file links"""
        try:
            file_id = re.search(r'/file/d/([a-zA-Z0-9_-]+)', url)
            if not file_id:
                logger.error(f"Invalid Google Drive URL: {url}")
                return None
            direct_url = f"https://drive.google.com/uc?export=download&id={file_id.group(1)}"
            response = self.session.get(direct_url, timeout=self.timeout)
            response.raise_for_status()
            return {
                'content': response.text,
                'content_type': response.headers.get('Content-Type', ''),
                'timestamp': datetime.now().isoformat()
            }
        except Exception as e:
            logger.error(f"Google Drive processing failed: {e}")
            return None
    def _handle_google_calendar(self, url: str) -> Optional[Dict]:
        """Process Google Calendar ICS feeds"""
        try:
            response = self.session.get(url, timeout=self.timeout)
            response.raise_for_status()
            return {
                'content': response.text,
                'content_type': 'text/calendar',
                'timestamp': datetime.now().isoformat()
            }
        except Exception as e:
            logger.error(f"Calendar fetch failed: {e}")
            return None
    def _fetch_html_content(self, url: str) -> Optional[Dict]:
        """Standard HTML content processing"""
        try:
            response = self.session.get(url, timeout=self.timeout)
            response.raise_for_status()
            soup = BeautifulSoup(response.text, 'html.parser')
            # Drop non-content elements before extracting text
            for element in soup(['script', 'style', 'nav', 'footer', 'header', 'meta', 'link']):
                element.decompose()
            main_content = soup.find('main') or soup.find('article') or soup.body
            if main_content is None:
                logger.warning(f"No main content found for URL: {url}")
                return {
                    'content': '',
                    'content_type': response.headers.get('Content-Type', ''),
                    'timestamp': datetime.now().isoformat()
                }
            text_content = main_content.get_text(separator='\n', strip=True)
            cleaned_content = self.advanced_text_cleaning(text_content)
            return {
                'content': cleaned_content,
                'content_type': response.headers.get('Content-Type', ''),
                'timestamp': datetime.now().isoformat()
            }
        except Exception as e:
            logger.error(f"HTML processing failed: {e}")
            return None
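# A minimal usage sketch for URLProcessor (example.com is a placeholder):
#
#   processor = URLProcessor()
#   check = processor.validate_url("https://example.com")
#   if check['is_valid']:
#       result = processor.fetch_content("https://example.com")
#       if result:
#           print(result['content_type'], result['content'][:200])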
class FileProcessor:
    """Class to handle file processing"""

    def __init__(self, max_file_size: int = 2 * 1024 * 1024 * 1024):  # 2GB default
        self.max_file_size = max_file_size
        self.supported_text_extensions = {'.txt', '.md', '.csv', '.json', '.xml'}

    def is_text_file(self, filepath: str) -> bool:
        """Check if file is a text file"""
        try:
            mime_type, _ = mimetypes.guess_type(filepath)
            return (mime_type and mime_type.startswith('text/')) or \
                   (os.path.splitext(filepath)[1].lower() in self.supported_text_extensions)
        except Exception:
            return False
    def process_file(self, file) -> List[Dict]:
        """Process uploaded file with enhanced error handling"""
        if not file:
            return []
        dataset = []
        try:
            file_size = os.path.getsize(file.name)
            if file_size > self.max_file_size:
                logger.warning(f"File size ({file_size} bytes) exceeds maximum allowed size")
                return []
            with tempfile.TemporaryDirectory() as temp_dir:
                if zipfile.is_zipfile(file.name):
                    dataset.extend(self._process_zip_file(file.name, temp_dir))
                else:
                    dataset.extend(self._process_single_file(file))
        except Exception as e:
            logger.error(f"Error processing file: {str(e)}")
            return []
        return dataset
    def _process_zip_file(self, zip_path: str, temp_dir: str) -> List[Dict]:
        """Process ZIP file contents"""
        results = []
        with zipfile.ZipFile(zip_path, 'r') as zip_ref:
            zip_ref.extractall(temp_dir)
            for root, _, files in os.walk(temp_dir):
                for filename in files:
                    filepath = os.path.join(root, filename)
                    if self.is_text_file(filepath):
                        try:
                            with open(filepath, 'r', encoding='utf-8', errors='ignore') as f:
                                content = f.read()
                            if content.strip():
                                results.append({
                                    "source": "file",
                                    "filename": filename,
                                    "content": content,
                                    "timestamp": datetime.now().isoformat()
                                })
                        except Exception as e:
                            logger.error(f"Error reading file {filename}: {str(e)}")
        return results
    def _process_single_file(self, file) -> List[Dict]:
        """Process a single file"""
        try:
            file_stat = os.stat(file.name)
            if file_stat.st_size > 100 * 1024 * 1024:  # 100MB
                logger.info(f"Processing large file: {file.name} ({file_stat.st_size} bytes)")
                content = ""
                with open(file.name, 'r', encoding='utf-8', errors='ignore') as f:
                    content = f.read(1 * 1024 * 1024)  # First 1MB
                    content += "\n...[Content truncated due to large file size]...\n"
                    # Seek to roughly the last 1MB; the byte offset may land
                    # mid-character, which errors='ignore' papers over
                    f.seek(max(0, file_stat.st_size - 1 * 1024 * 1024))
                    content += f.read()  # Last 1MB
            else:
                with open(file.name, 'r', encoding='utf-8', errors='ignore') as f:
                    content = f.read()
            return [{
                'source': 'file',
                'filename': os.path.basename(file.name),
                'file_size': file_stat.st_size,
                'mime_type': mimetypes.guess_type(file.name)[0],
                'created': datetime.fromtimestamp(file_stat.st_ctime).isoformat(),
                'modified': datetime.fromtimestamp(file_stat.st_mtime).isoformat(),
                'content': content,
                'timestamp': datetime.now().isoformat()
            }]
        except Exception as e:
            logger.error(f"File processing error: {e}")
            return []
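# A minimal usage sketch for FileProcessor. process_file only relies on the
# upload object's .name attribute (as Gradio file uploads provide), so any
# object exposing .name works; the path below is hypothetical:
#
#   from types import SimpleNamespace
#   upload = SimpleNamespace(name="notes.txt")
#   for record in FileProcessor().process_file(upload):
#       print(record['filename'], record['file_size'])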
def clean_json(data: Union[str, Dict]) -> Optional[Dict]:
    """Clean and validate JSON data"""
    try:
        if isinstance(data, str):
            data = data.strip()
            data = json.loads(data)
        # Round-trip through the serializer to normalize the structure
        cleaned = json.loads(json.dumps(data))
        return cleaned
    except json.JSONDecodeError as e:
        logger.error(f"JSON cleaning error: {e}")
        return None
    except Exception as e:
        logger.error(f"Unexpected error while cleaning JSON: {e}")
        return None
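# clean_json round-trips data through json.loads/json.dumps, which both
# validates the input and normalizes it to plain dict/list structures:
#
#   clean_json('{"a": 1}')   # -> {'a': 1}
#   clean_json({'a': 1})     # -> {'a': 1}
#   clean_json('{bad json')  # -> None (JSONDecodeError is logged)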
def generate_qr_code(data: Union[str, Dict], combined: bool = True) -> List[str]:
    """Generate QR code(s) from data"""
    def make_qr(payload, output_path: Path) -> str:
        """Render one QR code PNG for a JSON-serializable payload."""
        qr = qrcode.QRCode(
            version=None,
            error_correction=qrcode.constants.ERROR_CORRECT_L,
            box_size=10,
            border=4,
        )
        qr.add_data(json.dumps(payload, ensure_ascii=False))
        qr.make(fit=True)
        img = qr.make_image(fill_color="black", back_color="white")
        img.save(str(output_path))
        return str(output_path)

    try:
        output_dir = Path('output/qr_codes')
        output_dir.mkdir(parents=True, exist_ok=True)
        if combined:
            cleaned_data = clean_json(data)
            if cleaned_data:
                return [make_qr(cleaned_data, output_dir / f'combined_qr_{int(time.time())}.png')]
        elif isinstance(data, list):
            paths = []
            for idx, item in enumerate(data):
                cleaned_item = clean_json(item)
                if cleaned_item:
                    paths.append(make_qr(cleaned_item, output_dir / f'item_{idx}_qr_{int(time.time())}.png'))
            return paths
        else:
            cleaned_item = clean_json(data)
            if cleaned_item:
                return [make_qr(cleaned_item, output_dir / f'single_qr_{int(time.time())}.png')]
        return []
    except Exception as e:
        logger.error(f"QR generation error: {e}")
        return []
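# Example calls (paths are indicative; note that ERROR_CORRECT_L caps the
# payload at roughly 2.9KB per QR code):
#
#   generate_qr_code({"name": "test"})
#   # -> ['output/qr_codes/combined_qr_<timestamp>.png']
#   generate_qr_code([{"a": 1}, {"b": 2}], combined=False)
#   # -> one PNG per list item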
def decode_qr_code(image_path: str) -> Optional[str]:
    """Decode QR code from an image file using ZXing"""
    try:
        reader = zxing.BarCodeReader()
        result = reader.decode(image_path)
        if result and result.parsed:
            return result.parsed
        logger.warning("No QR code found in image")
        return None
    except Exception as e:
        logger.error(f"QR decoding error: {e}")
        return None
def decode_qr(image) -> List[str]:
    """Decode all QR codes found in an image using ZXing"""
    try:
        if isinstance(image, str):
            image_path = image
        else:
            # Save a temporary image if the input is an array rather than a path
            with tempfile.NamedTemporaryFile(suffix='.png', delete=False) as tmp:
                Image.fromarray(image).save(tmp.name)
                image_path = tmp.name
        reader = zxing.BarCodeReader()
        result = reader.decode(image_path)
        if result and result.parsed:
            return [result.parsed]
        return []
    except Exception as e:
        logger.error(f"QR decoding error: {e}")
        return []
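# Round-trip sketch: encode a payload, then decode it back. The zxing Python
# package wraps the Java ZXing library, so a Java runtime must be available
# on the host for decoding to work:
#
#   paths = generate_qr_code({"msg": "hello"})
#   if paths:
#       print(decode_qr_code(paths[0]))  # -> '{"msg": "hello"}'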
def datachat_trained(data_input: str, query: str) -> str:
    """Handle trained data interaction logic"""
    data = clean_json(data_input)
    if not data:
        return "Invalid JSON data provided."
    return f"[Trained Mode]\nData: {json.dumps(data, indent=2)}\nQuery: {query}"


def datachat_simple(data_input: str, query: str) -> str:
    """Handle simple chat interaction logic"""
    data = clean_json(data_input)
    if not data:
        return "Invalid JSON data provided."
    return f"[Chat Mode]\nData: {json.dumps(data, indent=2)}\nQuestion: {query}"
def datachat_interface(mode: str, data_source: str, json_input: str, qr_image: str, query: str) -> str:
    """Interface for DataChat functionality"""
    data = None
    if data_source == "JSON Input":
        data = json_input
    elif data_source == "QR Code":
        try:
            # decode_qr_code returns the decoded string, or None on failure
            data = decode_qr_code(qr_image)
        except Exception as e:
            return f"Invalid QR code data provided: {e}"
    else:
        return "No valid data source selected."
    if not data:
        return "No data could be read from the selected source."
    if mode == "Trained with Data":
        return datachat_trained(data, query)
    elif mode == "Chat about Data":
        return datachat_simple(data, query)
    else:
        return "Invalid mode selected."
def create_interface():
    """Create a comprehensive Gradio interface with advanced features"""
    css = """
    .container { max-width: 1200px; margin: auto; }
    .warning { background-color: #fff3cd; color: #856404; padding: 10px; border-radius: 4px; }
    .error { background-color: #f8d7da; color: #721c24; padding: 10px; border-radius: 4px; }
    .success { background-color: #d4edda; color: #155724; padding: 10px; border-radius: 4px; }
    """
    with gr.Blocks(css=css, title="Advanced Data Processor & QR Code Generator") as interface:
        gr.Markdown("# Advanced Data Processing & QR Code Generator")
        with gr.Tab("DataChat"):
            mode = gr.Radio(["Trained with Data", "Chat about Data"], label="Mode")
            data_source = gr.Radio(["JSON Input", "QR Code"], label="Data Source")
            json_input = gr.Textbox(lines=8, label="JSON Data")
            qr_image = gr.Image(label="QR Code Image", type="filepath")
            query = gr.Textbox(label="Query")
            submit_btn = gr.Button("Submit")
            output = gr.Textbox(label="Response")
            submit_btn.click(datachat_interface, [mode, data_source, json_input, qr_image, query], output)
        with gr.Tab("QR Generator"):
            qr_input = gr.Textbox(lines=8, label="Input JSON for QR")
            generate_btn = gr.Button("Generate QR")
            qr_output = gr.Image(label="Generated QR Code")

            def generate_qr(json_data):
                # generate_qr_code returns a list of paths; gr.Image expects one
                data = clean_json(json_data)
                if data:
                    paths = generate_qr_code(data)
                    return paths[0] if paths else None
                return None

            generate_btn.click(generate_qr, qr_input, qr_output)
    return interface
def main():
    mimetypes.init()
    Path('output/qr_codes').mkdir(parents=True, exist_ok=True)
    interface = create_interface()
    interface.launch(
        server_name="0.0.0.0",
        server_port=7860,
        show_error=True,
        share=False,
        inbrowser=True,
        debug=True
    )


if __name__ == "__main__":
    main()