import json
import os
import re
import time
import logging
import mimetypes
import zipfile
import tempfile
import chardet
from datetime import datetime
from typing import List, Dict, Optional, Union, Tuple
from pathlib import Path
from urllib.parse import urlparse, urljoin
import requests
import validators
import gradio as gr
# from diskcache import Cache # Diskcache not used in the provided code, commented out
from bs4 import BeautifulSoup
from fake_useragent import UserAgent
from cleantext import clean # Kept even though direct use is commented out below, in case it is needed later
import qrcode
import qrcode.constants # Explicit import for constants
import qrcode.exceptions # Explicit import for exceptions
from PIL import Image, ImageDraw, ImageFont
import numpy as np
import tarfile
import gzip
import bz2 # Ensure bz2 is imported for .bz2 file handling
import networkx as nx
import matplotlib.pyplot as plt
from matplotlib.colors import to_rgba
import io
import math
import cv2 # Imported for QR code detection
try:
import PyPDF2 # Added for PDF processing
from PyPDF2.errors import PdfReadError, DependencyError # Specific PDF errors
PDF_ENABLED = True
except ImportError:
PDF_ENABLED = False
# Define dummy classes/exceptions if PyPDF2 is not installed
class PdfReadError(Exception): pass
class DependencyError(Exception): pass
print("--------------------------------------------------------------------")
print("WARNING: PyPDF2 not installed. PDF processing will be disabled.")
print(" Install with: pip install pypdf2[crypto]")
print("--------------------------------------------------------------------")
# Setup enhanced logging with more detailed formatting
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - [%(filename)s:%(lineno)d] - %(message)s',
handlers=[
logging.StreamHandler(),
logging.FileHandler('app.log', encoding='utf-8')
])
logger = logging.getLogger(__name__)
# Ensure output directories exist with modern structure
OUTPUTS_DIR = Path('output')
QR_CODES_DIR = OUTPUTS_DIR / 'qr_codes'
TEMP_DIR = OUTPUTS_DIR / 'temp'
for directory in [OUTPUTS_DIR, QR_CODES_DIR, TEMP_DIR]:
directory.mkdir(parents=True, exist_ok=True)
# --- Constants ---
# Reduced max size for QR payload to ensure fit within Version 40 with H ECC
# QR Code V40-H capacity (binary bytes): 1273
# Leave a buffer for JSON overhead and chunk metadata; 1000 bytes is a safer payload limit.
QR_PAYLOAD_MAX_BYTES = 1000
DEFAULT_MAX_FILE_SIZE_BYTES = 5 * 1024 * 1024 * 1024 # 5 GB
class EnhancedURLProcessor:
"""Advanced URL processing with content extraction and basic validation."""
def __init__(self):
self.session = requests.Session()
self.timeout = 15 # Extended timeout for larger content
self.max_retries = 3
try:
self.user_agent = UserAgent()
except Exception: # Handle potential errors fetching user agents list
logger.warning("Failed to initialize UserAgent. Using default.")
self.user_agent = None # Fallback
# Enhanced headers for better site compatibility
headers = {
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9',
'Accept-Language': 'en-US,en;q=0.9',
'Accept-Encoding': 'gzip, deflate, br',
'Connection': 'keep-alive',
'Upgrade-Insecure-Requests': '1',
'Sec-Fetch-Dest': 'document',
'Sec-Fetch-Mode': 'navigate',
'Sec-Fetch-Site': 'none',
'Sec-Fetch-User': '?1',
'DNT': '1' # Do Not Track
}
if self.user_agent:
headers['User-Agent'] = self.user_agent.random
else:
headers['User-Agent'] = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36' # Default fallback
self.session.headers.update(headers)
def validate_url(self, url: str) -> Dict:
"""Enhanced URL validation with detailed feedback using HEAD and GET."""
try:
if not validators.url(url):
return {'is_valid': False, 'message': 'Invalid URL format', 'details': 'URL must begin with http:// or https://'}
parsed = urlparse(url)
if not all([parsed.scheme, parsed.netloc]):
return {'is_valid': False, 'message': 'Incomplete URL', 'details': 'Missing scheme or domain'}
details = {}
is_valid = False
validation_message = "Validation failed"
# Update User-Agent for this specific request
current_ua = self.user_agent.random if self.user_agent else self.session.headers['User-Agent']
req_headers = {'User-Agent': current_ua}
# Try HEAD request first
try:
head_response = self.session.head(url, timeout=5, allow_redirects=True, headers=req_headers)
head_response.raise_for_status()
details = {
'method': 'HEAD',
'content_type': head_response.headers.get('Content-Type', 'unknown'),
'server': head_response.headers.get('Server', 'unknown'),
'size': head_response.headers.get('Content-Length', 'unknown'),
'final_url': head_response.url
}
is_valid = True
validation_message = 'URL is valid and accessible (HEAD)'
logger.info(f"HEAD request successful for {url}")
except requests.exceptions.RequestException as head_err:
logger.warning(f"HEAD request failed for {url}: {head_err}. Trying GET.")
# If HEAD fails, try GET (stream=True to avoid downloading full content)
try:
get_response = self.session.get(url, timeout=self.timeout, stream=True, allow_redirects=True, headers=req_headers)
get_response.raise_for_status()
details = {
'method': 'GET',
'content_type': get_response.headers.get('Content-Type', 'unknown'),
'server': get_response.headers.get('Server', 'unknown'),
'size': get_response.headers.get('Content-Length', 'unknown'),
'final_url': get_response.url
}
get_response.close() # Close the stream immediately
is_valid = True
validation_message = 'URL is valid and accessible (GET)'
logger.info(f"GET request validation successful for {url}")
except requests.exceptions.RequestException as get_err:
logger.error(f"Both HEAD and GET requests failed for URL validation: {url}. Last error: {get_err}")
validation_message = f'URL validation failed (HEAD/GET): {get_err}'
details = {'error': str(get_err)}
return {
'is_valid': is_valid,
'message': validation_message,
'details': details
}
except Exception as e:
logger.error(f"Unexpected error during URL validation for {url}: {e}", exc_info=True)
return {'is_valid': False, 'message': f'URL validation failed: {str(e)}', 'details': str(e)}
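    # Illustrative shape of the dict returned by validate_url() (example values, not real output):
    # {'is_valid': True,
    #  'message': 'URL is valid and accessible (HEAD)',
    #  'details': {'method': 'HEAD', 'content_type': 'text/html; charset=utf-8',
    #              'server': 'nginx', 'size': '12345', 'final_url': 'https://example.com/'}}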
def fetch_content(self, url: str, retry_count: int = 0) -> Optional[Dict]:
"""Fetches URL content with retries, handles various types (HTML, Text, JSON, PDF)."""
try:
logger.info(f"Fetching content from URL: {url} (Attempt {retry_count + 1}/{self.max_retries})")
current_ua = self.user_agent.random if self.user_agent else self.session.headers['User-Agent']
req_headers = {'User-Agent': current_ua}
response = self.session.get(url, timeout=self.timeout, allow_redirects=True, headers=req_headers)
response.raise_for_status()
# --- Encoding Detection ---
detected_encoding = chardet.detect(response.content)['encoding']
encoding = detected_encoding or response.apparent_encoding or 'utf-8'
logger.debug(f"Encoding for {url}: Detected={detected_encoding}, Apparent={response.apparent_encoding}, Using={encoding}")
# --- Decode Content ---
raw_content_str = None
try:
raw_content_str = response.content.decode(encoding, errors='replace')
except (UnicodeDecodeError, LookupError) as decode_err:
logger.warning(f"Decoding {url} with {encoding} failed ({decode_err}), falling back to utf-8")
encoding = 'utf-8'
raw_content_str = response.content.decode(encoding, errors='replace')
# --- Metadata ---
metadata = {
'url': url,
'timestamp': datetime.now().isoformat(),
'encoding_detected': detected_encoding,
'encoding_used': encoding,
'content_type': response.headers.get('Content-Type', ''),
'content_length_bytes': len(response.content),
'headers': dict(response.headers),
'status_code': response.status_code,
'final_url': response.url
}
# --- Content Processing by Type ---
content_type = metadata['content_type'].lower()
processed_content = None
if 'text/html' in content_type:
processed_content = self._process_html_content(raw_content_str, response.url)
logger.info(f"Fetched and processed HTML from {url}")
elif content_type.startswith('text/'):
processed_content = raw_content_str
logger.info(f"Fetched non-HTML text from {url}")
elif 'application/json' in content_type:
try:
processed_content = json.loads(raw_content_str)
logger.info(f"Fetched and parsed JSON from {url}")
except json.JSONDecodeError:
logger.warning(f"Content type JSON, but failed to parse {url}. Storing raw text.")
processed_content = raw_content_str
elif 'application/pdf' in content_type:
if PDF_ENABLED:
logger.info(f"Detected PDF from URL {url}. Attempting extraction.")
processed_content = self._extract_pdf_text_from_bytes(response.content, url)
else:
logger.warning(f"Detected PDF from URL {url}, but PyPDF2 is disabled. Skipping extraction.")
processed_content = "[PDF Content - PyPDF2 not installed]"
else:
logger.info(f"Fetched non-text/HTML/JSON/PDF content ({content_type}) from {url}. Storing raw string.")
# Limit storage of potentially large binary data represented as string
max_raw_str_len = 10000
if raw_content_str and len(raw_content_str) > max_raw_str_len:
processed_content = raw_content_str[:max_raw_str_len] + f"... [truncated {len(raw_content_str) - max_raw_str_len} chars]"
elif raw_content_str:
processed_content = raw_content_str
else:
processed_content = "[Binary or Undecodable Content]"
return {
'content': processed_content,
'raw_bytes': response.content, # Keep raw bytes if needed for specific handling later
'metadata': metadata
}
except requests.exceptions.Timeout:
logger.error(f"Timeout fetching {url} after {self.timeout}s.")
# Retry logic
if retry_count < self.max_retries - 1:
logger.warning(f"Retrying ({retry_count + 2}/{self.max_retries}) for URL: {url}")
time.sleep(1 * (retry_count + 1)) # Simple linear backoff
return self.fetch_content(url, retry_count + 1)
logger.error(f"Failed to fetch {url} after {self.max_retries} attempts due to timeout.")
return None
except requests.exceptions.RequestException as e:
# Retry logic for general request errors
if retry_count < self.max_retries - 1:
logger.warning(f"Request failed ({e}), Retrying ({retry_count + 2}/{self.max_retries}) for URL: {url}")
time.sleep(1 * (retry_count + 1))
return self.fetch_content(url, retry_count + 1)
logger.error(f"Failed to fetch content from {url} after {self.max_retries} attempts: {e}")
return None
except Exception as e:
logger.error(f"Unexpected error fetching content from {url}: {e}", exc_info=True)
return None
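    # Illustrative shape of a successful fetch_content() result (example values):
    # {'content': '<extracted text, parsed JSON, or raw string>',
    #  'raw_bytes': b'...',
    #  'metadata': {'url': ..., 'content_type': ..., 'status_code': 200, 'final_url': ..., ...}}
    # After exhausting retries the method returns None.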
def _process_html_content(self, content: str, base_url: str) -> str:
"""Extracts text from HTML, attempts to absolutize links."""
try:
soup = BeautifulSoup(content, 'lxml') # Use lxml parser
# Absolutize links (best effort)
for tag in soup.find_all(['a', 'img', 'link', 'script'], href=True) + soup.find_all(['img', 'script'], src=True):
attr = 'href' if tag.has_attr('href') else 'src'
if tag[attr]:
try:
# Handle cases where tag[attr] might be a list (rare, but possible)
attr_value = tag[attr]
if isinstance(attr_value, list):
attr_value = attr_value[0] # Take the first one
if not isinstance(attr_value, str): continue # Skip if not a string
base_tag = soup.find('base')
current_base = base_tag['href'] if base_tag and base_tag.get('href') else base_url
abs_url = urljoin(current_base, attr_value)
if validators.url(abs_url): # Check if the result is a valid URL
tag[attr] = abs_url
except Exception as url_e:
logger.debug(f"Ignoring error during URL absolutization: {url_e}")
pass # Ignore errors in URL joining/validation
# Extract text content, joined by newlines
text_content = '\n'.join(soup.stripped_strings)
# Optional cleaning (use cautiously)
# text_content = clean(text_content, no_line_breaks=False, lower=False)
return text_content
except Exception as e:
logger.error(f"HTML processing error for base URL {base_url}: {e}", exc_info=True)
return content # Return original content on error
def _extract_pdf_text_from_bytes(self, pdf_bytes: bytes, source_desc: str) -> str:
"""Helper to extract text from PDF bytes using PyPDF2."""
if not PDF_ENABLED: return "[PDF Extraction Disabled]"
try:
pdf_reader = PyPDF2.PdfReader(io.BytesIO(pdf_bytes))
if pdf_reader.is_encrypted:
# Try decrypting with an empty password
try:
decrypt_status = pdf_reader.decrypt('')
# Check PyPDF2 version for return type (integer in older, enum in newer)
is_decrypted = False
if isinstance(decrypt_status, int): # Older PyPDF2 versions
is_decrypted = decrypt_status > 0
elif hasattr(PyPDF2.PasswordType, 'OWNER_PASSWORD'): # Newer PyPDF2 versions (check attribute exists)
is_decrypted = decrypt_status in (PyPDF2.PasswordType.OWNER_PASSWORD, PyPDF2.PasswordType.USER_PASSWORD)
else: # Fallback check if PasswordType structure changes
logger.warning("Could not determine PyPDF2 decryption status type, assuming success if no error.")
is_decrypted = True # Assume success if decrypt didn't raise error
if is_decrypted:
logger.info(f"Successfully decrypted PDF from {source_desc} with empty password.")
else: # Decryption failed
logger.warning(f"PDF from {source_desc} is encrypted and could not be decrypted with empty password.")
return "[Encrypted PDF Content - Decryption Failed]"
except (NotImplementedError, DependencyError) as decrypt_err:
# PyPDF2 might need cryptography for some PDFs
logger.error(f"Decryption dependency error for PDF from {source_desc}: {decrypt_err}. Ensure 'cryptography' is installed (`pip install pypdf2[crypto]`).")
return f"[Encrypted PDF Content - Decryption Dependency Missing: {decrypt_err}]"
except Exception as decrypt_err:
logger.warning(f"Error during decryption attempt for PDF from {source_desc}: {decrypt_err}")
return "[Encrypted PDF Content - Decryption Error]"
# Proceed with extraction if not encrypted or successfully decrypted
pdf_text_parts = []
num_pages = len(pdf_reader.pages)
for page_num in range(num_pages):
try:
page = pdf_reader.pages[page_num]
extracted = page.extract_text()
pdf_text_parts.append(extracted if extracted else "") # Add empty string if extraction fails
except Exception as page_err:
logger.warning(f"Error extracting text from page {page_num+1}/{num_pages} of PDF from {source_desc}: {page_err}")
pdf_text_parts.append(f"[Error extracting page {page_num+1}]")
full_text = "\n".join(pdf_text_parts).strip()
logger.info(f"Successfully extracted text ({len(full_text)} chars) from {num_pages} pages of PDF from {source_desc}")
return full_text if full_text else "[PDF contains no extractable text]"
except PdfReadError as pdf_err:
logger.error(f"Could not read PDF from {source_desc}: {pdf_err}")
return f"[Invalid or Corrupted PDF: {pdf_err}]"
except Exception as e:
logger.error(f"Unexpected error processing PDF from {source_desc}: {e}", exc_info=True)
return "[Error processing PDF]"
class EnhancedFileProcessor:
"""Processes local files (text, json, pdf, archives) with content extraction."""
def __init__(self, max_file_size: int = DEFAULT_MAX_FILE_SIZE_BYTES):
self.max_file_size = max_file_size
self.text_extensions = {
'.txt', '.md', '.csv', '.xml', '.html', '.htm', '.css', '.js',
'.log', '.yml', '.yaml', '.ini', '.conf', '.cfg', '.toml', '.sql', '.py', '.java', '.c', '.cpp', '.h'
}
self.json_extension = '.json'
self.pdf_extension = '.pdf'
self.archive_extensions = {'.zip', '.tar', '.gz', '.bz2', '.tgz', '.tar.gz', '.tar.bz2'}
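        # Note: .tar.gz/.tgz and .tar.bz2 archives are routed through the tarfile branch in
        # _process_archive below; the single-file .gz and .bz2 branches explicitly skip them.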
def process_file(self, file) -> List[Dict]:
"""Processes a single uploaded Gradio file object."""
if not file or not hasattr(file, 'name'):
logger.warning("Invalid file object received.")
return []
dataset = []
# Use orig_name if available (better for Gradio temp files)
file_display_name = getattr(file, 'orig_name', getattr(file, 'name', 'Unknown File'))
file_path_obj = Path(file.name) if hasattr(file, 'name') else None
if not file_path_obj:
logger.error("File object missing 'name' attribute.")
return [{'error': 'Invalid file object on server', 'filename': file_display_name}]
try:
# Gradio file objects might be temporary, ensure path exists
if not file_path_obj.exists():
logger.error(f"Temporary file path does not exist: {file_path_obj}")
return [{'error': 'File not found on server', 'filename': file_display_name}]
file_path = file_path_obj.resolve() # Get absolute path
file_size = file_path.stat().st_size
if file_size > self.max_file_size:
logger.warning(f"File '{file_display_name}' size ({file_size} bytes) exceeds limit ({self.max_file_size} bytes).")
return [{'error': 'File too large', 'filename': file_display_name, 'file_size': file_size}]
if file_size == 0:
logger.warning(f"File '{file_display_name}' is empty.")
return [{'error': 'File is empty', 'filename': file_display_name, 'file_size': 0}]
# Use a temporary directory for extraction if needed
with tempfile.TemporaryDirectory(dir=TEMP_DIR, prefix="extract_") as temp_dir:
temp_dir_path = Path(temp_dir)
if self._is_archive(str(file_path)):
logger.info(f"Processing archive: {file_display_name}")
dataset.extend(self._process_archive(str(file_path), temp_dir_path, archive_display_name=file_display_name))
else:
logger.info(f"Processing single file: {file_display_name}")
dataset.extend(self._process_single_file(file_path, file_display_name))
except Exception as e:
logger.error(f"Error processing file '{file_display_name}': {e}", exc_info=True)
dataset.append({'error': f'Processing failed: {e}', 'filename': file_display_name})
return dataset
def _is_archive(self, filepath: str) -> bool:
"""Checks if the file extension is a supported archive type."""
# Check suffix and double suffixes like .tar.gz
p = Path(filepath)
suffix = p.suffix.lower()
double_suffix = "".join(p.suffixes[-2:]).lower()
return suffix in self.archive_extensions or double_suffix in self.archive_extensions
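    # Example: for 'logs.tar.gz', Path.suffix is '.gz' while ''.join(suffixes[-2:]) is '.tar.gz',
    # so both single-suffix archives (.zip, .gz) and double-suffix ones (.tar.gz) are recognised.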
def _process_single_file(self, file_path: Path, display_name: str) -> List[Dict]:
"""Processes a single non-archive file (text, json, pdf, binary)."""
file_name = display_name # Use the potentially original name for reporting
file_suffix = file_path.suffix.lower()
try:
file_stat = file_path.stat()
file_size = file_stat.st_size
mime_type, _ = mimetypes.guess_type(file_path)
mime_type = mime_type or 'application/octet-stream'
complete_content = None
raw_content_str = None # Store raw string if read as text
source = 'unknown_file'
# --- Determine File Type and Process ---
# 1. PDF
if PDF_ENABLED and (file_suffix == self.pdf_extension or mime_type == 'application/pdf'):
source = 'pdf_file'
logger.info(f"Processing PDF file: {file_name}")
with open(file_path, 'rb') as f:
pdf_bytes = f.read()
# Use the same helper as URL processor
url_processor = EnhancedURLProcessor() # Create temporary instance
complete_content = url_processor._extract_pdf_text_from_bytes(pdf_bytes, f"file '{file_name}'")
# 2. JSON
elif file_suffix == self.json_extension or mime_type == 'application/json':
source = 'json_file'
logger.info(f"Processing JSON file: {file_name}")
try:
# Read with UTF-8 first, fallback if needed
try:
with open(file_path, 'r', encoding='utf-8') as f:
raw_content_str = f.read()
except UnicodeDecodeError:
logger.warning(f"UTF-8 decode failed for {file_name}, trying detected encoding.")
with open(file_path, 'rb') as fb:
raw_bytes = fb.read()
detected_encoding = chardet.detect(raw_bytes)['encoding'] or 'latin-1' # Fallback to latin-1
logger.info(f"Detected encoding for {file_name}: {detected_encoding}")
raw_content_str = raw_bytes.decode(detected_encoding, errors='replace')
complete_content = json.loads(raw_content_str) # Parse JSON
logger.info(f"Successfully parsed JSON from {file_name}")
except json.JSONDecodeError as json_err:
logger.warning(f"File {file_name} looks like JSON but failed to parse: {json_err}. Storing raw text.")
complete_content = raw_content_str # Store raw text
source = 'text_file_failed_json'
except IOError as e:
logger.error(f"IOError reading JSON file {file_name}: {e}")
return [{'error': f'IOError reading file: {e}', 'filename': file_name}]
except Exception as e:
logger.error(f"Error reading/parsing JSON file {file_name}: {e}", exc_info=True)
complete_content = f"[Error reading JSON file: {e}]"
raw_content_str = complete_content
# 3. Text
elif file_suffix in self.text_extensions or (mime_type and mime_type.startswith('text/')):
source = 'text_file'
logger.info(f"Processing Text file: {file_name}")
try:
with open(file_path, 'rb') as f:
raw_bytes = f.read()
detected_encoding = chardet.detect(raw_bytes)['encoding'] or 'utf-8'
logger.info(f"Detected encoding for {file_name}: {detected_encoding}")
raw_content_str = raw_bytes.decode(detected_encoding, errors='replace')
complete_content = raw_content_str
logger.info(f"Successfully read text from {file_name}")
# Optional: Try parsing as JSON
try:
# Basic check before attempting full parse
if raw_content_str.strip().startswith(("{", "[")) and raw_content_str.strip().endswith(("}", "]")):
json_data = json.loads(raw_content_str)
complete_content = json_data
source = 'json_content_detected_in_text'
logger.info(f"Detected and parsed JSON structure within text file {file_name}")
except json.JSONDecodeError:
pass # It's just text
except IOError as e:
logger.error(f"IOError reading text file {file_name}: {e}")
return [{'error': f'IOError reading file: {e}', 'filename': file_name}]
except Exception as e:
logger.error(f"Error reading text file {file_name}: {e}", exc_info=True)
complete_content = f"[Error reading text file: {e}]"
raw_content_str = complete_content
# 4. Other (Binary/Unsupported)
else:
source = 'binary_or_unsupported_file'
logger.warning(f"File {file_name} not processed as text/json/pdf (MIME: {mime_type}, Suffix: {file_suffix}). Treating as binary/other.")
complete_content = f"[Binary or unsupported content ({file_size} bytes)]"
# --- Structure Output ---
result = {
'source': source,
'filename': file_name, # Use display name
'file_size': file_size,
'mime_type': mime_type,
'created': datetime.fromtimestamp(file_stat.st_ctime).isoformat(),
'modified': datetime.fromtimestamp(file_stat.st_mtime).isoformat(),
'content': complete_content,
'timestamp': datetime.now().isoformat()
}
# Include raw string only if it's text and wasn't successfully parsed as JSON
if raw_content_str is not None and source in ['text_file', 'text_file_failed_json', 'json_content_detected_in_text']:
result['raw_content'] = raw_content_str
return [result]
except FileNotFoundError:
logger.error(f"File not found during processing: {file_path} (Display Name: {display_name})")
return [{'error': 'File not found', 'filename': display_name}]
except Exception as e:
logger.error(f"General file processing error for {display_name}: {e}", exc_info=True)
return [{'error': f'File processing failed: {e}', 'filename': display_name}]
def _process_archive(self, archive_path_str: str, extract_to: Path, archive_display_name: str) -> List[Dict]:
"""Extracts files from supported archives and processes them."""
dataset = []
archive_path = Path(archive_path_str)
archive_name = archive_display_name # Use display name for reporting
logger.info(f"Attempting to extract archive: {archive_name}")
extracted_something = False
try:
# --- ZIP ---
if archive_name.lower().endswith('.zip') and zipfile.is_zipfile(archive_path):
logger.debug(f"Processing ZIP: {archive_name}")
with zipfile.ZipFile(archive_path, 'r') as zip_ref:
# Basic zip bomb check (total size)
total_uncompressed = sum(f.file_size for f in zip_ref.infolist())
# Allow larger uncompressed size, but cap relative to compressed size
max_allowed_uncompressed = max(self.max_file_size, 20 * archive_path.stat().st_size)
if total_uncompressed > max_allowed_uncompressed:
logger.warning(f"ZIP {archive_name} potentially too large (uncompressed: {total_uncompressed} vs limit {max_allowed_uncompressed}). Skipping.")
return [{'error': 'Archive potential bomb or excessively large', 'filename': archive_name}]
for member in zip_ref.infolist():
member_display_name = member.filename # Name inside archive
if member.is_dir() or member.file_size == 0: continue # Skip directories and empty files
if member.file_size > self.max_file_size:
logger.warning(f"Skipping '{member_display_name}' in {archive_name}: size {member.file_size} exceeds limit.")
dataset.append({'error': 'File in archive too large', 'filename': member_display_name, 'archive': archive_name})
continue
# Sanitize and prevent path traversal - ensure target is within extract_to
member_path = Path(member_display_name)
# Remove leading slashes and '..' components
safe_parts = [p for p in member_path.parts if p not in ('', '.', '..')]
if not safe_parts: continue # Skip if path becomes empty
target_path = extract_to.joinpath(*safe_parts).resolve()
# Double check it's within the extraction directory
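                        # Example: a member named '../../etc/passwd' has its '..' parts stripped above,
                        # leaving ('etc', 'passwd'); the resolved target is then re-checked below so
                        # nothing escapes the extract_to directory.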
if not str(target_path).startswith(str(extract_to.resolve())):
logger.warning(f"Skipping potentially unsafe path '{member_display_name}' in {archive_name}")
continue
target_path.parent.mkdir(parents=True, exist_ok=True)
try:
with zip_ref.open(member) as source, open(target_path, "wb") as target:
size_written = 0
while True:
chunk = source.read(8192)
if not chunk: break
size_written += len(chunk)
if size_written > self.max_file_size: # Check during extraction
raise OverflowError(f"File '{member_display_name}' exceeded size limit during extraction.")
target.write(chunk)
logger.debug(f"Extracted '{member_display_name}' to '{target_path.relative_to(extract_to)}' from {archive_name}")
# Process the extracted file, passing its name within the archive
results = self._process_single_file(target_path, member_display_name)
# Add archive context to results
for res in results: res['archive'] = archive_name
dataset.extend(results)
extracted_something = True
except OverflowError as oe:
logger.error(f"Error extracting {member_display_name} from {archive_name}: {oe}")
dataset.append({'error': str(oe), 'filename': member_display_name, 'archive': archive_name})
if target_path.exists(): target_path.unlink() # Clean up partial file
except Exception as extract_err:
logger.error(f"Failed to extract/process {member_display_name} from {archive_name}: {extract_err}", exc_info=True)
dataset.append({'error': f'Extraction/processing failed: {extract_err}', 'filename': member_display_name, 'archive': archive_name})
# --- TAR (tar, tar.gz, tgz, tar.bz2) ---
elif tarfile.is_tarfile(archive_path):
logger.debug(f"Processing TAR: {archive_name}")
# Mode 'r:*' auto-detects compression
with tarfile.open(archive_path, 'r:*') as tar_ref:
for member in tar_ref.getmembers():
member_display_name = member.name # Name inside archive
if not member.isfile() or member.size == 0: continue # Skip non-files and empty files
if member.size > self.max_file_size:
logger.warning(f"Skipping '{member_display_name}' in {archive_name}: size {member.size} exceeds limit.")
dataset.append({'error': 'File in archive too large', 'filename': member_display_name, 'archive': archive_name})
continue
# Sanitize and prevent path traversal
member_path = Path(member_display_name)
safe_parts = [p for p in member_path.parts if p not in ('', '.', '..')]
if not safe_parts: continue
target_path = extract_to.joinpath(*safe_parts).resolve()
if not str(target_path).startswith(str(extract_to.resolve())):
logger.warning(f"Skipping potentially unsafe path '{member_display_name}' in {archive_name}")
continue
target_path.parent.mkdir(parents=True, exist_ok=True)
try:
fileobj = tar_ref.extractfile(member)
if fileobj:
with open(target_path, "wb") as target:
size_written = 0
while True:
chunk = fileobj.read(8192)
if not chunk: break
size_written += len(chunk)
if size_written > self.max_file_size: # Check during extraction
raise OverflowError(f"File '{member_display_name}' exceeded size limit during extraction.")
target.write(chunk)
logger.debug(f"Extracted '{member_display_name}' to '{target_path.relative_to(extract_to)}' from {archive_name}")
# Process the extracted file
results = self._process_single_file(target_path, member_display_name)
for res in results: res['archive'] = archive_name
dataset.extend(results)
extracted_something = True
else:
logger.warning(f"Could not extract file object for '{member_display_name}' from {archive_name}")
except OverflowError as oe:
logger.error(f"Error extracting {member_display_name} from {archive_name}: {oe}")
dataset.append({'error': str(oe), 'filename': member_display_name, 'archive': archive_name})
if target_path.exists(): target_path.unlink()
except Exception as extract_err:
logger.error(f"Failed to extract/process {member_display_name} from {archive_name}: {extract_err}", exc_info=True)
dataset.append({'error': f'Extraction/processing failed: {extract_err}', 'filename': member_display_name, 'archive': archive_name})
# --- GZIP (single file) ---
elif archive_name.lower().endswith('.gz') and not archive_name.lower().endswith('.tar.gz'): # Avoid double handling tar.gz
logger.debug(f"Processing GZIP: {archive_name}")
# Determine output filename (remove .gz suffix)
extracted_filename = Path(archive_name).stem
if not extracted_filename: extracted_filename = archive_name + "_extracted" # Handle cases like '.gz'
target_path = extract_to / extracted_filename
target_path.parent.mkdir(parents=True, exist_ok=True)
try:
with gzip.open(archive_path, 'rb') as gz_file, open(target_path, 'wb') as outfile:
size_written = 0
while True:
chunk = gz_file.read(8192)
if not chunk: break
size_written += len(chunk)
if size_written > self.max_file_size:
raise OverflowError(f"Gzipped file '{archive_name}' exceeded size limit during decompression.")
outfile.write(chunk)
logger.debug(f"Extracted '{target_path.name}' from {archive_name}")
# Process the extracted file
results = self._process_single_file(target_path, extracted_filename) # Use extracted name
for res in results: res['archive'] = archive_name
dataset.extend(results)
extracted_something = True
except OverflowError as oe:
logger.error(f"Error extracting {archive_name}: {oe}")
dataset.append({'error': str(oe), 'filename': extracted_filename, 'archive': archive_name})
if target_path.exists(): target_path.unlink()
except gzip.BadGzipFile as e:
logger.error(f"Error processing GZIP {archive_name}: Bad Gzip File - {e}")
dataset.append({'error': f'Bad Gzip File: {e}', 'filename': archive_name})
except Exception as extract_err:
logger.error(f"Failed to extract/process gzip {archive_name}: {extract_err}", exc_info=True)
dataset.append({'error': f'Extraction/processing failed: {extract_err}', 'filename': archive_name})
# --- BZ2 (single file) ---
elif archive_name.lower().endswith('.bz2') and not archive_name.lower().endswith('.tar.bz2'): # Avoid double handling tar.bz2
logger.debug(f"Processing BZ2: {archive_name}")
extracted_filename = Path(archive_name).stem
if not extracted_filename: extracted_filename = archive_name + "_extracted"
target_path = extract_to / extracted_filename
target_path.parent.mkdir(parents=True, exist_ok=True)
try:
with bz2.open(archive_path, 'rb') as bz2_file, open(target_path, 'wb') as outfile:
size_written = 0
while True:
chunk = bz2_file.read(8192)
if not chunk: break
size_written += len(chunk)
if size_written > self.max_file_size:
raise OverflowError(f"Bzipped file '{archive_name}' exceeded size limit during decompression.")
outfile.write(chunk)
logger.debug(f"Extracted '{target_path.name}' from {archive_name}")
# Process the extracted file
results = self._process_single_file(target_path, extracted_filename)
for res in results: res['archive'] = archive_name
dataset.extend(results)
extracted_something = True
except OverflowError as oe:
logger.error(f"Error extracting {archive_name}: {oe}")
dataset.append({'error': str(oe), 'filename': extracted_filename, 'archive': archive_name})
if target_path.exists(): target_path.unlink()
except Exception as extract_err:
logger.error(f"Failed to extract/process bz2 {archive_name}: {extract_err}", exc_info=True)
dataset.append({'error': f'Extraction/processing failed: {extract_err}', 'filename': archive_name})
else:
# If it wasn't handled by tarfile (e.g., .tar.gz, .tar.bz2) or the single file handlers, log warning.
if not tarfile.is_tarfile(archive_path):
logger.warning(f"Archive type not recognized or handled: {archive_name}")
dataset.append({'error': 'Unrecognized archive type', 'filename': archive_name})
# If it *was* a tarfile but didn't extract anything (e.g., empty or only dirs)
elif not extracted_something and not any('error' in d for d in dataset):
logger.warning(f"Archive {archive_name} processed, but no valid files were extracted or processed.")
# Optionally add a note to the dataset
# dataset.append({'warning': 'No processable files found in archive', 'filename': archive_name})
except FileNotFoundError:
logger.error(f"Archive file not found: {archive_path}")
dataset.append({'error': 'Archive file not found', 'filename': archive_name})
except (zipfile.BadZipFile, tarfile.TarError, gzip.BadGzipFile, EOFError) as archive_err: # Added EOFError for tar issues
logger.error(f"Invalid or corrupted archive file {archive_name}: {archive_err}")
dataset.append({'error': f'Corrupted or invalid archive: {archive_err}', 'filename': archive_name})
except Exception as e:
logger.error(f"General archive processing error for {archive_name}: {e}", exc_info=True)
dataset.append({'error': f'Archive processing failed: {e}', 'filename': archive_name})
return dataset
    # chunk_data uses a max_size sized for QR payloads (see QR_PAYLOAD_MAX_BYTES above).
def chunk_data(self, data: Union[Dict, List, str], max_size: int = QR_PAYLOAD_MAX_BYTES) -> List[Dict]:
"""Enhanced data chunking with sequence metadata, sized for QR codes."""
try:
if not isinstance(data, str):
# Convert complex data to JSON string first
# Use separators=(',', ':') for compact JSON
json_str = json.dumps(data, ensure_ascii=False, separators=(',', ':'))
else:
json_str = data # Assume input string is already the data payload
# Data here is the raw string (or JSON string) payload for the QR code
total_length = len(json_str.encode('utf-8')) # Use byte length for QR capacity
logger.debug(f"Chunking data of total byte length: {total_length}")
# Simplified: If the data fits within max_size (bytes), return one chunk object
# The chunk object itself adds metadata, but the 'data' field is what matters for QR limit.
if total_length <= max_size:
chunk_meta = {
"chunk_index": 0,
"total_chunks": 1,
"total_length": total_length, # Store byte length
"chunk_hash": hash(json_str) & 0xFFFFFFFF,
"data": json_str # The actual string payload
}
logger.debug(f"Data fits in one chunk (payload size {total_length} bytes)")
return [chunk_meta]
# If data exceeds max_size, split the string payload
# We need to split the *string* representation carefully
# Aim for byte size chunks, which is tricky with UTF-8 variable char width
# Simple approach: estimate character chunk size based on bytes
# Estimate average bytes per character (crude but simple)
avg_bytes_per_char = total_length / len(json_str) if len(json_str) > 0 else 1
# Calculate target character chunk size based on byte limit
target_char_chunk_size = int(max_size / avg_bytes_per_char)
if target_char_chunk_size < 1: target_char_chunk_size = 1 # Avoid zero chunk size
# Calculate number of chunks based on estimated character size
num_chunks = math.ceil(len(json_str) / target_char_chunk_size)
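            # Worked example of the estimate above (illustrative): a 2,500-byte ASCII payload gives
            # avg_bytes_per_char ~= 1, so target_char_chunk_size ~= max_size (1000) and
            # num_chunks = ceil(2500 / 1000) = 3; the byte-accurate trimming below then enforces max_size.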
chunks = []
start_char_idx = 0
for i in range(num_chunks):
# Calculate end index, ensuring we don't overshoot
end_char_idx = min(start_char_idx + target_char_chunk_size, len(json_str))
# Extract the character chunk
chunk_payload_str = json_str[start_char_idx:end_char_idx]
# Recalculate actual byte length for this specific chunk
current_chunk_byte_length = len(chunk_payload_str.encode('utf-8'))
# Adjust end_char_idx if current chunk exceeds max_size (rare if estimate is decent)
# This loop ensures the byte limit is strictly adhered to
while current_chunk_byte_length > max_size and end_char_idx > start_char_idx:
end_char_idx -= 1 # Reduce characters by 1
# Re-slice the string with the adjusted end index
chunk_payload_str = json_str[start_char_idx:end_char_idx]
current_chunk_byte_length = len(chunk_payload_str.encode('utf-8'))
# Handle edge case where adjustment makes chunk empty (shouldn't happen if max_size > 1)
if not chunk_payload_str and start_char_idx < len(json_str):
logger.error(f"Chunking resulted in empty payload string unexpectedly at index {i}. Max size: {max_size}, Start char: {start_char_idx}.")
# Option: break, or try to take at least one char if possible
if end_char_idx == start_char_idx and start_char_idx < len(json_str):
end_char_idx += 1 # Force at least one char
chunk_payload_str = json_str[start_char_idx:end_char_idx]
current_chunk_byte_length = len(chunk_payload_str.encode('utf-8'))
if current_chunk_byte_length > max_size:
logger.error(f"Cannot create chunk even with 1 char without exceeding max_size ({max_size} bytes). Aborting chunking.")
return [] # Cannot proceed
else:
break # Avoid potential infinite loop
chunk_meta = {
"chunk_index": i,
"total_chunks": num_chunks, # Initial estimate, may be adjusted later
"total_length": total_length, # Original total byte length
"chunk_byte_length": current_chunk_byte_length, # Actual byte length of this chunk's payload
"chunk_hash": hash(chunk_payload_str) & 0xFFFFFFFF,
"data": chunk_payload_str # The string payload for this chunk
}
chunks.append(chunk_meta)
logger.debug(f"Created chunk {i+1}/{num_chunks}, payload byte size: {current_chunk_byte_length}")
# Move to the next starting point
start_char_idx = end_char_idx
# Safety break if start index doesn't advance
if start_char_idx == len(json_str) and i + 1 < num_chunks:
logger.warning(f"Chunking finished early at index {i+1} of {num_chunks}. Adjusting total chunks.")
# Adjust total_chunks in already created chunks
final_num_chunks = len(chunks)
for ch_idx, ch in enumerate(chunks):
ch['total_chunks'] = final_num_chunks
ch['chunk_index'] = ch_idx # Re-index just in case
num_chunks = final_num_chunks # Update num_chunks for loop condition/logging
break
# Final check if total chunks changed due to adjustments or early finish
if chunks and chunks[-1]['total_chunks'] != len(chunks):
logger.warning(f"Adjusting total_chunks from {chunks[-1]['total_chunks']} to {len(chunks)} after loop completion.")
final_num_chunks = len(chunks)
for i, chunk in enumerate(chunks):
chunk['total_chunks'] = final_num_chunks
chunk['chunk_index'] = i # Re-index just in case
return chunks
except Exception as e:
logger.error(f"Error chunking data: {e}", exc_info=True)
return []
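    # Notes on the chunk format above (informational, not part of the original logic):
    # - 'chunk_hash' uses Python's built-in hash(), which is salted per process for strings,
    #   so it is only comparable within a single run, not across processes or machines.
    # - Reassembly sketch (hypothetical helper): sort chunks by 'chunk_index' and join payloads:
    #     ordered = sorted(chunks, key=lambda c: c['chunk_index'])
    #     original_json_str = ''.join(c['data'] for c in ordered)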
def generate_stylish_qr(data: str, # Expecting string data from chunking
filename: str,
size: int = 10,
border: int = 4,
fill_color: str = "#000000",
back_color: str = "#FFFFFF",
error_correction_level=qrcode.constants.ERROR_CORRECT_H) -> str: # Added param
"""Generate a stylish QR code with enhanced visual appeal"""
try:
qr = qrcode.QRCode(
version=None, # Auto-detect version
error_correction=error_correction_level, # Use parameter
box_size=size,
border=border
)
# Add string data directly (should be from chunker)
qr.add_data(data)
# Let the library figure out the best version and mode
qr.make(fit=True)
payload_bytes = len(data.encode('utf-8'))
logger.info(f"Generating QR code version {qr.version} for {filename} (Payload size: {payload_bytes} bytes)")
# Check if payload size exceeds capacity for the chosen version/ECC level
# This is a secondary check, as DataOverflowError should catch it, but good for logging
# Note: Capacities vary by mode (Numeric, Alphanumeric, Byte, Kanji)
# We assume Byte mode for JSON strings.
# Example capacity for V40-H (Byte mode): 1273 bytes
# A more robust check would involve getting capacity from the library if possible.
# For now, rely on the DataOverflowError exception.
# Create QR code image with custom colors
qr_image = qr.make_image(fill_color=fill_color, back_color=back_color)
# Convert to RGBA for transparency support (optional gradient)
qr_image = qr_image.convert('RGBA')
# --- Optional: Add subtle gradient overlay ---
# gradient = Image.new('RGBA', qr_image.size, (0, 0, 0, 0))
# draw = ImageDraw.Draw(gradient)
# for i in range(qr_image.width):
# alpha = int(255 * (1 - i/qr_image.width) * 0.1) # 10% maximum opacity
# draw.line([(i, 0), (i, qr_image.height)], fill=(255, 255, 255, alpha))
# final_image = Image.alpha_composite(qr_image, gradient)
# --- End Optional Gradient ---
final_image = qr_image # Use this line if gradient is commented out
# Save the image
output_path = QR_CODES_DIR / filename
# Ensure directory exists just before saving
output_path.parent.mkdir(parents=True, exist_ok=True)
final_image.save(output_path, quality=95) # PNG quality is lossless, but ok
return str(output_path)
# Catch specific data overflow error
except qrcode.exceptions.DataOverflowError as doe:
payload_bytes = len(data.encode('utf-8'))
logger.error(f"QR DataOverflowError for {filename}: {doe}. Data length (bytes): {payload_bytes}. Max capacity likely exceeded for ErrorLevel {error_correction_level} and auto-detected version {getattr(qr, 'version', 'N/A')}.")
return "" # Return empty string on failure
except Exception as e:
logger.error(f"QR generation error for {filename}: {e}", exc_info=True)
return ""
def generate_qr_codes(data_to_encode: Union[str, Dict, List], combine_sources: bool = True) -> List[str]:
"""Generate QR codes, chunking data appropriately."""
try:
file_processor = EnhancedFileProcessor() # Get chunking method
all_qr_paths = []
qr_fill = "#1a365d" # Deep blue
qr_back = "#ffffff"
# Decide on error correction level - H is default, M or L allow more data
error_level = qrcode.constants.ERROR_CORRECT_H # Max correction, lowest capacity
# error_level = qrcode.constants.ERROR_CORRECT_M # Medium correction, medium capacity
# error_level = qrcode.constants.ERROR_CORRECT_L # Low correction, max capacity
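        # Approximate QR Version 40 byte-mode capacities for reference: L ~2953, M ~2331, Q ~1663, H ~1273 bytes;
        # QR_PAYLOAD_MAX_BYTES (1000) stays comfortably below the V40-H limit used here.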
if combine_sources:
logger.info("Combining all input sources into a single QR sequence.")
# Combine all data into one large structure (e.g., a list) before chunking
# This assumes `data_to_encode` is already the combined list/dict from process_inputs
if not data_to_encode:
logger.warning("No data provided to generate combined QR codes.")
return []
# Chunk the combined data structure
chunks = file_processor.chunk_data(data_to_encode, max_size=QR_PAYLOAD_MAX_BYTES) # Use constant
if not chunks:
logger.error("Chunking the combined data failed.")
return []
num_chunks = len(chunks)
logger.info(f"Generating {num_chunks} QR codes for combined data.")
for i, chunk_info in enumerate(chunks):
# chunk_info contains {'chunk_index', 'total_chunks', 'data', etc.}
filename = f'combined_qr_{int(time.time())}_{i+1}_of_{num_chunks}.png'
# Pass the actual payload string to the generator
qr_payload = chunk_info['data']
qr_path = generate_stylish_qr(
data=qr_payload,
filename=filename,
fill_color=qr_fill,
back_color=qr_back,
error_correction_level=error_level # Pass level
)
if qr_path:
all_qr_paths.append(qr_path)
else:
logger.error(f"Failed to generate QR code for combined chunk {i+1}")
# Optionally stop or continue? Or add error marker?
else:
# Process each item in the input list individually
logger.info("Generating separate QR code sequences for each input source.")
if not isinstance(data_to_encode, list):
logger.error("Input data must be a list when combine_sources is False.")
                # Wrap a single non-list item so it can still be processed
if data_to_encode:
data_to_encode = [data_to_encode]
else:
return []
total_items = len(data_to_encode)
for item_idx, item in enumerate(data_to_encode):
item_source_info = f"item {item_idx+1}/{total_items}"
# Try to get a better name (e.g., from filename if available)
if isinstance(item, dict):
# Prioritize filename, then url, then source type
if 'filename' in item:
item_source_info = Path(item['filename']).stem # Use stem for cleaner name
elif 'url' in item:
# Clean up URL for filename use
parsed_url = urlparse(item['url'])
url_path_name = Path(parsed_url.path).stem
url_domain = parsed_url.netloc.replace('.', '_')
item_source_info = f"url_{url_domain}_{url_path_name}" if url_path_name else f"url_{url_domain}_index"
elif 'source' in item:
item_source_info = item['source'] + f"_{item_idx+1}"
logger.info(f"Processing source: {item_source_info}")
# Chunk the individual item
chunks = file_processor.chunk_data(item, max_size=QR_PAYLOAD_MAX_BYTES) # Use constant
if not chunks:
logger.error(f"Chunking failed for item {item_idx+1} ({item_source_info})")
continue # Skip to next item
num_chunks = len(chunks)
logger.info(f"Generating {num_chunks} QR codes for {item_source_info}.")
for chunk_idx, chunk_info in enumerate(chunks):
# Sanitize source info for filename
safe_source_name = re.sub(r'[^\w\-\.]+', '_', item_source_info)[:50] # Limit length
timestamp_short = str(int(time.time()))[-6:] # Shorter timestamp
filename = f'{safe_source_name}_chunk_{chunk_idx+1}_of_{num_chunks}_{timestamp_short}.png'
qr_payload = chunk_info['data']
qr_path = generate_stylish_qr(
data=qr_payload,
filename=filename,
fill_color=qr_fill,
back_color=qr_back,
error_correction_level=error_level # Pass level
)
if qr_path:
all_qr_paths.append(qr_path)
else:
logger.error(f"Failed to generate QR code for {item_source_info} chunk {chunk_idx+1}")
logger.info(f"Generated a total of {len(all_qr_paths)} QR codes.")
return all_qr_paths
except Exception as e:
logger.error(f"General QR code generation process error: {e}", exc_info=True)
return []
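# Example usage (a sketch; the item shape mirrors what the input-processing
# step builds, and the output directory follows the app's output/qr_codes convention):
#   items = [{'source': 'direct_text', 'content': 'hello world'}]
#   paths = generate_qr_codes(items, combine_sources=True)
#   # -> e.g. ['output/qr_codes/combined_qr_<ts>_1_of_1.png']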
def _generate_sequence_visualization_image(qr_paths: List[str], qr_data: List[Dict], title: str = "QR Code Sequence") -> Optional[io.BytesIO]:
"""
Generates a visual representation of the QR code sequence using NetworkX and Matplotlib.
Args:
qr_paths: List of file paths to the QR code images.
qr_data: List of decoded data dictionaries, each ideally containing 'chunk_index'.
title: The title for the visualization plot.
Returns:
A BytesIO buffer containing the PNG image of the visualization, or None if error.
"""
if not qr_paths or not qr_data or len(qr_paths) != len(qr_data):
logger.warning("Mismatch or empty data for visualization.")
return None
logger.info(f"Generating visualization for {len(qr_paths)} QR codes.")
try:
G = nx.DiGraph()
node_labels = {}
node_colors = []
node_sizes = []
# Assume data is pre-sorted by chunk_index during loading
num_nodes = len(qr_paths)
total_chunks_from_meta = qr_data[0].get('total_chunks', num_nodes) if qr_data else num_nodes
for i in range(num_nodes):
node_id = i
# Use chunk_index from metadata if possible, otherwise use list index
chunk_idx = qr_data[i].get('chunk_index', i)
label = f"{chunk_idx + 1}/{total_chunks_from_meta}"
# Add indicator if decode failed
if qr_data[i].get('data') == "[DECODE FAILED]":
label += "\n(Decode Err)"
node_labels[node_id] = label
G.add_node(node_id, path=qr_paths[i], data=qr_data[i])
# Add edges between consecutive nodes
if i > 0:
G.add_edge(i - 1, i)
# Simple coloring/sizing (can be customized further)
node_colors.append('#4299e1') # Default blue color
node_sizes.append(1500)
if not G.nodes:
logger.warning("No nodes to visualize.")
return None
# --- Layout and Drawing ---
plt.style.use('seaborn-v0_8-whitegrid') # Use a clean style
plt.figure(figsize=(max(10, num_nodes * 1.5), 6)) # Adjust figure size based on number of nodes, slightly taller
# Simple linear layout for sequences is often clearest
pos = {i: (i * 2, 0) for i in range(num_nodes)} # Horizontal layout
nx.draw_networkx_nodes(G, pos, node_size=node_sizes, node_color=node_colors, alpha=0.9, edgecolors='grey')
nx.draw_networkx_edges(G, pos, arrowstyle='-|>', arrowsize=20, edge_color='gray', alpha=0.6, node_size=node_sizes)
nx.draw_networkx_labels(G, pos, labels=node_labels, font_size=9, font_color='white', font_weight='bold')
plt.title(title, fontsize=16, pad=20)
plt.xlabel("Sequence Index", fontsize=12)
plt.yticks([]) # Hide Y-axis ticks for linear layout
plt.xticks(range(0, num_nodes * 2, 2), [f"{i+1}" for i in range(num_nodes)]) # Label X-axis ticks
plt.box(False) # Remove frame box
plt.margins(x=0.1) # Add some horizontal margin
plt.tight_layout()
# Save plot to a BytesIO buffer
buf = io.BytesIO()
plt.savefig(buf, format='png', bbox_inches='tight', dpi=100)
plt.close() # Close the plot figure to free memory
buf.seek(0)
logger.info("Successfully generated visualization image buffer.")
return buf
except Exception as e:
logger.error(f"Error generating visualization image: {e}", exc_info=True)
plt.close() # Ensure plot is closed even on error
return None
# --- Gradio Interface Section ---
def create_qr_sequence_visualizer(output_gallery_ref): # Pass a reference if needed later
"""Add QR sequence visualization capabilities to the application"""
with gr.Tab("π QR Sequence Visualizer"):
gr.Markdown("""
## QR Code Sequence Visualizer
Upload a sequence of QR codes (e.g., those generated by this app) to decode them and visualize their order.
Uses OpenCV for detection and decoding. Requires `opencv-python-headless`.
""")
# Store data globally within this tab's scope (alternative to Gradio State)
# This is simpler but not ideal for complex state management
shared_data = {'qr_paths': [], 'qr_data': []}
with gr.Row():
with gr.Column(scale=1):
qr_input = gr.File(
label="Upload QR Code Images",
file_types=["image/png", "image/jpeg", ".png", ".jpg", ".jpeg"], # Be explicit
file_count="multiple"
)
visualize_btn = gr.Button("ποΈ Decode & Visualize Sequence", variant="primary")
reset_btn = gr.Button("ποΈ Reset Visualizer", variant="secondary")
# Use a single Textbox for status messages (including indicator)
visualization_status = gr.Textbox(label="Status", interactive=False, lines=3)
with gr.Column(scale=2):
qr_visualization = gr.Image(label="QR Code Sequence Map", type="pil", height=400) # Use PIL type
qr_preview = gr.Gallery(label="Uploaded QR Codes (Sorted)", columns=4, height=400, object_fit="contain", preview=True)
def process_qr_codes_and_visualize(files):
"""Decodes QR files, sorts them, updates gallery, and generates visualization."""
if not files:
shared_data['qr_paths'] = []
shared_data['qr_data'] = []
return None, None, "β οΈ Please upload QR code images." # Return None for gallery/image, text for status
logger.info(f"Processing {len(files)} uploaded QR files for visualization.")
qr_data_list = []
qr_path_list = []
decode_errors = 0
# Use OpenCV detector
try:
detector = cv2.QRCodeDetector()
except Exception as init_e:
logger.error(f"Error initializing OpenCV QR detector: {init_e}")
return None, None, f"β Error initializing QR detector: {init_e}"
for file in files:
file_display_name = getattr(file, 'orig_name', getattr(file, 'name', 'N/A'))
try:
img_path = file.name # Gradio File object path
# Read image using OpenCV
img_cv = cv2.imread(img_path)
if img_cv is None:
logger.warning(f"Could not read image file: {file_display_name}")
decode_errors += 1
# Add a placeholder entry so the data and path lists stay aligned for the gallery.
qr_data_list.append({"data": "[READ FAILED]", "chunk_index": -1, "filename": file_display_name})
qr_path_list.append(img_path) # Still need path for gallery
continue
# Try to detect and decode QR code
data, bbox, straight_qrcode = detector.detectAndDecode(img_cv)
if data:
logger.debug(f"Decoded data from {file_display_name}: {data[:50]}...")
# Try parsing the decoded data as JSON (expected format from generator)
try:
qr_metadata = json.loads(data)
# Check if it looks like our chunk format
if isinstance(qr_metadata, dict) and 'chunk_index' in qr_metadata and 'total_chunks' in qr_metadata:
qr_metadata['filename'] = file_display_name # Add filename for reference
qr_data_list.append(qr_metadata)
qr_path_list.append(img_path)
else:
# Valid JSON, but not the expected chunk structure
logger.warning(f"Decoded valid JSON, but not expected format from {file_display_name}")
qr_data_list.append({"data": qr_metadata, "chunk_index": -1, "filename": file_display_name}) # Assign default index
qr_path_list.append(img_path)
except json.JSONDecodeError:
# Data decoded, but not JSON - store raw data
logger.warning(f"Could not decode JSON from QR data in {file_display_name}. Storing raw.")
qr_data_list.append({"data": data, "chunk_index": -1, "filename": file_display_name}) # Assign default index
qr_path_list.append(img_path)
except Exception as json_e:
logger.error(f"Error processing decoded JSON from {file_display_name}: {json_e}")
qr_data_list.append({"data": f"Error: {json_e}", "chunk_index": -1, "filename": file_display_name})
qr_path_list.append(img_path)
decode_errors += 1
else:
# QR code detected, but no data decoded (or detection failed)
logger.warning(f"Could not decode data from QR image: {file_display_name}")
qr_data_list.append({"data": "[DECODE FAILED]", "chunk_index": -1, "filename": file_display_name})
qr_path_list.append(img_path)
decode_errors += 1
except Exception as e:
logger.error(f"Error processing QR image file {file_display_name}: {e}", exc_info=True)
decode_errors += 1
# Add placeholder if processing failed entirely
qr_data_list.append({"data": "[PROCESS ERROR]", "chunk_index": -1, "filename": file_display_name})
qr_path_list.append(getattr(file, 'name', None)) # Need path for gallery if possible
# Filter out entries where path couldn't be determined
valid_entries = [(data, path) for data, path in zip(qr_data_list, qr_path_list) if path is not None]
if not valid_entries:
shared_data['qr_paths'] = []
shared_data['qr_data'] = []
return None, None, "β No valid QR codes could be processed or decoded."
qr_data_list = [item[0] for item in valid_entries]
qr_path_list = [item[1] for item in valid_entries]
# Attempt to sort by chunk_index (handle missing index gracefully)
try:
# Create tuples (index, data, path) for sorting
indexed_items = []
for i, (data, path) in enumerate(zip(qr_data_list, qr_path_list)):
# Use provided chunk_index, fallback to list index if missing or invalid (-1)
sort_key = data.get('chunk_index', i)
if not isinstance(sort_key, int) or sort_key < 0:
sort_key = float('inf') # Put items without valid index at the end
indexed_items.append((sort_key, data, path))
# Sort based on the index key
indexed_items.sort(key=lambda x: x[0])
# Unpack sorted lists
sorted_qr_data = [item[1] for item in indexed_items]
sorted_qr_paths = [item[2] for item in indexed_items]
# Update shared data
shared_data['qr_paths'] = sorted_qr_paths
shared_data['qr_data'] = sorted_qr_data
logger.info("Successfully sorted QR data based on chunk_index.")
except Exception as e:
logger.error(f"Error sorting QR data: {e}. Using original order.")
# Use original order if sorting fails
shared_data['qr_paths'] = qr_path_list
shared_data['qr_data'] = qr_data_list
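# With chunks sorted, the original payload could be reassembled by joining
# each chunk's 'data' field in order (a sketch, assuming every chunk decoded
# cleanly and follows the generator's chunk format):
#   payload = ''.join(c['data'] for c in shared_data['qr_data']
#                     if isinstance(c.get('data'), str))
#   restored = json.loads(payload)  # only valid if the sequence is complete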
# Generate the visualization image using the helper function
# Use the sorted data stored in shared_data
visualization_image_buffer = _generate_sequence_visualization_image(
shared_data['qr_paths'],
shared_data['qr_data'],
title=f"Visualized Sequence ({len(shared_data['qr_paths'])} Codes)"
)
# Convert buffer to PIL Image for Gradio output if necessary
vis_image_pil = None
if visualization_image_buffer:
try:
vis_image_pil = Image.open(visualization_image_buffer)
except Exception as img_e:
logger.error(f"Failed to load visualization buffer into PIL Image: {img_e}")
status_indicator = "β
" if vis_image_pil else "β οΈ"
status_message = f"Processed {len(shared_data['qr_paths'])} QR codes."
if decode_errors > 0:
status_message += f" ({decode_errors} decode/read errors)"
status_message += "\nSequence visualized." if vis_image_pil else "\nVisualization generation failed."
final_status_message = f"{status_indicator} {status_message}"
# Update outputs: Gallery with sorted paths, Image with visualization, Status text
gallery_output = shared_data['qr_paths']
# Return gallery paths, visualization PIL image, combined status message
return gallery_output, vis_image_pil, final_status_message
def reset_visualizer_state():
shared_data['qr_paths'] = []
shared_data['qr_data'] = []
logger.info("Resetting QR visualizer state.")
# Clear gallery, image, file input, status text
reset_status_message = "βͺ Visualizer Reset. Upload new QR codes."
return None, None, None, reset_status_message
# Event handlers
visualize_btn.click(
process_qr_codes_and_visualize,
inputs=[qr_input],
# Map function outputs to Gradio components
outputs=[qr_preview, qr_visualization, visualization_status] # Gallery, Image, Status Text
).then(
lambda: logger.info("Visualization process complete."), inputs=None, outputs=None
)
reset_btn.click(
reset_visualizer_state,
inputs=[],
outputs=[qr_preview, qr_visualization, qr_input, visualization_status] # Clear gallery, image, file input, status text
)
def create_modern_interface():
"""Create a modern and visually appealing Gradio interface"""
# Modern CSS styling
css = """
/* Modern color scheme */
:root {
--primary-color: #1a365d;
--secondary-color: #2d3748;
--accent-color: #4299e1;
--background-color: #f7fafc;
--success-color: #48bb78;
--error-color: #f56565;
--warning-color: #ed8936;
--text-color: #1a202c;
--border-color: #e2e8f0;
}
body { font-family: sans-serif; color: var(--text-color); }
/* Container styling */
.gradio-container { /* Target the main container */
background-color: var(--background-color);
border-radius: 1rem;
box-shadow: 0 4px 6px rgba(0, 0, 0, 0.1);
padding: 1rem; /* Add padding to container */
}
/* Component styling */
.input-container { /* Custom class if needed, otherwise target Gradio classes */
background-color: white;
padding: 1.5rem;
border-radius: 0.5rem;
border: 1px solid var(--border-color);
margin-bottom: 1rem;
}
/* Button styling */
.gradio-button { /* Target Gradio buttons */
transition: all 0.2s;
border-radius: 0.375rem;
padding: 0.75rem 1.5rem;
}
.gradio-button.primary { /* Primary variant */
background-color: var(--primary-color) !important;
color: white !important;
border: none;
}
.gradio-button.primary:hover {
background-color: var(--accent-color) !important;
transform: translateY(-1px);
}
.gradio-button.secondary { /* Secondary variant */
background-color: var(--secondary-color) !important;
color: white !important;
border: none;
}
.gradio-button.secondary:hover {
background-color: #4a5568 !important; /* Darker secondary */
transform: translateY(-1px);
}
/* Status messages */
.status { /* Custom class if needed */
padding: 1rem;
border-radius: 0.375rem;
margin: 1rem 0;
border: 1px solid transparent;
}
.status.success { border-color: var(--success-color); background-color: #f0fff4; color: var(--success-color); }
.status.error { border-color: var(--error-color); background-color: #fff5f5; color: var(--error-color); }
.status.warning { border-color: var(--warning-color); background-color: #fffaf0; color: var(--warning-color); }
/* Gallery styling */
.gradio-gallery { /* Target Gradio gallery */
gap: 1rem;
padding: 1rem;
background-color: white;
border-radius: 0.5rem;
border: 1px solid var(--border-color);
min-height: 150px; /* Ensure gallery has some height */
}
/* Style gallery images */
.gradio-gallery > div[data-testid="gallery-item"] > img {
object-fit: contain !important; /* Use contain to avoid stretching */
border-radius: 0.375rem;
transition: transform 0.2s;
border: 1px solid #eee; /* Add subtle border */
background-color: #f8f9fa; /* Light background for images */
}
.gradio-gallery > div[data-testid="gallery-item"] > img:hover {
transform: scale(1.05);
box-shadow: 0 2px 4px rgba(0,0,0,0.1); /* Add hover shadow */
}
/* Tab styling */
.gradio-tabs > .tabs > .tab-nav > button { /* Target tab buttons */
padding: 0.75rem 1rem;
border-radius: 0.375rem 0.375rem 0 0;
font-weight: 500;
}
.gradio-tabs > .tabs > .tab-nav > button.selected { /* Selected tab */
background-color: white !important;
border-bottom: 2px solid var(--primary-color) !important;
}
/* Textbox/Textarea styling */
.gradio-textbox, .gradio-textarea {
border-radius: 0.375rem !important;
border: 1px solid var(--border-color) !important;
}
.gradio-textbox:focus, .gradio-textarea:focus {
border-color: var(--accent-color) !important;
box-shadow: 0 0 0 1px var(--accent-color) !important;
}
"""
# Create interface with modern design
with gr.Blocks(css=css, title="Advanced Data Processor & QR Generator", theme=gr.themes.Soft()) as interface:
gr.Markdown("""
# 🚀 Advanced Data Processing & QR Code Generator
Transform URLs, files (Text, JSON, PDF, Archives), or direct input into sequenced QR codes. Visualize sequences.
""")
with gr.Tab("βοΈ Data Processor & QR Generator"):
with gr.Row():
with gr.Column(scale=2):
# Input Tabs
with gr.Tabs():
with gr.TabItem("π URL Input"):
url_input = gr.Textbox(
label="Enter URLs (one per line or comma-separated)",
lines=5,
placeholder="https://example1.com\nhttps://example2.com",
elem_id="url-input"
)
with gr.TabItem("π File Input"):
file_input = gr.File(
label="Upload Files (Text, JSON, PDF, Archives: zip, tar, gz, bz2)",
file_count="multiple",
# Specify supported types explicitly for better UX
file_types=[".txt", ".json", ".csv", ".md", ".xml", ".html", ".pdf", ".zip", ".tar", ".gz", ".bz2", ".tgz", ".tar.gz", ".tar.bz2"],
elem_id="file-input"
)
with gr.TabItem("π Direct Input / JSON"):
text_input = gr.TextArea(
label="Direct Text/JSON Input",
lines=10,
placeholder="Paste your text or JSON data here...",
elem_id="text-input"
)
with gr.Row():
example_btn = gr.Button("π Load JSON Example")
clear_btn = gr.Button("ποΈ Clear Input")
# Processing Options & Button
with gr.Row(equal_height=True):
with gr.Column(scale=1, min_width=150):
combine_data = gr.Checkbox(
label="Combine all inputs",
value=True, # Default to combined
info="Create one sequence from all sources."
)
with gr.Column(scale=2):
process_btn = gr.Button(
"π Process & Generate QR Codes",
variant="primary",
elem_id="process-button"
)
# Status Output
output_text = gr.Textbox(
label="Processing Status",
interactive=False,
lines=4, # Increased lines for more status info
elem_id="status-output"
)
with gr.Column(scale=3):
# Output Area
gr.Markdown("### Results")
with gr.Tabs():
with gr.TabItem("πΌοΈ QR Codes"):
output_gallery = gr.Gallery(
label="Generated QR Codes",
columns=4, # Adjust columns as needed
height=500, # Adjust height
object_fit="contain",
preview=True, # Enable preview click
elem_id="qr-gallery"
)
with gr.TabItem("π Processed Data (JSON)"):
output_json = gr.JSON(
label="Processed Data Structure",
elem_id="json-output"
)
# Load example data
def load_example():
example = {
"project": "Data Transfer Example",
"version": 1.1,
"items": [
{"id": "A001", "name": "Item One", "value": 123.45, "tags": ["tag1", "tag2"]},
{"id": "B002", "name": "Item Two", "value": 67.89, "enabled": True}
],
"timestamp": datetime.now().isoformat()
}
return json.dumps(example, indent=2)
def clear_input_area():
# Clear only the direct text input area
return ""
# --- Main Processing Function ---
def process_inputs_and_generate_qrs(urls, files, text, combine, progress=gr.Progress(track_tqdm=True)):
"""Process all inputs, combine if requested, and generate QR codes."""
start_time = time.time()
logger.info("Starting data processing...")
status_updates = []
all_processed_data = [] # List to hold results from all sources
url_processor = EnhancedURLProcessor()
file_processor = EnhancedFileProcessor()
# Estimate total steps for progress bar
num_urls = len(re.split(r'[,\n]+', urls.strip())) if urls and urls.strip() else 0
num_files = len(files) if files else 0
has_text = 1 if text and text.strip() else 0
total_steps = num_urls + num_files + has_text + 1 # +1 for QR generation step
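# e.g. 2 URLs + 2 files + direct text -> total_steps = 2 + 2 + 1 + 1 = 6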
step_counter = 0
# Wrap iterables with tqdm for progress tracking
url_list = []
if urls and urls.strip():
url_list = [u.strip() for u in re.split(r'[,\n]+', urls.strip()) if u.strip()] # Clean up
file_list = files if files else []
text_input_present = text and text.strip()
# 1. Process URLs
if url_list:
status_updates.append(f"Processing {len(url_list)} URLs...")
logger.info(f"Processing URLs: {url_list}")
# Use tqdm description for progress
desc = f"Processing URLs"
for i, url in enumerate(progress.tqdm(url_list, desc=desc, total=len(url_list))):
step_counter += 1
# progress(step_counter / total_steps, desc=f"URL {i+1}/{len(url_list)}")
logger.info(f"Processing URL {i+1}/{len(url_list)}: {url}")
# Basic validation before fetching
if not validators.url(url):
logger.warning(f"Skipping invalid URL format: {url}")
status_updates.append(f"β οΈ Skipped invalid URL: {url[:50]}...")
all_processed_data.append({'error': 'Invalid URL format', 'url': url})
continue
content_data = url_processor.fetch_content(url)
if content_data and 'content' in content_data:
logger.info(f"Successfully fetched content from {url} (Size: {content_data['metadata'].get('content_length_bytes','N/A')} bytes)")
# Structure the result similarly to file processing output
processed_url_data = {
'source': 'url',
'url': content_data['metadata']['final_url'], # Use final URL after redirects
'content': content_data['content'], # Processed text content
# 'raw_bytes': content_data['raw_bytes'], # Don't include raw bytes in final JSON unless needed
'metadata': { # Select relevant metadata
'original_url': url,
'content_type': content_data['metadata']['content_type'],
'status_code': content_data['metadata']['status_code'],
'encoding_used': content_data['metadata']['encoding_used'],
'content_length_bytes': content_data['metadata']['content_length_bytes']
},
'timestamp': datetime.now().isoformat()
}
all_processed_data.append(processed_url_data)
status_updates.append(f"β Fetched: {url[:60]}...")
else:
logger.error(f"Failed to fetch content from URL: {url}")
status_updates.append(f"β Failed fetch: {url[:60]}...")
all_processed_data.append({'error': 'Failed to fetch content', 'url': url})
# 2. Process Files
if file_list:
status_updates.append(f"Processing {len(file_list)} uploaded files...")
logger.info(f"Processing {len(file_list)} files.")
desc = f"Processing Files"
for i, file_obj in enumerate(progress.tqdm(file_list, desc=desc, total=len(file_list))):
step_counter += 1
# progress(step_counter / total_steps, desc=f"File {i+1}/{len(file_list)}")
file_name_for_log = getattr(file_obj, 'orig_name', getattr(file_obj, 'name', 'N/A')) # Try orig_name first
logger.info(f"Processing file {i+1}/{len(file_list)}: {file_name_for_log}")
try:
# Pass the Gradio file object directly to process_file
file_results = file_processor.process_file(file_obj)
if file_results:
# Filter out results that are just errors before extending
valid_results = [res for res in file_results if 'error' not in res]
error_results = [res for res in file_results if 'error' in res]
if valid_results:
all_processed_data.extend(valid_results)
processed_filenames = [res.get('filename', 'N/A') for res in valid_results]
status_updates.append(f"β Processed: {', '.join(processed_filenames)}")
logger.info(f"Successfully processed: {', '.join(processed_filenames)}")
if error_results:
all_processed_data.extend(error_results) # Keep errors for JSON output
error_filenames = [res.get('filename', 'N/A') for res in error_results]
status_updates.append(f"β Errors processing: {', '.join(error_filenames)}")
logger.warning(f"Errors processing: {', '.join(error_filenames)}")
if not valid_results and not error_results:
status_updates.append(f"β οΈ No data extracted from: {file_name_for_log}")
logger.warning(f"No data extracted from: {file_name_for_log}")
all_processed_data.append({'error': 'No data extracted', 'filename': file_name_for_log})
except Exception as file_proc_err:
file_name = getattr(file_obj, 'orig_name', getattr(file_obj, 'name', 'N/A'))
logger.error(f"Error processing file {file_name}: {file_proc_err}", exc_info=True)
status_updates.append(f"β Error processing file: {file_name}")
all_processed_data.append({'error': f'File processing error: {file_proc_err}', 'filename': file_name})
# 3. Process Direct Text/JSON Input
if text_input_present:
step_counter += 1
progress(step_counter / total_steps, desc="Processing Direct Input")
status_updates.append("Processing direct input...")
logger.info("Processing direct text/JSON input.")
# Attempt to parse as JSON first
try:
json_data = json.loads(text)
logger.info("Direct input parsed as JSON.")
processed_text_data = {
'source': 'direct_json',
'content': json_data, # Parsed JSON object/list
'raw_content': text, # Original string
'timestamp': datetime.now().isoformat()
}
all_processed_data.append(processed_text_data)
status_updates.append("β Processed direct input as JSON.")
except json.JSONDecodeError:
# If not JSON, treat as plain text
logger.info("Direct input treated as plain text.")
processed_text_data = {
'source': 'direct_text',
'content': text, # Store as plain text
'timestamp': datetime.now().isoformat()
}
all_processed_data.append(processed_text_data)
status_updates.append("β Processed direct input as Text.")
except Exception as direct_input_err:
logger.error(f"Error processing direct input: {direct_input_err}", exc_info=True)
status_updates.append(f"β Error processing direct input.")
all_processed_data.append({'error': f'Direct input error: {direct_input_err}', 'source': 'direct_input'})
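# Classification examples: '{"a": 1}' -> source 'direct_json' (parsed object),
# 'plain note' -> source 'direct_text' (raw string kept as-is).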
# 4. Check if any valid data was processed
valid_processed_data = [d for d in all_processed_data if 'error' not in d]
if not valid_processed_data:
logger.warning("No valid data sources found or processed.")
status_updates.append("β οΈ No valid data to process. Please provide input or check errors.")
final_status = "\n".join(status_updates)
# Return the error data for JSON view, empty gallery, and status
return all_processed_data, [], final_status
logger.info(f"Total valid processed data items: {len(valid_processed_data)}")
status_updates.append(f"Data processed ({len(valid_processed_data)} valid items). Generating QR codes...")
# 5. Generate QR Codes using only valid data
qr_paths = []
progress(step_counter / total_steps, desc="Generating QR Codes") # Update progress before QR step
try:
# Pass the list of *valid* processed data items; generate_qr_codes
# handles combined vs. per-source sequencing via the `combine` flag.
data_for_qr = valid_processed_data
qr_paths = generate_qr_codes(data_for_qr, combine)
if qr_paths:
status_updates.append(f"β Generated {len(qr_paths)} QR codes.")
logger.info(f"Successfully generated {len(qr_paths)} QR codes.")
else:
# Check if chunking failed or QR generation failed for all chunks
if not valid_processed_data: # Should have been caught earlier, but double-check
status_updates.append("β No valid data was available for QR generation.")
else:
status_updates.append("β QR code generation failed or produced no codes (check logs for details).")
logger.error("QR code generation returned no paths despite valid input data.")
# Keep processed data, but gallery will be empty
except Exception as qr_gen_err:
logger.error(f"Error during QR code generation step: {qr_gen_err}", exc_info=True)
status_updates.append(f"β Error generating QR codes: {qr_gen_err}")
# Keep processed data, gallery will be empty
# 6. Finalize and Return
end_time = time.time()
processing_time = end_time - start_time
status_updates.append(f"Total processing time: {processing_time:.2f} seconds.")
final_status = "\n".join(status_updates)
# Return processed data (including errors for JSON view), QR paths (for Gallery), and status string
# Ensure qr_paths is a list of strings
qr_paths_str = [str(p) for p in qr_paths] if qr_paths else []
# Return all data (including errors) for JSON output, gallery paths, and status text
return all_processed_data, qr_paths_str, final_status
# --- Event Handlers ---
example_btn.click(load_example, outputs=[text_input])
clear_btn.click(clear_input_area, outputs=[text_input])
process_btn.click(
process_inputs_and_generate_qrs,
inputs=[url_input, file_input, text_input, combine_data],
outputs=[output_json, output_gallery, output_text] # Match function return order
)
# Add helpful documentation
gr.Markdown("""
### 🌟 Features
- **Complete URL Scraping**: Extracts text content from web pages (HTML, Text, JSON, PDF). Follows redirects.
- **Advanced File Processing**: Handles text, JSON, PDF, and archives (.zip, .tar.*, .gz, .bz2). Extracts archive contents. Attempts intelligent JSON detection.
- **Direct Input**: Paste text or JSON directly.
- **Sequential QR Codes**: Chunks large data and embeds sequencing info. Option to combine inputs. Uses Error Correction Level H for robustness.
- **Modern Design**: Clean, responsive interface with progress tracking.
### 💡 Tips
1. **Inputs**: Use any combination of URL, File, or Direct Input tabs.
2. **Combine**: Check 'Combine all inputs' to create one QR sequence from all sources. Uncheck to get separate QR sequences for each source (URL, file within archive, direct input).
3. **Files**: Upload text-based files, JSON, PDF, or supported archives. Content from archives is extracted and processed individually. Large files up to 5GB are supported (but QR generation may fail for very large content).
4. **JSON**: Use the example button or upload a `.json` file. The app also tries to parse `.txt` or other files as JSON if they contain valid JSON structure.
5. **Status**: Monitor the Processing Status box for feedback, including errors and progress.
### 🎨 Output
- Generated QR codes appear in the 'QR Codes' tab and are saved in the `output/qr_codes` directory.
- The structured data processed from all inputs (including any errors) is shown in the 'Processed Data (JSON)' tab.
- Hover over or click QR codes in the gallery for a larger preview.
- Use the 'QR Sequence Visualizer' tab to decode and verify sequences.
""")
# Add the QR sequence visualizer tab
create_qr_sequence_visualizer(output_gallery) # Pass gallery if needed
return interface
def main():
"""Initialize and launch the application"""
try:
# Configure system settings if needed
mimetypes.init() # Ensure mime types are loaded
logger.info("Starting Gradio application...")
# Create and launch interface
interface = create_modern_interface()
# Launch with configuration
interface.launch(
share=os.getenv("GRADIO_SHARE", "false").lower() == "true", # Allow sharing via env var
debug=os.getenv("GRADIO_DEBUG", "false").lower() == "true", # Allow debug via env var
show_error=True, # Show Python errors in browser console
server_name="0.0.0.0", # Bind to all interfaces for container/network access
server_port=int(os.getenv("GRADIO_PORT", 7860)), # Allow port config via env var
show_api=False, # Disable default Gradio API endpoint unless needed
# enable_queue=True # Consider enabling queue for longer tasks
)
logger.info("Gradio application stopped.")
except Exception as e:
logger.critical(f"Application startup or runtime error: {e}", exc_info=True)
# Optionally add a small delay or specific cleanup before exiting
time.sleep(1)
raise # Reraise critical errors
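# Launch configuration is read from the environment in main() above, e.g.:
#   GRADIO_SHARE=true GRADIO_DEBUG=true GRADIO_PORT=8080 python app.py
# (script name illustrative; use this file's actual name)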
if __name__ == "__main__":
# Ensure output directories exist before starting
OUTPUTS_DIR.mkdir(parents=True, exist_ok=True)
QR_CODES_DIR.mkdir(parents=True, exist_ok=True)
TEMP_DIR.mkdir(parents=True, exist_ok=True)
main() |