                return []
            with tempfile.TemporaryDirectory() as temp_dir:
                if zipfile.is_zipfile(file.name):
                    dataset.extend(self._process_zip_file(file.name, temp_dir))
                else:
                    dataset.extend(self._process_single_file(file))
        except Exception as e:
            logger.error(f"Error processing file: {str(e)}")
            return []
        return dataset
    def _process_zip_file(self, zip_path, temp_dir):
        """Extract and process files within a ZIP archive."""
        result = []
        with zipfile.ZipFile(zip_path, 'r') as zip_ref:
            zip_ref.extractall(temp_dir)
            for extracted_file in os.listdir(temp_dir):
                extracted_file_path = os.path.join(temp_dir, extracted_file)
                if os.path.isfile(extracted_file_path):
                    with open(extracted_file_path, 'r', encoding='utf-8', errors='ignore') as f:
                        result.append({
                            'source': 'file_from_zip',
                            'filename': extracted_file,
                            'content': f.read(),
                            'timestamp': datetime.now().isoformat()
                        })
        return result
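    # Hedged sketch (an assumption, not part of the original app): the extractor
    # above only looks at the top level of the extracted directory, so files in
    # nested folders inside the archive are skipped. The variant below uses
    # os.walk to pick those up as well. It is defined for illustration only and
    # is not called anywhere in this module.
    def _process_zip_file_recursive(self, zip_path, temp_dir):
        """Sketch: like _process_zip_file, but recurses into extracted subfolders."""
        result = []
        with zipfile.ZipFile(zip_path, 'r') as zip_ref:
            zip_ref.extractall(temp_dir)
        for root, _dirs, files in os.walk(temp_dir):
            for name in files:
                path = os.path.join(root, name)
                with open(path, 'r', encoding='utf-8', errors='ignore') as f:
                    result.append({
                        'source': 'file_from_zip',
                        'filename': os.path.relpath(path, temp_dir),
                        'content': f.read(),
                        'timestamp': datetime.now().isoformat()
                    })
        return result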
    def _process_single_file(self, file) -> List[Dict]:
        """Process one uploaded file and return it as a single-item dataset."""
        try:
            file_stat = os.stat(file.name)
            # For very large files, keep only the head and tail of the content
            if file_stat.st_size > 100 * 1024 * 1024:  # 100MB
                logger.info(f"Processing large file: {file.name} ({file_stat.st_size} bytes)")
                # Read only the first and last 1MB of extremely large files
                content = ""
                with open(file.name, 'r', encoding='utf-8', errors='ignore') as f:
                    content = f.read(1 * 1024 * 1024)  # First 1MB
                    content += "\n...[Content truncated due to large file size]...\n"
                    # Seek to roughly the last 1MB; the offset is a byte count, so the
                    # read may start mid-character, which errors='ignore' tolerates
                    f.seek(max(0, file_stat.st_size - 1 * 1024 * 1024))
                    content += f.read()  # Last 1MB
            else:
                # Regular file processing
                with open(file.name, 'r', encoding='utf-8', errors='ignore') as f:
                    content = f.read()
            return [{
                'source': 'file',
                'filename': os.path.basename(file.name),
                'file_size': file_stat.st_size,
                'mime_type': mimetypes.guess_type(file.name)[0],
                'created': datetime.fromtimestamp(file_stat.st_ctime).isoformat(),
                'modified': datetime.fromtimestamp(file_stat.st_mtime).isoformat(),
                'content': content,
                'timestamp': datetime.now().isoformat()
            }]
        except Exception as e:
            logger.error(f"File processing error: {e}")
            return []
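# Minimal usage sketch for FileProcessor outside Gradio (assumption: the class's
# process_file(file) entry point, whose tail is shown above, accepts any object
# with a `.name` attribute pointing at a path on disk, which is what Gradio
# uploads provide). The _Upload shim is hypothetical; nothing here calls this.
def _file_processor_usage_sketch(path: str) -> List[Dict]:
    class _Upload:
        def __init__(self, name: str):
            self.name = name
    return FileProcessor().process_file(_Upload(path))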
# Module-level pipeline helpers (kept outside the FileProcessor class).
def process_all_inputs(urls, file, text, notes):
    """Process URLs, an uploaded file, and raw text, then write the combined results to JSON."""
    try:
        processor = URLProcessor()
        file_processor = FileProcessor()
        results = []
        # Process URLs
        if urls:
            url_list = re.split(r'[,\n]', urls)
            url_list = [url.strip() for url in url_list if url.strip()]
            for url in url_list:
                validation = processor.validate_url(url)
                if validation.get('is_valid'):
                    content = processor.fetch_content(url)
                    if content:
                        results.append({
                            'source': 'url',
                            'url': url,
                            'content': content,
                            'timestamp': datetime.now().isoformat()
                        })
        # Process files
        if file:
            results.extend(file_processor.process_file(file))
        # Process text input
        if text:
            cleaned_text = processor.advanced_text_cleaning(text)
            results.append({
                'source': 'direct_input',
                'content': cleaned_text,
                'timestamp': datetime.now().isoformat()
            })
        # Note: `notes` (the scratchpad) is accepted but not written to the output.
        # Generate output
        if results:
            output_dir = Path('output') / datetime.now().strftime('%Y-%m-%d')
            output_dir.mkdir(parents=True, exist_ok=True)
            output_path = output_dir / f'processed_{int(time.time())}.json'
            with open(output_path, 'w', encoding='utf-8') as f:
                json.dump(results, f, ensure_ascii=False, indent=2)
            summary = f"Processed {len(results)} items successfully!"
            json_data = json.dumps(results, indent=2)  # JSON handed to the editor and QR code
            return str(output_path), summary, json_data
        else:
            return None, "No valid content to process.", ""
    except Exception as e:
        logger.error(f"Processing error: {e}")
        return None, f"Error: {str(e)}", ""
def generate_qr_code(json_data):
    """Gradio callback: generate a QR code image if there is JSON data to encode."""
    if json_data:
        return generate_qr(json_data)
    return None
def generate_qr(json_data):
    """Generate a QR code image from JSON data and return the file path."""
    try:
        # Try first with automatic version selection; qr.make(fit=True) raises
        # qrcode.exceptions.DataOverflowError when the payload exceeds what a
        # single QR code can hold (roughly 3KB at the lowest error-correction level)
        qr = qrcode.QRCode(
            error_correction=qrcode.constants.ERROR_CORRECT_L,
            box_size=10,
            border=4,
        )
        qr.add_data(json_data)
        qr.make(fit=True)
        img = qr.make_image(fill_color="black", back_color="white")
        temp_file = tempfile.NamedTemporaryFile(delete=False, suffix=".png")
        img.save(temp_file.name)
        return temp_file.name
    except Exception as e:
        # The data is most likely too large for a single QR code
        logger.error(f"QR generation error: {e}")
        # Fall back to a small QR code carrying an error message
        qr = qrcode.QRCode(
            version=1,
            error_correction=qrcode.constants.ERROR_CORRECT_L,
            box_size=10,
            border=4,
        )
        qr.add_data("Error: Data too large for QR code")
        qr.make(fit=True)
        img = qr.make_image(fill_color="black", back_color="white")
        temp_file = tempfile.NamedTemporaryFile(delete=False, suffix=".png")
        img.save(temp_file.name)
        return temp_file.name
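# Hedged sketch (not used by the app above): when the JSON payload is too large
# for one QR code, it could be split across several images instead of falling
# back to an error QR. The 2000-byte chunk size is a rough guess below the
# ~2953-byte capacity of a version-40 code at error-correction level L; the
# helper name is hypothetical and nothing in this module calls it.
def _generate_qr_chunks_sketch(json_data: str, chunk_size: int = 2000) -> List[str]:
    """Sketch: split an oversized payload across several QR code images."""
    paths = []
    for i in range(0, len(json_data), chunk_size):
        qr = qrcode.QRCode(
            error_correction=qrcode.constants.ERROR_CORRECT_L,
            box_size=10,
            border=4,
        )
        qr.add_data(json_data[i:i + chunk_size])
        qr.make(fit=True)
        img = qr.make_image(fill_color="black", back_color="white")
        temp_file = tempfile.NamedTemporaryFile(delete=False, suffix=".png")
        img.save(temp_file.name)
        paths.append(temp_file.name)
    return paths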
def create_interface():
    """Create a comprehensive Gradio interface with advanced features."""
    css = """
    .container { max-width: 1200px; margin: auto; }
    .warning { background-color: #fff3cd; color: #856404; }
    .error { background-color: #f8d7da; color: #721c24; }
    """
    with gr.Blocks(css=css, title="Advanced Text & URL Processing") as interface:
        gr.Markdown("# Advanced URL & Text Processing Toolkit")
        with gr.Tab("URL Processing"):
            url_input = gr.Textbox(
                label="Enter URLs (comma or newline separated)",
                lines=5,
                placeholder="https://example1.com\nhttps://example2.com"
            )
        with gr.Tab("File Input"):
            file_input = gr.File(
                label="Upload text file or ZIP archive",
                file_types=[".txt", ".zip", ".md", ".csv", ".json", ".xml"]
            )
        with gr.Tab("Text Input"):
            text_input = gr.Textbox(
                label="Raw Text Input",
                lines=5,
                placeholder="Paste your text here..."
            )
        with gr.Tab("JSON Editor"):
            json_editor = gr.Textbox(
                label="JSON Editor",
                lines=20,
                placeholder="View and edit your JSON data here...",
                interactive=True,
                elem_id="json-editor"  # Optional: for custom styling
            )
        with gr.Tab("Scratchpad"):
            scratchpad = gr.Textbox(
                label="Scratchpad",
                lines=10,
                placeholder="Quick notes or text collections...",
                interactive=True
            )
        process_btn = gr.Button("Process Input", variant="primary")
        qr_btn = gr.Button("Generate QR Code", variant="secondary")
        output_text = gr.Textbox(label="Processing Results", interactive=False)
        output_file = gr.File(label="Processed Output")
        qr_output = gr.Image(label="QR Code", type="filepath")  # Displays the generated QR code
        process_btn.click(
            process_all_inputs,
            inputs=[url_input, file_input, text_input, scratchpad],
            outputs=[output_file, output_text, json_editor]  # Includes the JSON editor
        )
        qr_btn.click(
            generate_qr_code,
            inputs=json_editor,
            outputs=qr_output
        )
        gr.Markdown("""
        ### Usage Guidelines
        - **URL Processing**: Enter valid HTTP/HTTPS URLs
        - **File Input**: Upload text files or ZIP archives
        - **Text Input**: Direct text processing
        - **JSON Editor**: View and edit your JSON data
        - **Scratchpad**: Quick notes or text collections
        - Advanced cleaning and validation included
        """)
    return interface
def check_network_connectivity():
    """Check whether the network is working by testing connections to common sites."""
    test_sites = ["https://www.google.com", "https://www.cloudflare.com", "https://www.amazon.com"]
    results = []
    for site in test_sites:
        try:
            response = requests.get(site, timeout=5)
            results.append({
                "site": site,
                "status": "OK" if response.status_code == 200 else f"Error: {response.status_code}",
                "response_time": response.elapsed.total_seconds()
            })
        except Exception as e:
            results.append({
                "site": site,
                "status": f"Error: {str(e)}",
                "response_time": None
            })
    # If all sites failed, there is likely a network issue
    if all(result["status"].startswith("Error") for result in results):
        logger.error("Network connectivity issue detected. All test sites failed.")
        return False, results
    return True, results
def main():
    """Application entry point."""
    # Configure system settings
    mimetypes.init()
    # Check network connectivity
    network_ok, network_results = check_network_connectivity()
    if not network_ok:
        logger.warning("Network connectivity issues detected. Some features may not work properly.")
        for result in network_results:
            logger.warning(f"Test site {result['site']}: {result['status']}")
    # Create and launch interface
    interface = create_interface()
    # Launch with proper configuration
    interface.launch(
        server_name="0.0.0.0",
        server_port=7860,
        show_error=True,
        share=False,
        inbrowser=True,
        debug=True
    )

if __name__ == "__main__":
    main()