urld / app2.py
acecalisto3's picture
Create app2.py
14baf76 verified
raw
history blame
11.2 kB
return []
with tempfile.TemporaryDirectory() as temp_dir:
if zipfile.is_zipfile(file.name):
dataset.extend(self._process_zip_file(file.name, temp_dir))
else:
dataset.extend(self._process_single_file(file))
except Exception as e:
logger.error(f"Error processing file: {str(e)}")
return []
return dataset
def _process_zip_file(self, zip_path, temp_dir):
"""Extract and process files within a ZIP archive."""
result = []
with zipfile.ZipFile(zip_path, 'r') as zip_ref:
zip_ref.extractall(temp_dir)
for extracted_file in os.listdir(temp_dir):
extracted_file_path = os.path.join(temp_dir, extracted_file)
if os.path.isfile(extracted_file_path):
with open(extracted_file_path, 'r', encoding='utf-8', errors='ignore') as f:
result.append({
'source': 'file_from_zip',
'filename': extracted_file,
'content': f.read(),
'timestamp': datetime.now().isoformat()
})
return result
def _process_single_file(self, file) -> List[Dict]:
try:
file_stat = os.stat(file.name)
# For very large files, read in chunks and summarize
if file_stat.st_size > 100 * 1024 * 1024: # 100MB
logger.info(f"Processing large file: {file.name} ({file_stat.st_size} bytes)")
# Read first and last 1MB for extremely large files
content = ""
with open(file.name, 'r', encoding='utf-8', errors='ignore') as f:
content = f.read(1 * 1024 * 1024) # First 1MB
content += "\n...[Content truncated due to large file size]...\n"
# Seek to the last 1MB
f.seek(max(0, file_stat.st_size - 1 * 1024 * 1024))
content += f.read() # Last 1MB
else:
# Regular file processing
with open(file.name, 'r', encoding='utf-8', errors='ignore') as f:
content = f.read()
return [{
'source': 'file',
'filename': os.path.basename(file.name),
'file_size': file_stat.st_size,
'mime_type': mimetypes.guess_type(file.name)[0],
'created': datetime.fromtimestamp(file_stat.st_ctime).isoformat(),
'modified': datetime.fromtimestamp(file_stat.st_mtime).isoformat(),
'content': content,
'timestamp': datetime.now().isoformat()
}]
except Exception as e:
logger.error(f"File processing error: {e}")
return []
# process_all_inputs is a module-level function, not a FileProcessor method.
def process_all_inputs(urls, file, text, notes):
    """Collect content from URLs, an uploaded file and raw text, then save it.

    Args:
        urls: String of URLs separated by commas and/or newlines (may be empty).
        file: Uploaded file object handed to ``FileProcessor.process_file``
            (may be falsy).
        text: Raw text that is cleaned and stored as a ``direct_input`` record.
        notes: Accepted for interface compatibility; not read by this function.

    Returns:
        A ``(output_path, summary, json_data)`` tuple. On success the first
        item is the path of the JSON file written below ``output/<date>/``
        and the last is the same data as a JSON string. When nothing was
        collected, or when an exception occurred, the path is ``None`` and
        the JSON string is empty; the summary carries the message.
    """
    try:
        url_handler = URLProcessor()
        upload_handler = FileProcessor()
        collected = []

        # URLs: split on commas/newlines, skip blanks, keep only valid ones
        # that actually returned content.
        if urls:
            candidates = (piece.strip() for piece in re.split(r'[,\n]', urls))
            for address in filter(None, candidates):
                if not url_handler.validate_url(address).get('is_valid'):
                    continue
                page = url_handler.fetch_content(address)
                if not page:
                    continue
                collected.append({
                    'source': 'url',
                    'url': address,
                    'content': page,
                    'timestamp': datetime.now().isoformat()
                })

        # Uploaded file (plain file or ZIP archive)
        if file:
            collected.extend(upload_handler.process_file(file))

        # Directly entered text
        if text:
            collected.append({
                'source': 'direct_input',
                'content': url_handler.advanced_text_cleaning(text),
                'timestamp': datetime.now().isoformat()
            })

        if not collected:
            return None, "No valid content to process.", ""

        # Persist the results in a per-day folder, one file per second-resolution timestamp.
        day_folder = Path('output') / datetime.now().strftime('%Y-%m-%d')
        day_folder.mkdir(parents=True, exist_ok=True)
        target = day_folder / f'processed_{int(time.time())}.json'
        with open(target, 'w', encoding='utf-8') as sink:
            json.dump(collected, sink, ensure_ascii=False, indent=2)

        summary = f"Processed {len(collected)} items successfully!"
        json_data = json.dumps(collected, indent=2)  # shown in the JSON editor / used for the QR code
        return str(target), summary, json_data
    except Exception as e:
        logger.error(f"Processing error: {e}")
        return None, f"Error: {str(e)}", ""
# generate_qr_code is a module-level function, not a FileProcessor method.
def generate_qr_code(json_data):
    """Return the path of a QR code image for *json_data*, or ``None`` if it is empty.

    Thin guard around ``generate_qr``: nothing is generated for falsy input.
    """
    if not json_data:
        return None
    return generate_qr(json_data)
# generate_qr is a module-level function, not a FileProcessor method.
def generate_qr(json_data):
    """Generate a QR code from JSON data and return the PNG file path.

    Args:
        json_data: The string to encode.

    Returns:
        Path of a temporary ``.png`` file containing the QR code. If the
        data cannot be encoded (typically because it is too large), the
        error is logged and a QR code carrying a short error message is
        returned instead.
    """
    try:
        # Automatic version selection: the smallest symbol that fits the data.
        return _render_qr_to_temp_png(json_data)
    except Exception as e:
        # If the data is too large for a QR code
        logger.error(f"QR generation error: {e}")
        return _render_qr_to_temp_png("Error: Data too large for QR code", version=1)


def _render_qr_to_temp_png(payload, version=None):
    """Encode *payload* as a QR code, save it to a new temp PNG, return its path."""
    qr = qrcode.QRCode(
        version=version,
        error_correction=qrcode.constants.ERROR_CORRECT_L,
        box_size=10,
        border=4,
    )
    qr.add_data(payload)
    qr.make(fit=True)
    # make_image is a method of the QRCode instance, not of the qrcode module.
    img = qr.make_image(fill_color="black", back_color="white")
    # Reserve the file name only once rendering succeeded, and close the
    # handle before writing so no descriptor leaks and the save also works
    # on platforms that refuse to reopen an open temp file.
    with tempfile.NamedTemporaryFile(delete=False, suffix=".png") as temp_file:
        png_path = temp_file.name
    img.save(png_path)
    return png_path
def create_interface():
    """Build the Gradio Blocks UI and return it without launching.

    The interface has one tab per input (URLs, file upload, raw text), a
    JSON editor tab and a scratchpad tab, plus two buttons:

    * "Process Input" calls ``process_all_inputs`` with the URL, file, text
      and scratchpad values and fills the output file, the result message
      and the JSON editor.
    * "Generate QR Code" calls ``generate_qr_code`` with the JSON editor
      content and shows the resulting image.

    Returns:
        The ``gr.Blocks`` instance.
    """
    css = """
    .container { max-width: 1200px; margin: auto; }
    .warning { background-color: #fff3cd; color: #856404; }
    .error { background-color: #f8d7da; color: #721c24; }
    """
    with gr.Blocks(css=css, title="Advanced Text & URL Processing") as interface:
        gr.Markdown("# 🌐 Advanced URL & Text Processing Toolkit")

        with gr.Tab("URL Processing"):
            url_input = gr.Textbox(
                label="Enter URLs (comma or newline separated)",
                lines=5,
                placeholder="https://example1.com\nhttps://example2.com"
            )

        with gr.Tab("File Input"):
            file_input = gr.File(
                label="Upload text file or ZIP archive",
                file_types=[".txt", ".zip", ".md", ".csv", ".json", ".xml"]
            )

        with gr.Tab("Text Input"):
            text_input = gr.Textbox(
                label="Raw Text Input",
                lines=5,
                placeholder="Paste your text here..."
            )

        with gr.Tab("JSON Editor"):
            json_editor = gr.Textbox(
                label="JSON Editor",
                lines=20,
                placeholder="View and edit your JSON data here...",
                interactive=True,
                elem_id="json-editor"  # Optional: for custom styling
            )

        with gr.Tab("Scratchpad"):
            scratchpad = gr.Textbox(
                label="Scratchpad",
                lines=10,
                placeholder="Quick notes or text collections...",
                interactive=True
            )

        process_btn = gr.Button("Process Input", variant="primary")
        qr_btn = gr.Button("Generate QR Code", variant="secondary")

        output_text = gr.Textbox(label="Processing Results", interactive=False)
        output_file = gr.File(label="Processed Output")
        qr_output = gr.Image(label="QR Code", type="filepath")  # To display the generated QR code

        # The three outputs line up with process_all_inputs' return tuple:
        # (file path, summary message, JSON string).
        process_btn.click(
            process_all_inputs,
            inputs=[url_input, file_input, text_input, scratchpad],
            outputs=[output_file, output_text, json_editor]
        )
        qr_btn.click(
            generate_qr_code,
            inputs=json_editor,
            outputs=qr_output
        )

        gr.Markdown("""
        ### Usage Guidelines
        - **URL Processing**: Enter valid HTTP/HTTPS URLs
        - **File Input**: Upload text files or ZIP archives
        - **Text Input**: Direct text processing
        - **JSON Editor**: View and edit your JSON data
        - **Scratchpad**: Quick notes or text collections
        - Advanced cleaning and validation included
        """)
    return interface
def check_network_connectivity():
    """Probe a few well-known sites and report whether any answered with HTTP 200.

    Returns:
        A ``(ok, results)`` tuple. ``results`` holds one dict per probed
        site with the keys ``site``, ``status`` (``"OK"`` or an
        ``"Error: ..."`` string) and ``response_time`` (seconds, or ``None``
        when the request raised). ``ok`` is ``False`` only when every
        site's status is an error.
    """
    def _probe(target):
        # One GET with a 5 second timeout; any exception becomes an error entry.
        try:
            reply = requests.get(target, timeout=5)
            if reply.status_code == 200:
                verdict = "OK"
            else:
                verdict = f"Error: {reply.status_code}"
            return {
                "site": target,
                "status": verdict,
                "response_time": reply.elapsed.total_seconds()
            }
        except Exception as exc:
            return {
                "site": target,
                "status": f"Error: {str(exc)}",
                "response_time": None
            }

    test_sites = ["https://www.google.com", "https://www.cloudflare.com", "https://www.amazon.com"]
    results = [_probe(site) for site in test_sites]

    # If all sites failed, there might be a network issue
    any_succeeded = any(not entry["status"].startswith("Error") for entry in results)
    if not any_succeeded:
        logger.error("Network connectivity issue detected. All test sites failed.")
        return False, results
    return True, results
# Application entry point; it runs the network connectivity check before launching the UI.
def main():
    """Initialise MIME types, check the network, then build and launch the UI.

    Connectivity problems are only logged as warnings; the interface is
    launched regardless.
    """
    # Configure system settings
    mimetypes.init()

    # Check network connectivity
    reachable, probes = check_network_connectivity()
    if not reachable:
        logger.warning("Network connectivity issues detected. Some features may not work properly.")
        for probe in probes:
            logger.warning(f"Test site {probe['site']}: {probe['status']}")

    # Launch settings, gathered in one place
    launch_options = {
        "server_name": "0.0.0.0",
        "server_port": 7860,
        "show_error": True,
        "share": False,
        "inbrowser": True,
        "debug": True,
    }
    create_interface().launch(**launch_options)


# Run the application only when executed as a script, not when imported.
if __name__ == "__main__":
    main()