Spaces:
Sleeping
Sleeping
tool to download logos from internet
Browse files · Streamlit code to download company logos from the internet
- app.py +142 -0
- gradio_app.py +262 -0
- requirements.txt +7 -2
- services/__pycache__/appconfig.cpython-310.pyc +0 -0
- services/__pycache__/entity_extractor.cpython-310.pyc +0 -0
- services/__pycache__/image_downloader.cpython-310.pyc +0 -0
- services/__pycache__/logo_downloader.cpython-310.pyc +0 -0
- services/appconfig.py +60 -0
- services/entity_extractor.py +195 -0
- services/image_downloader.py +278 -0
- services/logo_downloader.py +228 -0
- utils/__pycache__/utils.cpython-310.pyc +0 -0
- utils/utils.py +178 -0
app.py
ADDED
@@ -0,0 +1,142 @@
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
1 |
+
"""
|
2 |
+
Streamlit web interface for the Logo Downloader
|
3 |
+
"""
|
4 |
+
|
5 |
+
import os
|
6 |
+
import logging
|
7 |
+
import streamlit as st
|
8 |
+
from pathlib import Path
|
9 |
+
from typing import Optional
|
10 |
+
|
11 |
+
from services.logo_downloader import LogoDownloader
|
12 |
+
from services.appconfig import GEMINI_API_KEY, DEFAULT_LOGOS_PER_ENTITY, MAX_LOGOS_PER_ENTITY
|
13 |
+
|
14 |
+
# Setup logging
|
15 |
+
logging.basicConfig(level=logging.INFO)
|
16 |
+
logger = logging.getLogger(__name__)
|
17 |
+
|
18 |
+
|
19 |
+
def process_text_request(text: str, api_key: Optional[str], num_logos: int = DEFAULT_LOGOS_PER_ENTITY):
    """
    Process text and download logos through Streamlit interface

    Args:
        text (str): Input text to analyze
        api_key (str): Optional Gemini API key; blank or None falls back to GEMINI_API_KEY
        num_logos (int): Number of logos per entity, between 1 and MAX_LOGOS_PER_ENTITY

    Returns:
        Tuple: (status_message, zip_file_path or None, detailed_results)
    """
    try:
        # Validate inputs before doing any work
        if not text or not text.strip():
            return "❌ Please provide some text to analyze.", None, "No text provided."

        if num_logos < 1 or num_logos > MAX_LOGOS_PER_ENTITY:
            return f"❌ Number of logos must be between 1 and {MAX_LOGOS_PER_ENTITY}.", None, f"Invalid number: {num_logos}"

        # A key typed into the UI wins over the configured one
        cleaned_key = (api_key or "").strip()
        final_api_key = cleaned_key or GEMINI_API_KEY

        downloader = LogoDownloader(gemini_api_key=final_api_key)
        results = downloader.process_text(text, num_logos)

        status = results.get('status')
        stats = results.get('stats') or {}

        if status == 'success' and stats.get('total_downloads', 0) > 0:
            status_msg = f"✅ {downloader.get_stats_summary()}"
            zip_path = results.get('zip_path')
            detailed_results = _format_detailed_results(results)
            return status_msg, zip_path, detailed_results

        # Read the message once with a default so that a result without a
        # 'message' key is reported properly instead of raising KeyError.
        if status == 'warning':
            message = results.get('message', 'No details available')
            return f"⚠️ {message}", None, message

        message = results.get('message', 'Unknown error')
        return f"❌ Processing failed: {message}", None, message

    except Exception as e:
        # UI boundary: never let an exception escape to the page, but keep
        # the traceback in the log for diagnosis.
        logger.exception("Error in process_text_request: %s", e)
        return f"❌ An error occurred: {str(e)}", None, f"Error details: {str(e)}"
|
50 |
+
|
51 |
+
|
52 |
+
def _format_detailed_results(results):
|
53 |
+
if not results.get('results'):
|
54 |
+
return "No detailed results available."
|
55 |
+
|
56 |
+
details = []
|
57 |
+
details.append(f"📊 **Processing Summary:**")
|
58 |
+
details.append(f"- Total entities found: {results['stats']['total_entities']}")
|
59 |
+
details.append(f"- Total logos downloaded: {results['stats']['total_downloads']}")
|
60 |
+
details.append(f"- Successful entities: {results['stats']['successful_entities']}")
|
61 |
+
details.append(f"- Failed entities: {results['stats']['failed_entities']}")
|
62 |
+
details.append("")
|
63 |
+
details.append("📋 **Entity Details:**")
|
64 |
+
|
65 |
+
for result in results['results']:
|
66 |
+
entity = result['entity']
|
67 |
+
count = result['downloaded_count']
|
68 |
+
if count > 0:
|
69 |
+
details.append(f"✅ **{entity}**: {count} logos downloaded")
|
70 |
+
else:
|
71 |
+
error_msg = result.get('error', 'No logos found')
|
72 |
+
details.append(f"❌ **{entity}**: Failed ({error_msg})")
|
73 |
+
|
74 |
+
return "\n".join(details)
|
75 |
+
|
76 |
+
|
77 |
+
def main():
    """
    Render the Streamlit page for the Logo Downloader

    Shows the input form (text, optional Gemini API key, logos per entity),
    runs process_text_request on submit, offers the resulting ZIP for
    download, and lists example texts that can be loaded into the form.
    """
    st.set_page_config(page_title="🎨 Logo Downloader", layout="centered")
    st.title("🎨 Logo Downloader")
    st.markdown("Extract entities from text and download their logos automatically.")

    with st.form(key="logo_form"):
        text_input = st.text_area(
            "📝 Enter text containing company names, products, or brands:",
            placeholder="e.g., We use AWS, Docker, React, and Adobe Photoshop for our projects",
            height=150,
            # Keyed so the example buttons below can pre-fill this widget
            # through st.session_state["text_input"].
            key="text_input",
        )

        api_key_input = st.text_input(
            "🔑 Gemini API Key (optional)",
            type="password",
            placeholder="Enter your Gemini API key for enhanced extraction",
        )

        num_logos_input = st.slider(
            "🖼️ Logos per entity",
            min_value=1,
            max_value=MAX_LOGOS_PER_ENTITY,
            value=DEFAULT_LOGOS_PER_ENTITY,
            step=1,
        )

        submit_btn = st.form_submit_button("🚀 Download Logos")

    if submit_btn:
        with st.spinner("Processing logos..."):
            status_msg, zip_path, detailed_results = process_text_request(
                text_input,
                api_key_input,
                num_logos_input,
            )
        st.markdown(status_msg)

        if zip_path and Path(zip_path).exists():
            with open(zip_path, "rb") as f:
                st.download_button(
                    label="📥 Download Logos ZIP",
                    data=f,
                    file_name=Path(zip_path).name,
                    mime="application/zip",
                )

        st.markdown("## 📊 Detailed Results")
        # Markdown folds single newlines into one paragraph; two trailing
        # spaces force a hard break so each entity stays on its own line.
        st.markdown(detailed_results.replace("\n", "  \n"))

    st.markdown("---")
    st.info("💡 Tip: Get a free Gemini API key at [Google AI Studio](https://makersuite.google.com/app/apikey) for better extraction accuracy.")

    st.markdown("## 💡 Examples")
    examples = [
        "Our tech stack includes React, Node.js, MongoDB, Docker, AWS, and we use Figma for design, along with GitHub for version control.",
        "The team uses Microsoft Office, Adobe Creative Suite, Slack for communication, Zoom for meetings, and Salesforce for CRM.",
        "Popular social media platforms like Instagram, TikTok, Twitter, LinkedIn, and YouTube are essential for digital marketing.",
    ]

    def _use_example(example_text):
        # Runs as a button callback, i.e. before the script reruns and the
        # keyed text area is instantiated, which is when Streamlit allows a
        # widget's session-state value to be assigned.
        st.session_state["text_input"] = example_text

    for index, ex in enumerate(examples):
        st.button(
            f"Use example: {ex[:50]}...",
            key=f"example_{index}",
            on_click=_use_example,
            args=(ex,),
        )
|
139 |
+
|
140 |
+
|
141 |
+
if __name__ == "__main__":
|
142 |
+
main()
|
gradio_app.py
ADDED
@@ -0,0 +1,262 @@
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
1 |
+
"""
|
2 |
+
Gradio web interface for the Logo Downloader
|
3 |
+
"""
|
4 |
+
import os
|
5 |
+
import gradio as gr
|
6 |
+
import logging
|
7 |
+
from pathlib import Path
|
8 |
+
from typing import Optional
|
9 |
+
|
10 |
+
from services.logo_downloader import LogoDownloader
|
11 |
+
from services.appconfig import GEMINI_API_KEY, DEFAULT_LOGOS_PER_ENTITY, MAX_LOGOS_PER_ENTITY
|
12 |
+
|
13 |
+
# Setup logging
|
14 |
+
logging.basicConfig(level=logging.INFO)
|
15 |
+
logger = logging.getLogger(__name__)
|
16 |
+
|
17 |
+
def process_text_request(text: str, api_key: Optional[str], num_logos: int = DEFAULT_LOGOS_PER_ENTITY):
    """
    Process text and download logos through Gradio interface

    Args:
        text (str): Input text
        api_key (str): Optional Gemini API key; blank or None falls back to GEMINI_API_KEY
        num_logos (int): Number of logos per entity, between 1 and MAX_LOGOS_PER_ENTITY

    Returns:
        Tuple: (status_message, zip_file_path or None, detailed_results)
    """
    try:
        # Validate inputs
        if not text or not text.strip():
            return "❌ Please provide some text to analyze.", None, "No text provided."

        if num_logos < 1 or num_logos > MAX_LOGOS_PER_ENTITY:
            return f"❌ Number of logos must be between 1 and {MAX_LOGOS_PER_ENTITY}.", None, f"Invalid number: {num_logos}"

        # Use provided API key or the configured one
        cleaned_key = (api_key or "").strip()
        final_api_key = cleaned_key or GEMINI_API_KEY

        # Initialize downloader and process the text
        downloader = LogoDownloader(gemini_api_key=final_api_key)
        results = downloader.process_text(text, num_logos)

        status = results.get('status')
        stats = results.get('stats') or {}

        # Format response based on results
        if status == 'success' and stats.get('total_downloads', 0) > 0:
            status_msg = f"✅ {downloader.get_stats_summary()}"
            zip_path = results.get('zip_path')
            detailed_results = _format_detailed_results(results)
            return status_msg, zip_path, detailed_results

        # Read the message once with a default so that a result without a
        # 'message' key is reported properly instead of raising KeyError.
        if status == 'warning':
            message = results.get('message', 'No details available')
            return f"⚠️ {message}", None, message

        message = results.get('message', 'Unknown error')
        return f"❌ Processing failed: {message}", None, message

    except Exception as e:
        # UI boundary: never let an exception escape to the interface, but
        # keep the traceback in the log for diagnosis.
        logger.exception("Error in process_text_request: %s", e)
        return f"❌ An error occurred: {str(e)}", None, f"Error details: {str(e)}"
|
65 |
+
|
66 |
+
|
67 |
+
def _format_detailed_results(results):
|
68 |
+
"""Format detailed results for display"""
|
69 |
+
if not results.get('results'):
|
70 |
+
return "No detailed results available."
|
71 |
+
|
72 |
+
details = []
|
73 |
+
details.append(f"📊 **Processing Summary:**")
|
74 |
+
details.append(f"- Total entities found: {results['stats']['total_entities']}")
|
75 |
+
details.append(f"- Total logos downloaded: {results['stats']['total_downloads']}")
|
76 |
+
details.append(f"- Successful entities: {results['stats']['successful_entities']}")
|
77 |
+
details.append(f"- Failed entities: {results['stats']['failed_entities']}")
|
78 |
+
details.append("")
|
79 |
+
details.append("📋 **Entity Details:**")
|
80 |
+
|
81 |
+
for result in results['results']:
|
82 |
+
entity = result['entity']
|
83 |
+
count = result['downloaded_count']
|
84 |
+
|
85 |
+
if count > 0:
|
86 |
+
details.append(f"✅ **{entity}**: {count} logos downloaded")
|
87 |
+
else:
|
88 |
+
error_msg = result.get('error', 'No logos found')
|
89 |
+
details.append(f"❌ **{entity}**: Failed ({error_msg})")
|
90 |
+
|
91 |
+
return "\n".join(details)
|
92 |
+
|
93 |
+
|
94 |
+
def create_interface():
    """
    Create and configure Gradio interface

    Builds a Blocks layout with an input column (text, API key, logo count,
    submit button), a results column (status, ZIP file, details), a set of
    clickable examples, and a footer. The submit button and the examples are
    both wired to process_text_request.

    Returns:
        The configured gr.Blocks instance (not yet launched)
    """
    # NOTE(review): indentation was not recoverable from the reviewed copy of
    # this file; the row/column nesting below is the most plausible reading
    # and should be confirmed against the original layout.

    # Custom CSS for better styling
    custom_css = """
    .gradio-container {
        max-width: 1200px !important;
        margin: auto !important;
    }
    .main-header {
        text-align: center;
        margin-bottom: 2rem;
    }
    .status-success {
        color: #10b981 !important;
    }
    .status-error {
        color: #ef4444 !important;
    }
    .status-warning {
        color: #f59e0b !important;
    }
    """

    # Example rows: [text, api key, logos per entity]
    example_rows = [
        [
            "Our tech stack includes React, Node.js, MongoDB, Docker, AWS, and we use Figma for design, along with GitHub for version control.",
            "",
            8
        ],
        [
            "The team uses Microsoft Office, Adobe Creative Suite, Slack for communication, Zoom for meetings, and Salesforce for CRM.",
            "",
            6
        ],
        [
            "Popular social media platforms like Instagram, TikTok, Twitter, LinkedIn, and YouTube are essential for digital marketing.",
            "",
            5
        ]
    ]

    with gr.Blocks(css=custom_css, title="Logo Downloader", theme=gr.themes.Soft()) as demo:

        # Header
        gr.HTML("""
        <div class="main-header">
            <h1>🎨 Logo Downloader</h1>
            <p>Extract entities from text and download their logos automatically</p>
        </div>
        """)

        with gr.Row():
            with gr.Column(scale=2):
                # Input section
                gr.Markdown("## 📝 Input")

                text_input = gr.Textbox(
                    label="Text to analyze",
                    placeholder="Enter text containing company names, products, or brands (e.g., 'We use AWS, Docker, React, and Adobe Photoshop for our projects')",
                    lines=5,
                    max_lines=10
                )

                with gr.Row():
                    api_key_input = gr.Textbox(
                        label="Gemini API Key (optional)",
                        placeholder="Enter your Gemini API key for better entity extraction",
                        type="password",
                        value=""
                    )

                    num_logos_input = gr.Slider(
                        label="Logos per entity",
                        minimum=1,
                        maximum=MAX_LOGOS_PER_ENTITY,
                        value=DEFAULT_LOGOS_PER_ENTITY,
                        step=1
                    )

                process_btn = gr.Button("🚀 Download Logos", variant="primary", size="lg")

                # API key help
                gr.Markdown("""
                💡 **Tip:** Get a free Gemini API key at [Google AI Studio](https://makersuite.google.com/app/apikey) for better entity extraction.
                Without an API key, the tool will use basic pattern matching.
                """)

            with gr.Column(scale=1):
                # Output section
                gr.Markdown("## 📊 Results")

                status_output = gr.Textbox(
                    label="Status",
                    interactive=False,
                    lines=2
                )

                download_output = gr.File(
                    label="Download ZIP",
                    interactive=False
                )

                detailed_output = gr.Textbox(
                    label="Detailed Results",
                    interactive=False,
                    lines=10,
                    max_lines=15
                )

        # The same component lists feed both the examples and the button
        request_inputs = [text_input, api_key_input, num_logos_input]
        request_outputs = [status_output, download_output, detailed_output]

        # Examples section
        gr.Markdown("## 💡 Examples")

        gr.Examples(
            examples=example_rows,
            inputs=request_inputs,
            outputs=request_outputs,
            fn=process_text_request,
            cache_examples=False
        )

        # Process button click event
        process_btn.click(
            fn=process_text_request,
            inputs=request_inputs,
            outputs=request_outputs,
            show_progress='minimal'
        )

        # Footer
        gr.HTML("""
        <div style="text-align: center; margin-top: 2rem; padding: 1rem; border-top: 1px solid #e5e7eb;">
            <p>🔧 Built with Gradio | 🤖 Powered by Gemini AI</p>
            <p><small>This tool respects rate limits and downloads publicly available logos.</small></p>
        </div>
        """)

    return demo
|
232 |
+
|
233 |
+
|
234 |
+
def main():
    """
    Main function to launch the application

    Logs whether a Gemini API key is configured, builds the interface, and
    serves it on all network interfaces at the port given by the PORT
    environment variable (7860 when unset).
    """
    logger.info("Starting Logo Downloader application...")

    # Report which extraction mode will be available
    if GEMINI_API_KEY:
        logger.info("Gemini API key found")
    else:
        logger.warning("No Gemini API key found in environment variables")
        logger.info("The application will work with fallback entity extraction")

    # Create and launch interface
    app = create_interface()
    app.launch(
        server_name="0.0.0.0",
        server_port=int(os.environ.get("PORT", 7860)),
        share=False,
        show_error=True,
        max_threads=4,
    )
|
259 |
+
|
260 |
+
|
261 |
+
if __name__ == "__main__":
|
262 |
+
main()
|
requirements.txt
CHANGED
@@ -1,3 +1,8 @@
|
|
1 |
-
|
2 |
-
|
|
|
|
|
|
|
|
|
|
|
3 |
streamlit
|
|
|
1 |
+
google-generativeai==0.5.4
|
2 |
+
requests==2.31.0
|
3 |
+
beautifulsoup4==4.12.2
|
4 |
+
gradio==4.15.0
|
5 |
+
python-dotenv==1.0.0
|
6 |
+
Pillow==10.0.1
|
7 |
+
pathlib
|
8 |
streamlit
|
services/__pycache__/appconfig.cpython-310.pyc
ADDED
Binary file (1.69 kB). View file
|
|
services/__pycache__/entity_extractor.cpython-310.pyc
ADDED
Binary file (5.48 kB). View file
|
|
services/__pycache__/image_downloader.cpython-310.pyc
ADDED
Binary file (8.31 kB). View file
|
|
services/__pycache__/logo_downloader.cpython-310.pyc
ADDED
Binary file (6.5 kB). View file
|
|
services/appconfig.py
ADDED
@@ -0,0 +1,60 @@
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
1 |
+
"""
|
2 |
+
Configuration settings for the Logo Downloader application
|
3 |
+
"""
|
4 |
+
import os
|
5 |
+
from pathlib import Path
|
6 |
+
from dotenv import load_dotenv
|
7 |
+
load_dotenv()
|
8 |
+
|
9 |
+
# ---------------------------------------------------------------------------
# API
# ---------------------------------------------------------------------------
# Empty string when the variable is not set.
GEMINI_API_KEY = os.getenv('GEMINI_API_KEY', '')

# ---------------------------------------------------------------------------
# Directories
# ---------------------------------------------------------------------------
BASE_DIR = Path(__file__).parent

# Relative path, so it resolves against the current working directory.
# (The package-relative alternative, BASE_DIR / 'downloads', is disabled.)
DOWNLOADS_DIR = Path('downloads')

TEMP_DIR = BASE_DIR / 'temp'

# ---------------------------------------------------------------------------
# Download limits
# ---------------------------------------------------------------------------
MAX_ENTITIES = 20
MAX_LOGOS_PER_ENTITY = 15
DEFAULT_LOGOS_PER_ENTITY = 10
DOWNLOAD_TIMEOUT = 15
REQUEST_DELAY = 1  # seconds between requests

# ---------------------------------------------------------------------------
# Files
# ---------------------------------------------------------------------------
ALLOWED_EXTENSIONS = [
    '.png',
    '.jpg',
    '.jpeg',
    '.svg',
    '.webp',
]
MIN_FILE_SIZE = 500  # bytes
MAX_FILE_SIZE = 10 * 1024 * 1024  # 10MB

# ---------------------------------------------------------------------------
# HTTP
# ---------------------------------------------------------------------------
HEADERS = {
    'User-Agent': (
        'Mozilla/5.0 (Windows NT 10.0; Win64; x64) '
        'AppleWebKit/537.36 (KHTML, like Gecko) '
        'Chrome/91.0.4472.124 Safari/537.36'
    ),
    'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
    'Accept-Language': 'en-US,en;q=0.5',
    'Accept-Encoding': 'gzip, deflate',
    'Connection': 'keep-alive',
    'Upgrade-Insecure-Requests': '1',
}

# ---------------------------------------------------------------------------
# Image signatures for validation (leading bytes of each format)
# ---------------------------------------------------------------------------
IMAGE_SIGNATURES = [
    b'\x89PNG',       # PNG
    b'\xff\xd8\xff',  # JPEG
    b'<svg',          # SVG
    b'RIFF',          # WebP
    b'GIF8',          # GIF
]

# ---------------------------------------------------------------------------
# Common tech entities for fallback
# ---------------------------------------------------------------------------
COMMON_TECH_ENTITIES = [
    'Microsoft', 'Google', 'Apple', 'Amazon', 'Adobe',
    'React', 'Angular', 'Vue',
    'Docker', 'Kubernetes', 'AWS', 'Azure', 'Firebase',
    'MongoDB', 'PostgreSQL', 'Redis',
    'Node.js', 'Python', 'JavaScript', 'TypeScript',
    'Figma', 'Sketch', 'Photoshop', 'Illustrator',
    'AutoCAD', 'Unity', 'Blender',
    'GitHub', 'GitLab',
    'Slack', 'Discord', 'Zoom', 'Teams',
    'Spotify', 'Netflix',
    'Instagram', 'Facebook', 'Twitter', 'LinkedIn', 'TikTok',
    'WhatsApp', 'Telegram',
    'Shopify', 'WordPress', 'Salesforce', 'Microsoft Fabric',
]
|
services/entity_extractor.py
ADDED
@@ -0,0 +1,195 @@
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
1 |
+
"""
|
2 |
+
Entity extraction module using Gemini AI with fallback methods
|
3 |
+
"""
|
4 |
+
import re
|
5 |
+
import logging
|
6 |
+
from typing import List, Optional
|
7 |
+
import google.generativeai as genai
|
8 |
+
|
9 |
+
from services.appconfig import GEMINI_API_KEY, COMMON_TECH_ENTITIES, MAX_ENTITIES
|
10 |
+
|
11 |
+
logger = logging.getLogger(__name__)
|
12 |
+
|
13 |
+
|
14 |
+
class EntityExtractor:
    """Extract entities from text using Gemini AI or fallback methods"""

    # Function words that are never brand names. Generic nouns such as
    # "cloud" or "database" are NOT filtered here.
    _INVALID_WORDS = frozenset({
        'the', 'and', 'or', 'but', 'in', 'on', 'at', 'to', 'for', 'of', 'with',
        'by', 'from', 'up', 'about', 'into', 'through', 'during', 'before',
        'after', 'above', 'below', 'between', 'among',
    })

    # Leading list decoration a model may add despite the prompt:
    # "- x", "* x", "• x", "1. x", "2) x".
    _LIST_MARKER_RE = re.compile(r'^\s*(?:[-*\u2022]+\s*|\d+[.)]\s+)')

    def __init__(self, api_key: Optional[str] = None):
        """
        Initialize EntityExtractor

        Args:
            api_key (str, optional): Gemini API key; falls back to GEMINI_API_KEY
        """
        self.api_key = api_key or GEMINI_API_KEY
        self.model = None
        self._setup_gemini()

    def _setup_gemini(self) -> None:
        """Setup Gemini API; leaves self.model as None when unavailable"""
        if not self.api_key:
            logger.warning("No Gemini API key provided, using fallback method")
            return

        try:
            genai.configure(api_key=self.api_key)
            self.model = genai.GenerativeModel('gemini-2.0-flash-exp')
            logger.info("Gemini API initialized successfully")
        except Exception as e:
            logger.error(f"Failed to initialize Gemini API: {e}")
            self.model = None

    @staticmethod
    def _strip_list_marker(line: str) -> str:
        """Return *line* trimmed and without a leading bullet or number marker"""
        return EntityExtractor._LIST_MARKER_RE.sub('', line.strip()).strip()

    @staticmethod
    def _contains_term(text: str, term: str) -> bool:
        """Return True if *term* occurs in *text* as a whole word (case-insensitive)"""
        # Lookarounds instead of \b so terms containing punctuation
        # (e.g. "Node.js") are handled uniformly.
        pattern = r'(?<!\w)' + re.escape(term) + r'(?!\w)'
        return re.search(pattern, text, re.IGNORECASE) is not None

    @staticmethod
    def _dedupe(entities: List[str]) -> List[str]:
        """Remove case-insensitive duplicates while preserving first-seen order"""
        seen = set()
        unique_entities = []
        for entity in entities:
            key = entity.lower()
            if key not in seen:
                seen.add(key)
                unique_entities.append(entity)
        return unique_entities

    def extract_with_gemini(self, text: str) -> List[str]:
        """
        Extract entities using Gemini AI

        Args:
            text (str): Input text

        Returns:
            List[str]: List of extracted entities (at most MAX_ENTITIES)

        Raises:
            RuntimeError: If no Gemini model is configured
            Exception: Any error raised by the Gemini call is logged and re-raised
        """
        if not self.model:
            raise RuntimeError("Gemini model not available")

        prompt = """
        Extract company names, product names, software names, tool names, and brand names from this text.
        Only return names that would have recognizable logos (like Microsoft, Adobe, React, etc.).
        Return as a simple list, one name per line, no bullet points or numbers.
        Avoid generic terms like "cloud" or "database".

        Text: {text}
        """.format(text=text)

        try:
            response = self.model.generate_content(prompt)

            if not response.text:
                return []

            # Strip bullets/numbering instead of discarding the line: a
            # bulleted answer would otherwise yield no entities at all.
            candidates = [
                self._strip_list_marker(line)
                for line in response.text.strip().split('\n')
            ]

            # Filter out blanks and common words that aren't entities
            filtered_entities = self._dedupe(
                [entity for entity in candidates if self._is_valid_entity(entity)]
            )

            logger.info(f"Gemini extracted {len(filtered_entities)} entities")
            return filtered_entities[:MAX_ENTITIES]

        except Exception as e:
            logger.error(f"Gemini extraction failed: {e}")
            raise

    def extract_with_fallback(self, text: str) -> List[str]:
        """
        Extract entities using fallback pattern matching

        Args:
            text (str): Input text

        Returns:
            List[str]: List of extracted entities (at most MAX_ENTITIES)
        """
        # Known tech entities, matched as whole words so that e.g. "Unity"
        # is not found inside "community".
        entities = [
            tech_entity
            for tech_entity in COMMON_TECH_ENTITIES
            if self._contains_term(text, tech_entity)
        ]

        # Capitalized words of 3+ letters (likely proper nouns)
        cap_words = re.findall(r'\b[A-Z][a-zA-Z]{2,}\b', text)
        entities.extend(word for word in cap_words if self._is_valid_entity(word))

        # Dotted names such as Node.js
        entities.extend(re.findall(r'\b[A-Z][a-zA-Z]*\.[a-zA-Z]+\b', text))

        # Remove duplicates while preserving order
        unique_entities = self._dedupe(entities)

        logger.info(f"Fallback extracted {len(unique_entities)} entities")
        return unique_entities[:MAX_ENTITIES]

    def _is_valid_entity(self, entity: str) -> bool:
        """
        Check if entity is valid for logo extraction

        Args:
            entity (str): Entity name

        Returns:
            bool: True if the name is 2-50 characters long, is not a common
                function word, and contains at least one ASCII letter
        """
        # Check length
        if len(entity) < 2 or len(entity) > 50:
            return False

        # Check if it's a common invalid word
        if entity.lower() in self._INVALID_WORDS:
            return False

        # Must contain at least one letter
        if not re.search(r'[a-zA-Z]', entity):
            return False

        return True

    def extract_entities(self, text: str) -> List[str]:
        """
        Extract entities from text using available methods

        Tries Gemini when a model is configured; uses the pattern-matching
        fallback when Gemini is unavailable, fails, or returns nothing.

        Args:
            text (str): Input text

        Returns:
            List[str]: List of extracted entities
        """
        if not text or not text.strip():
            return []

        logger.info("Starting entity extraction...")

        # Try Gemini first
        if self.model:
            try:
                entities = self.extract_with_gemini(text)
                if entities:
                    logger.info(f"Successfully extracted {len(entities)} entities with Gemini")
                    return entities
            except Exception as e:
                logger.warning(f"Gemini extraction failed, using fallback: {e}")

        # Use fallback method
        entities = self.extract_with_fallback(text)
        logger.info(f"Extracted {len(entities)} entities using fallback method")

        return entities
|
services/image_downloader.py
ADDED
@@ -0,0 +1,278 @@
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
1 |
+
"""
|
2 |
+
Image downloading module with multiple search providers
|
3 |
+
"""
|
4 |
+
import os
|
5 |
+
import json
|
6 |
+
import logging
|
7 |
+
from typing import List, Tuple
|
8 |
+
from urllib.parse import quote_plus, urlparse
|
9 |
+
import requests
|
10 |
+
from bs4 import BeautifulSoup
|
11 |
+
|
12 |
+
from services.appconfig import HEADERS, DOWNLOAD_TIMEOUT, REQUEST_DELAY, ALLOWED_EXTENSIONS
|
13 |
+
from utils.utils import is_valid_image_file, get_file_extension, clean_up_file, rate_limit_delay
|
14 |
+
|
15 |
+
logger = logging.getLogger(__name__)
|
16 |
+
|
17 |
+
|
18 |
+
class ImageDownloader:
|
19 |
+
"""Download images from various search providers"""
|
20 |
+
|
21 |
+
def __init__(self):
    """Create the HTTP session used for all searches and apply HEADERS to it"""
    http_session = requests.Session()
    http_session.headers.update(HEADERS)
    self.session = http_session
|
25 |
+
|
26 |
+
def get_bing_image_urls(self, entity: str, num_images: int = 15) -> List[str]:
    """
    Get image URLs from Bing search

    Args:
        entity (str): Entity name to search for
        num_images (int): Maximum number of URLs to return

    Returns:
        List[str]: De-duplicated image URLs in discovery order; empty list
            if the request or parsing fails
    """
    logger.info(f"Searching Bing for {entity} logos...")

    query = f"{entity} logo png transparent high quality"
    encoded_query = quote_plus(query)
    search_url = f"https://www.bing.com/images/search?q={encoded_query}&form=HDRSC2&first=1&tsc=ImageBasicHover"

    try:
        # TLS certificate verification is left at the requests default
        # (enabled); disabling it would expose the request to tampering.
        response = self.session.get(search_url, timeout=DOWNLOAD_TIMEOUT)
        response.raise_for_status()

        soup = BeautifulSoup(response.content, 'html.parser')
        image_urls = []

        # Find image data in Bing's format: JSON stored in the "m" attribute
        for container in soup.find_all('a', {'class': 'iusc'}):
            m_attr = container.get('m')
            if not m_attr:
                continue
            try:
                img_data = json.loads(m_attr)
            except json.JSONDecodeError:
                continue
            # Valid JSON that is not an object has no URL fields; skip it
            # rather than letting an AttributeError abort the whole search.
            if not isinstance(img_data, dict):
                continue
            img_url = img_data.get('murl') or img_data.get('turl')
            if img_url and self._is_valid_image_url(img_url):
                image_urls.append(img_url)

        # Fallback: regular img tags
        if len(image_urls) < 5:
            for img in soup.find_all('img'):
                src = img.get('src') or img.get('data-src')
                if src and self._is_valid_image_url(src) and 'logo' in src.lower():
                    if src.startswith('http'):
                        image_urls.append(src)

        # Drop repeats (order preserved) so duplicates do not use up slots
        image_urls = list(dict.fromkeys(image_urls))

        logger.info(f"Found {len(image_urls)} URLs from Bing")
        return image_urls[:num_images]

    except Exception as e:
        logger.error(f"Bing search failed for {entity}: {e}")
        return []
|
78 |
+
|
79 |
+
def get_duckduckgo_image_urls(self, entity: str, num_images: int = 15) -> List[str]:
    """
    Collect candidate logo image URLs from a DuckDuckGo search page

    Args:
        entity (str): Entity name to search for
        num_images (int): Maximum number of URLs to return

    Returns:
        List[str]: At most num_images image URLs; empty list if the search fails
    """
    logger.info(f"Searching DuckDuckGo for {entity} logos...")

    search_terms = quote_plus(f"{entity} logo hd png transparent")
    search_url = f"https://duckduckgo.com/?q={search_terms}&t=h_&iax=images&ia=images"

    try:
        # NOTE(review): verify=False turns off TLS certificate checking for this
        # request - confirm this is intentional.
        page = self.session.get(search_url, timeout=DOWNLOAD_TIMEOUT,verify=False)
        page.raise_for_status()

        soup = BeautifulSoup(page.content, 'html.parser')

        # Every <img> contributes its 'src' (or 'data-src' when 'src' is empty);
        # only absolute URLs that pass the image-URL check are kept.
        sources = (tag.get('src') or tag.get('data-src') for tag in soup.find_all('img'))
        found = [
            src for src in sources
            if src and self._is_valid_image_url(src) and src.startswith('http')
        ]

        logger.info(f"Found {len(found)} URLs from DuckDuckGo")
        return found[:num_images]

    except Exception as e:
        logger.error(f"DuckDuckGo search failed for {entity}: {e}")
        return []
|
115 |
+
|
116 |
+
def get_alternative_logo_sources(self, entity: str) -> List[str]:
    """
    Probe a fixed set of logo sites for a file matching the entity name

    Args:
        entity (str): Entity name

    Returns:
        List[str]: Guessed URLs that answered a HEAD request with status 200
    """
    lowered = entity.lower()
    compact = lowered.replace(' ', '').replace('.', '')
    hyphenated_lower = lowered.replace(' ', '-')
    hyphenated = entity.replace(' ', '-')

    # Guessed locations, in the order they are probed
    candidates = (
        f"https://cdn.worldvectorlogo.com/logos/{hyphenated_lower}.svg",
        f"https://logos-world.net/wp-content/uploads/2020/11/{hyphenated}-Logo.png",
        f"https://logoeps.com/wp-content/uploads/2013/03/vector-{compact}-logo.png",
        f"https://1000logos.net/wp-content/uploads/2016/10/{hyphenated}-Logo.png",
    )

    reachable = []
    for candidate in candidates:
        try:
            if self.session.head(candidate, timeout=5).status_code == 200:
                reachable.append(candidate)
                logger.info(f"Found alternative logo: {candidate}")
        except Exception:
            # Best effort: a failing probe just means this guess is skipped
            continue

    return reachable
|
148 |
+
|
149 |
+
def _is_valid_image_url(self, url: str) -> bool:
    """
    Decide whether a URL looks like it points at an image

    Args:
        url (str): URL to check

    Returns:
        bool: True if any entry of ALLOWED_EXTENSIONS occurs anywhere in the
            lower-cased URL; False for an empty URL
    """
    if not url:
        return False

    lowered = url.lower()
    # Substring match, not a suffix match: the extension may appear anywhere
    for ext in ALLOWED_EXTENSIONS:
        if ext in lowered:
            return True
    return False
|
165 |
+
|
166 |
+
def download_image(self, url: str, filepath: str) -> bool:
    """
    Download image from URL and keep it only if it validates as an image

    Args:
        url (str): Image URL
        filepath (str): Local filepath to save image

    Returns:
        bool: True if the file was downloaded and passed validation; False on
            a non-image content type, a failed validation or any error
    """
    response = None
    try:
        logger.debug(f"Downloading: {url}")

        # NOTE(review): verify=False turns off TLS certificate checking for this
        # request - confirm this is intentional.
        response = self.session.get(url, timeout=DOWNLOAD_TIMEOUT, stream=True, verify=False)
        response.raise_for_status()

        # Reject responses that do not declare an image payload
        content_type = response.headers.get('content-type', '').lower()
        if not any(img_type in content_type for img_type in ['image', 'svg']):
            logger.warning(f"Invalid content type for {url}: {content_type}")
            return False

        # Write the body to disk chunk by chunk
        with open(filepath, 'wb') as f:
            for chunk in response.iter_content(chunk_size=8192):
                if chunk:
                    f.write(chunk)

        # Keep the file only if it validates; otherwise remove it
        if is_valid_image_file(filepath):
            logger.debug(f"Successfully downloaded: {filepath}")
            return True

        clean_up_file(filepath)
        logger.warning(f"Downloaded invalid image: {url}")
        return False

    except Exception as e:
        clean_up_file(filepath)
        logger.error(f"Download failed for {url}: {e}")
        return False

    finally:
        # A streamed response holds its connection until it is closed, so
        # release it on every exit path (including the early content-type
        # rejection, where the body is never read).
        if response is not None:
            response.close()
|
208 |
+
|
209 |
+
def download_logos_for_entity(self, entity: str, entity_folder: str, num_logos: int = 10) -> Tuple[int, List[str]]:
    """
    Download logos for a single entity

    Args:
        entity (str): Entity name
        entity_folder (str): Folder to save logos
        num_logos (int): Number of logos to download

    Returns:
        Tuple[int, List[str]]: (number downloaded, list of downloaded files)
    """
    logger.info(f"Downloading top {num_logos} logos for: {entity}")

    # Collect URLs from all sources: guessed logo-site URLs first, then
    # Bing results, then DuckDuckGo results.
    all_urls = []
    all_urls.extend(self.get_alternative_logo_sources(entity))
    all_urls.extend(self.get_bing_image_urls(entity, 20))
    all_urls.extend(self.get_duckduckgo_image_urls(entity, 15))

    # Remove duplicates while preserving order
    unique_urls = list(dict.fromkeys(all_urls))

    if not unique_urls:
        logger.warning(f"No URLs found for {entity}")
        return 0, []

    logger.info(f"Found {len(unique_urls)} unique URLs for {entity}")

    # Path separators in the entity name would point the file into a
    # sub-directory that does not exist, so every save would fail.
    file_stem = entity.replace(' ', '_').replace('/', '_').replace('\\', '_')

    # Download images
    downloaded_files = []
    downloaded_count = 0

    for url in unique_urls:
        if downloaded_count >= num_logos:
            break

        try:
            extension = get_file_extension(url)
            filename = f"{file_stem}_logo_{downloaded_count + 1}{extension}"
            filepath = os.path.join(entity_folder, filename)

            if self.download_image(url, filepath):
                downloaded_count += 1
                downloaded_files.append(filepath)
                logger.info(f"Downloaded ({downloaded_count}/{num_logos}): {filename}")

            # Be respectful to servers: pause after every attempt
            rate_limit_delay(REQUEST_DELAY)

        except Exception as e:
            logger.error(f"Error processing URL {url}: {e}")
            continue

    logger.info(f"Successfully downloaded {downloaded_count}/{num_logos} logos for {entity}")
    return downloaded_count, downloaded_files
|
services/logo_downloader.py
ADDED
@@ -0,0 +1,228 @@
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
1 |
+
"""
|
2 |
+
Main Logo Downloader class that orchestrates the entire process
|
3 |
+
"""
|
4 |
+
import os
|
5 |
+
import zipfile
|
6 |
+
import logging
|
7 |
+
from pathlib import Path
|
8 |
+
from typing import List, Tuple, Dict, Optional
|
9 |
+
|
10 |
+
from services.appconfig import DOWNLOADS_DIR, DEFAULT_LOGOS_PER_ENTITY
|
11 |
+
from utils.utils import create_safe_filename, create_directory, format_file_size
|
12 |
+
from .entity_extractor import EntityExtractor
|
13 |
+
from .image_downloader import ImageDownloader
|
14 |
+
|
15 |
+
logger = logging.getLogger(__name__)
|
16 |
+
|
17 |
+
class LogoDownloader:
    """Coordinates entity extraction, per-entity logo downloads and ZIP packaging"""

    def __init__(self, gemini_api_key: str, output_dir: Optional[str] = None):
        """
        Set up the extractor, the image downloader and the output directory

        Args:
            gemini_api_key (str): Gemini API key for entity extraction
            output_dir (str): Directory to save downloads; DOWNLOADS_DIR is
                used when this is empty or None
        """
        self.output_dir = Path(output_dir) if output_dir else DOWNLOADS_DIR
        self.entity_extractor = EntityExtractor(gemini_api_key)
        self.image_downloader = ImageDownloader()
        self._reset_stats()

        # Make sure the output directory exists
        create_directory(self.output_dir)

    def process_text(self, text: str, logos_per_entity: int = DEFAULT_LOGOS_PER_ENTITY) -> Dict:
        """
        Extract entities from the text and download logos for each of them

        Args:
            text (str): Input text containing entity references
            logos_per_entity (int): Number of logos to download per entity

        Returns:
            Dict: Result dictionary built by _get_results; when entities were
                found it also carries 'entities', 'results' and 'zip_path'
        """
        logger.info("Starting logo download process...")

        # Counters always start from zero for a new run
        self._reset_stats()

        entities = self.entity_extractor.extract_entities(text)
        if not entities:
            logger.warning("No entities found in text")
            return self._get_results("No entities found in the provided text")

        total = len(entities)
        self.stats['total_entities'] = total
        logger.info(f"Found {total} entities: {', '.join(entities)}")

        # One result record per entity, failures included
        results = []
        for position, entity in enumerate(entities, 1):
            logger.info(f"Processing [{position}/{total}]: {entity}")

            try:
                outcome = self._process_single_entity(entity, logos_per_entity)
                results.append(outcome)

                count = outcome['downloaded_count']
                if count > 0:
                    self.stats['successful_entities'] += 1
                    self.stats['total_downloads'] += count
                else:
                    self.stats['failed_entities'] += 1

            except Exception as e:
                logger.error(f"Failed to process entity {entity}: {e}")
                self.stats['failed_entities'] += 1
                results.append({
                    'entity': entity,
                    'downloaded_count': 0,
                    'files': [],
                    'error': str(e)
                })

        # Only build the archive when something was actually downloaded
        zip_path = self._create_zip_package() if self.stats['total_downloads'] > 0 else None

        return self._get_results(
            "Processing completed successfully",
            entities=entities,
            results=results,
            zip_path=zip_path
        )

    def _process_single_entity(self, entity: str, logos_per_entity: int) -> Dict:
        """
        Create the entity's folder and download its logos into it

        Args:
            entity (str): Entity name
            logos_per_entity (int): Number of logos to download

        Returns:
            Dict: 'entity', 'safe_name', 'downloaded_count', 'files', 'folder'

        Raises:
            Exception: If the entity folder could not be created
        """
        safe_name = create_safe_filename(entity)
        entity_folder = self.output_dir / safe_name

        if not create_directory(entity_folder):
            raise Exception(f"Failed to create directory for {entity}")

        downloaded_count, downloaded_files = self.image_downloader.download_logos_for_entity(
            entity, str(entity_folder), logos_per_entity
        )

        return {
            'entity': entity,
            'safe_name': safe_name,
            'downloaded_count': downloaded_count,
            'files': downloaded_files,
            'folder': str(entity_folder)
        }

    def _create_zip_package(self) -> str:
        """
        Pack every file under the output directory into a single ZIP archive

        The archive is written next to the output directory (in its parent)
        and entries are stored relative to the output directory.

        Returns:
            str: Path to created ZIP file

        Raises:
            Exception: Any error during archiving is logged and re-raised
        """
        zip_path = self.output_dir.parent / f"{self.output_dir.name}_logos.zip"

        logger.info(f"Creating ZIP package: {zip_path}")

        try:
            with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as archive:
                for folder, _subdirs, filenames in os.walk(self.output_dir):
                    for filename in filenames:
                        source = os.path.join(folder, filename)
                        archive.write(source, os.path.relpath(source, self.output_dir))

            archive_size = os.path.getsize(zip_path)
            logger.info(f"ZIP package created: {zip_path} ({format_file_size(archive_size)})")
            return str(zip_path)

        except Exception as e:
            logger.error(f"Failed to create ZIP package: {e}")
            raise

    def _reset_stats(self) -> None:
        """Set all processing counters back to zero"""
        self.stats = {
            'total_entities': 0,
            'total_downloads': 0,
            'successful_entities': 0,
            'failed_entities': 0
        }

    def _get_results(self, message: str, **kwargs) -> Dict:
        """
        Build the result dictionary returned to callers

        Args:
            message (str): Status message
            **kwargs: Additional result data merged into the dictionary

        Returns:
            Dict: 'status' ('success' if anything was downloaded, otherwise
                'warning'), 'message', a copy of the stats, plus kwargs
        """
        status = 'success' if self.stats['total_downloads'] > 0 else 'warning'
        formatted = {
            'status': status,
            'message': message,
            'stats': self.stats.copy(),
        }
        formatted.update(kwargs)
        return formatted

    def get_stats_summary(self) -> str:
        """
        Describe the current statistics in one human-readable sentence group

        Returns:
            str: Stats summary, or a fixed notice when nothing was processed
        """
        total = self.stats['total_entities']
        if total == 0:
            return "No entities processed"

        succeeded = self.stats['successful_entities']
        downloads = self.stats['total_downloads']

        # The average is taken over successful entities only
        avg_downloads = downloads / succeeded if succeeded > 0 else 0

        return (
            f"Processed {total} entities. "
            f"Successfully downloaded {downloads} logos "
            f"({avg_downloads:.1f} average per entity). "
            f"Success rate: {succeeded}/{total}"
        )
|
213 |
+
|
214 |
+
|
215 |
+
def download_logos(text: str, gemini_api_key: str, logos_per_entity: int = DEFAULT_LOGOS_PER_ENTITY) -> Dict:
    """
    One-call helper: build a LogoDownloader and process the text with it

    Args:
        text (str): Text containing entity references
        gemini_api_key (str): Gemini API key
        logos_per_entity (int): Number of logos per entity

    Returns:
        Dict: Processing results
    """
    return LogoDownloader(gemini_api_key).process_text(text, logos_per_entity)
|
utils/__pycache__/utils.cpython-310.pyc
ADDED
Binary file (4.52 kB). View file
|
|
utils/utils.py
ADDED
@@ -0,0 +1,178 @@
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
1 |
+
"""
|
2 |
+
Utility functions for the Logo Downloader application
|
3 |
+
"""
|
4 |
+
import os
|
5 |
+
import re
|
6 |
+
import json
|
7 |
+
import time
|
8 |
+
from pathlib import Path
|
9 |
+
from typing import List, Optional
|
10 |
+
from urllib.parse import urlparse
|
11 |
+
import logging
|
12 |
+
|
13 |
+
from services.appconfig import IMAGE_SIGNATURES, MIN_FILE_SIZE, MAX_FILE_SIZE
|
14 |
+
|
15 |
+
# Setup logging
|
16 |
+
logging.basicConfig(level=logging.INFO)
|
17 |
+
logger = logging.getLogger(__name__)
|
18 |
+
|
19 |
+
|
20 |
+
def create_safe_filename(name: str) -> str:
    """
    Create a safe filename from entity name

    Characters other than word characters, whitespace and hyphens are
    dropped, then each run of hyphens/whitespace becomes one underscore.

    Args:
        name (str): Entity name

    Returns:
        str: Safe filename; 'unnamed' when nothing usable is left
    """
    safe_name = re.sub(r'[^\w\s-]', '', name).strip()
    safe_name = re.sub(r'[-\s]+', '_', safe_name)
    # An empty result would make "<output_dir>/<safe_name>" resolve to the
    # output directory itself, so fall back to a placeholder name.
    return safe_name or 'unnamed'
|
33 |
+
|
34 |
+
|
35 |
+
def get_file_extension(url: str) -> str:
    """
    Pick the file extension to use for an image URL

    Args:
        url (str): Image URL

    Returns:
        str: The extension of the URL path (original casing kept) when it is
            one of .png/.jpg/.jpeg/.svg/.webp, otherwise '.png'
    """
    suffix = os.path.splitext(urlparse(url).path)[1]

    # Only the URL path is inspected, so query strings and fragments are ignored
    if suffix and suffix.lower() in ['.png', '.jpg', '.jpeg', '.svg', '.webp']:
        return suffix
    return '.png'
|
52 |
+
|
53 |
+
|
54 |
+
def is_valid_image_file(filepath: str) -> bool:
    """
    Check that a file exists, has an acceptable size and starts like an image

    Args:
        filepath (str): Path to image file

    Returns:
        bool: True if the size lies within MIN_FILE_SIZE..MAX_FILE_SIZE and
            the first bytes start with one of IMAGE_SIGNATURES; False
            otherwise or when any error occurs
    """
    try:
        if not os.path.exists(filepath):
            return False

        size = os.path.getsize(filepath)
        if not (MIN_FILE_SIZE <= size <= MAX_FILE_SIZE):
            logger.warning(f"Invalid file size: {size}")
            return False

        # Only the first 12 bytes are compared against the known signatures
        with open(filepath, 'rb') as handle:
            header = handle.read(12)

        return any(header.startswith(signature) for signature in IMAGE_SIGNATURES)

    except Exception as e:
        logger.error(f"Error validating image: {e}")
        return False
|
87 |
+
|
88 |
+
|
89 |
+
def create_directory(path: Path) -> bool:
    """
    Create directory (and missing parents) if it doesn't exist

    Args:
        path (Path): Directory path; a str or other path-like value is
            accepted as well

    Returns:
        bool: True if the directory exists afterwards, False if creation failed
    """
    try:
        # Coerce so callers holding a plain string path are handled too
        Path(path).mkdir(parents=True, exist_ok=True)
        return True
    except Exception as e:
        logger.error(f"Error creating directory {path}: {e}")
        return False
|
105 |
+
|
106 |
+
|
107 |
+
def clean_up_file(filepath: str) -> None:
    """
    Delete a file when it is present; errors are logged, never raised

    Args:
        filepath (str): Path to file to remove
    """
    try:
        if not os.path.exists(filepath):
            return
        os.remove(filepath)
    except Exception as e:
        logger.error(f"Error removing file {filepath}: {e}")
|
119 |
+
|
120 |
+
|
121 |
+
def parse_json_safely(json_string: str) -> Optional[dict]:
    """
    Safely parse JSON string

    Args:
        json_string (str): JSON string to parse

    Returns:
        dict or None: Parsed JSON, or None if the input is not valid JSON or
            is not a string/bytes value at all (e.g. None)
    """
    try:
        return json.loads(json_string)
    except (TypeError, ValueError):
        # ValueError covers json.JSONDecodeError; TypeError covers inputs such
        # as None that json.loads rejects before parsing.
        return None
|
135 |
+
|
136 |
+
|
137 |
+
def rate_limit_delay(delay: float = 1.0) -> None:
    """
    Add delay between requests to be respectful to servers

    Args:
        delay (float): Delay in seconds; zero or negative values mean no wait
    """
    # time.sleep raises ValueError for negative values, so a misconfigured
    # delay is treated as "do not wait" instead of failing the caller.
    if delay > 0:
        time.sleep(delay)
|
145 |
+
|
146 |
+
|
147 |
+
def format_file_size(size_bytes: int) -> str:
    """
    Render a byte count as a short B / KB / MB string

    Args:
        size_bytes (int): Size in bytes

    Returns:
        str: Whole bytes below 1024, otherwise KB or MB with one decimal
    """
    kib = 1024
    mib = kib * kib

    if size_bytes < kib:
        return f"{size_bytes} B"
    if size_bytes < mib:
        return f"{size_bytes / kib:.1f} KB"
    return f"{size_bytes / mib:.1f} MB"
|
163 |
+
|
164 |
+
|
165 |
+
def truncate_text(text: str, max_length: int = 100) -> str:
    """
    Truncate text to specified length

    Args:
        text (str): Text to truncate
        max_length (int): Maximum length of the returned string

    Returns:
        str: The text unchanged when it fits; otherwise a prefix ending in
            "..." whose total length is max_length. When max_length is below
            3 there is no room for the ellipsis and a plain prefix is returned.
    """
    if len(text) <= max_length:
        return text
    if max_length < 3:
        # "max_length - 3" would be negative here and slice from the end,
        # producing a result longer than max_length.
        return text[:max(max_length, 0)]
    return text[:max_length - 3] + "..."
|