# Spaces:
# Sleeping
# Sleeping
# NOTE(review): the three lines above are Hugging Face Spaces status-banner
# text that was captured along with the source; commented out because they
# are not Python and would break the module at import time.
import os | |
import random | |
import time | |
import re | |
import json | |
import requests | |
from bs4 import BeautifulSoup | |
from requests.adapters import HTTPAdapter | |
from requests.packages.urllib3.util.retry import Retry | |
import openai | |
import gradio as gr | |
from fpdf import FPDF as FPDF2 | |
from datetime import datetime | |
from zoneinfo import ZoneInfo | |
import sys | |
# API key configuration: read the OpenAI key from the environment.
# May be None when the variable is unset; the SDK will then fail at call time.
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
# Configure the OpenAI SDK (pre-1.0 module-level key style).
openai.api_key = OPENAI_API_KEY
def setup_session():
    """Create a requests.Session with automatic retries for HTTPS calls.

    Retries up to 5 times with exponential backoff on 502/503/504 responses.
    Returns the configured session, or None if construction fails (callers
    check for None before crawling).
    """
    try:
        retry_policy = Retry(total=5, backoff_factor=1,
                             status_forcelist=[502, 503, 504])
        http_session = requests.Session()
        http_session.mount('https://', HTTPAdapter(max_retries=retry_policy))
    except Exception:
        return None
    return http_session
def generate_naver_search_url(query):
    """Build a Naver blog-tab search URL for *query*.

    BUG FIX: the query was previously interpolated into the URL raw, so
    spaces and other reserved characters produced a malformed URL.  The
    parameters are now percent-encoded with urllib.parse.urlencode.

    Returns the full search URL as a string.
    """
    from urllib.parse import urlencode  # local import: top import block untouched
    base_url = "https://search.naver.com/search.naver?"
    params = {"ssc": "tab.blog.all", "sm": "tab_jum", "query": query}
    return base_url + urlencode(params)
def crawl_blog_content(url, session):
    """Download one Naver blog post and return its cleaned body text.

    Sleeps 1-2 s first as a politeness delay, then looks for the
    'se-main-container' div that holds the post body.  Returns "" on any
    error, non-200 response, or missing container.
    """
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.3",
        "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8",
        "Accept-Language": "ko-KR,ko;q=0.9,en-US;q=0.8,en;q=0.7",
        "Accept-Encoding": "gzip, deflate, br",
        "Connection": "keep-alive",
        "Referer": "https://search.naver.com/search.naver",
    }
    try:
        # Random delay so rapid sequential fetches look less bot-like.
        time.sleep(random.uniform(1, 2))
        response = session.get(url, headers=headers)
        if response.status_code != 200:
            return ""
        soup = BeautifulSoup(response.content, "html.parser")
        body = soup.find("div", attrs={'class': 'se-main-container'})
        return clean_text(body.get_text()) if body else ""
    except Exception:
        return ""
def crawl_naver_search_results(url, session):
    """Scrape one Naver blog-search result page and return up to 10 entries.

    Each entry is a dict with the post title and link; blog.naver links are
    rewritten to the mobile host (simpler markup to crawl later).  Returns
    [] on any error or non-200 response.
    """
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.3",
        "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8",
        "Accept-Language": "ko-KR,ko;q=0.9,en-US;q=0.8,en;q=0.7",
        "Accept-Encoding": "gzip, deflate, br",
        "Connection": "keep-alive",
        "Referer": "https://search.naver.com/search.naver",
    }
    try:
        response = session.get(url, headers=headers)
        if response.status_code != 200:
            return []
        soup = BeautifulSoup(response.content, "html.parser")

        def iter_entries():
            # Walk li.bx* > div.detail_box > div.title_area > a[href],
            # yielding one entry per anchor in document order.
            for item in soup.find_all("li", class_=re.compile("bx.*")):
                for box in item.find_all("div", class_="detail_box"):
                    for area in box.find_all("div", class_="title_area"):
                        title = area.text.strip()
                        for anchor in area.find_all("a", href=True):
                            href = anchor["href"]
                            if "blog.naver" in href:
                                href = href.replace("https://", "https://m.")
                            yield {"์ ๋ชฉ": title, "๋งํฌ": href}

        collected = []
        for entry in iter_entries():
            collected.append(entry)
            if len(collected) >= 10:
                break
        return collected
    except Exception:
        return []
def clean_text(text):
    """Collapse every run of whitespace into one space and trim the ends."""
    collapsed = re.sub(r'\s+', ' ', text)
    return collapsed.strip()
def fetch_references(topic):
    """Fetch three random reference posts for *topic* from Naver blog search.

    Returns a list of three "title\\ncontent" strings, or three copies of a
    failure message when the session cannot be created or fewer than three
    search results are found.
    """
    session = setup_session()
    if session is None:
        return ["์ธ์ ์ค์ ์คํจ"] * 3
    search_url = generate_naver_search_url(topic)
    results = crawl_naver_search_results(search_url, session)
    if len(results) < 3:
        return ["์ถฉ๋ถํ ๊ฒ์ ๊ฒฐ๊ณผ๋ฅผ ์ฐพ์ง ๋ชปํ์ต๋๋ค."] * 3
    # Pick three distinct results at random to diversify the references.
    picks = random.sample(results, 3)
    return [
        f"์ ๋ชฉ: {item['์ ๋ชฉ']}\n๋ด์ฉ: {crawl_blog_content(item['๋งํฌ'], session)}"
        for item in picks
    ]
def fetch_crawl_results(query):
    """Crawl references for *query* and unpack them as three return values."""
    first, second, third = fetch_references(query)
    return first, second, third
def generate_blog_post(query, prompt_template):
    """Generate a review-style blog post for *query* with the OpenAI API.

    Crawls three reference posts, builds a prompt from *prompt_template*,
    and regenerates (up to max_attempts times) until the post reaches the
    target character count, scrubbing a list of banned phrases each time.

    Returns a 5-tuple (final_post, ref1, ref2, ref3, actual_char_length);
    on any exception returns (error_message, "", "", "", 0).
    """
    try:
        # Target post length in characters.
        target_char_length = 2000
        max_attempts = 2  # maximum number of generation attempts
        # Fetch the three reference posts.
        references = fetch_references(query)
        ref1, ref2, ref3 = references
        # OpenAI request parameters.
        # NOTE(review): openai.ChatCompletion is the pre-1.0 SDK surface;
        # this requires openai<1.0 (or migration to the 1.x client API).
        model_name = "gpt-4o-mini"
        temperature = 0.85
        max_tokens = 10000  # sized to the model's maximum output tokens
        top_p = 0.9
        frequency_penalty = 0.5
        presence_penalty = 0
        # Phrases that must never appear in the generated text.
        unwanted_phrases = [
            '์ฌ๋ฌ๋ถ',
            '์ต๊ทผ',
            '๋ง์ง๋ง์ผ๋ก',
            '๊ฒฐ๋ก ์ ์ผ๋ก',
            '๊ฒฐ๊ตญ',
            '์ข ํฉ์ ์ผ๋ก',
            '๋ฐ๋ผ์',
            '๋ง๋ฌด๋ฆฌ',
            '์์ฝ'
        ]
        # Remove every whole word that contains any banned phrase.
        def remove_unwanted_phrases(text):
            # Split the text into word tokens, keeping newlines as tokens.
            words = re.findall(r'\S+|\n', text)
            result_words = []
            for word in words:
                # Keep the word only if it contains no banned phrase.
                if not any(phrase in word for phrase in unwanted_phrases):
                    result_words.append(word)
            # Re-join the words, removing stray spaces around newlines.
            return ' '.join(result_words).replace(' \n ', '\n').replace(' \n', '\n').replace('\n ', '\n')
        # Build the base prompt (template + topic + references + target length).
        base_prompt = prompt_template + f"""
์ฃผ์ : {query}
์ฐธ๊ณ ๊ธ 1: {ref1}
์ฐธ๊ณ ๊ธ 2: {ref2}
์ฐธ๊ณ ๊ธ 3: {ref3}
๋ชฉํ ๊ธ์์: {target_char_length}
"""
        for attempt in range(max_attempts):
            # From the second attempt on, append corrective instructions
            # asking the model to expand the text without repeating itself.
            if attempt > 0:
                additional_instructions = f"""
1. ์์ฑ๋ ๊ธ์ ๊ธธ์ด๊ฐ ๋ชฉํ ๊ธ์์์ ๋ฏธ๋ฌ์ ๋๋ค. ๋ด์ฉ์ ์ง์ ์ ์งํ๊ณ ๋์ฑ ํ๋ถํ๊ฒ ์์ฑํ๊ณ , ์ธ๋ถ์ฌํญ์ ์ถ๊ฐํ์ฌ ์์ฐ์ค๋ฝ๊ฒ ํ์ฅํ์ธ์.
2. ์ค๋ณต๋๋ ๋ด์ฉ์ ํผํ๊ณ ์๋ก์ด ์ ๋ณด๋ฅผ ์ถ๊ฐํ์ธ์.
3. ๋ฐ๋์ ๋งํฌ๋ค์ด ํ์์ด ์๋ ์์ํ ํ ์คํธ๋ก๋ง ์ถ๋ ฅํ์ธ์.
4. ๋ฐ๋์ ์ด ํํ๋ค์ ์ฌ์ฉํ์ง ๋ง์ธ์: ์ฌ๋ฌ๋ถ, ์ต๊ทผ, ๋ง์ง๋ง์ผ๋ก, ๊ฒฐ๋ก ์ ์ผ๋ก, ๊ฒฐ๊ตญ, ์ข ํฉ์ ์ผ๋ก, ๋ฐ๋ผ์, ๋ง๋ฌด๋ฆฌ, ์์ฝ.
"""
            else:
                additional_instructions = ""
            user_prompt = base_prompt + additional_instructions
            # Compose the chat message list (single user turn).
            messages = [
                {"role": "user", "content": user_prompt}
            ]
            # Call the OpenAI chat-completion endpoint.
            response = openai.ChatCompletion.create(
                model=model_name,
                messages=messages,
                temperature=temperature,
                max_tokens=max_tokens,
                top_p=top_p,
                frequency_penalty=frequency_penalty,
                presence_penalty=presence_penalty,
            )
            # Extract the model's answer.
            generated_post = response['choices'][0]['message']['content'].strip()
            # Scrub the banned phrases from the output.
            generated_post = remove_unwanted_phrases(generated_post)
            # Measure the post length after scrubbing.
            actual_char_length = len(generated_post)
            # Stop once the target length is met.
            if actual_char_length >= target_char_length:
                break
            else:
                # Log-only warning (not surfaced to the user).
                print(f"[์๋ {attempt + 1}] ์์ฑ๋ ๊ธ์ ๊ธธ์ด๊ฐ ๋ชฉํ ๊ธ์์์ ๋ฏธ๋ฌํฉ๋๋ค. (ํ์ฌ ๊ธ์์: {actual_char_length}์)")
                if attempt == max_attempts - 1:
                    print("[๊ฒฝ๊ณ ] ์ต๋ ์๋ ํ์์ ๋๋ฌํ์ผ๋ ๋ชฉํ ๊ธ์์์ ๋๋ฌํ์ง ๋ชปํ์ต๋๋ค.")
        # Assemble the final output with the topic header.
        final_post = f"์ฃผ์ : {query}\n\n{generated_post}"
        return final_post, ref1, ref2, ref3, actual_char_length
    except Exception as e:
        print(f"generate_blog_post ํจ์์์ ์ค๋ฅ ๋ฐ์: {str(e)}")
        return f"๋ธ๋ก๊ทธ ๊ธ ์์ฑ ์ค ์ค๋ฅ ๋ฐ์: {str(e)}", "", "", "", 0
# PDF class and related helpers.
class PDF(FPDF2):
    """FPDF subclass preloaded with the NanumGothic font family.

    The .ttf files are expected to sit next to this script.  A simple
    page-number footer and a font-only header are installed.
    """

    # (family, style, file) triples registered at construction time.
    _FONTS = (
        ("NanumGothic", "", "NanumGothic.ttf"),
        ("NanumGothic", "B", "NanumGothicBold.ttf"),
        ("NanumGothicExtraBold", "", "NanumGothicExtraBold.ttf"),
        ("NanumGothicLight", "", "NanumGothicLight.ttf"),
    )

    def __init__(self):
        super().__init__()
        font_dir = os.path.dirname(__file__)
        for family, style, filename in self._FONTS:
            self.add_font(family, style, os.path.join(font_dir, filename))

    def header(self):
        self.set_font('NanumGothic', '', 10)

    def footer(self):
        self.set_y(-15)
        self.set_font('NanumGothic', '', 8)
        self.cell(0, 10, f'Page {self.page_no()}', 0, 0, 'C')
def save_to_pdf(blog_post, user_topic):
    """Render *blog_post* into a PDF file and return its filename.

    The first line of *blog_post* becomes the centered title; the rest is
    the body.  The filename is prefixed with a Korea-Standard-Time
    timestamp (YYMMDD_HHMM) plus a sanitized form of *user_topic*.
    """
    pdf = PDF()
    pdf.add_page()
    # First line is the title; everything after it is the body text.
    lines = blog_post.split('\n')
    title = lines[0].strip()
    content = '\n'.join(lines[1:]).strip()
    # Timestamp in Korea Standard Time for the filename prefix.
    now = datetime.now(ZoneInfo("Asia/Seoul"))
    date_str = now.strftime("%y%m%d")
    time_str = now.strftime("%H%M")
    filename = f"{date_str}_{time_str}_{format_filename(user_topic)}.pdf"
    pdf.set_font("NanumGothic", 'B', size=14)
    pdf.cell(0, 10, title, ln=True, align='C')
    pdf.ln(10)
    pdf.set_font("NanumGothic", '', size=11)
    pdf.multi_cell(0, 5, content)
    # BUG FIX: this log line previously printed the literal placeholder
    # "(unknown)" instead of interpolating the generated filename.
    print(f"Saving PDF as: {filename}")
    pdf.output(filename)
    return filename
def format_filename(text):
    """Make *text* filename-safe: keep word chars/spaces/hyphens, cap at 50."""
    safe = re.sub(r'[^\w\s-]', '', text)
    return safe[:50].strip()
def save_content_to_pdf(blog_post, user_topic):
    """Gradio click handler: delegate PDF rendering to save_to_pdf().

    Thin wrapper kept so the UI wiring has a stable callable name.
    """
    return save_to_pdf(blog_post, user_topic)
# Default prompt template shown (and editable) in the UI.  The Korean rules
# tell the model to write one product-review post from three references.
# NOTE(review): the Korean text below appears encoding-garbled in this copy
# of the file — it is runtime prompt content, so it is preserved verbatim;
# restoring the original UTF-8 Korean should be verified against the repo.
DEFAULT_PROMPT_TEMPLATE = """
[๋ธ๋ก๊ทธ ๊ธ ์์ฑ ๊ธฐ๋ณธ ๊ท์น]
1. ๋ฐ๋์ ํ๊ธ๋ก ์์ฑํ๋ผ
2. ์ฃผ์ด์ง ์ฐธ๊ณ ๊ธ์ ๋ฐํ์ผ๋ก 1๊ฐ์ ์ํ๋ฆฌ๋ทฐํ(Product Review) ๋ธ๋ก๊ทธ๋ฅผ ์์ฑ
3. ์ฃผ์ ์ ์ ๋ชฉ์ ์ ์ธํ ๊ธ์ด 1500๋จ์ด ์ด์์ด ๋๋๋ก ์์ฑ
4. ๊ธ์ ์ ๋ชฉ์ ์ํ๋ฆฌ๋ทฐํ ๋ธ๋ก๊ทธ ํํ์ ๋ง๋ ์ ์ ํ ์ ๋ชฉ์ผ๋ก ์ถ๋ ฅ
- ์ฐธ๊ณ ๊ธ์ ์ ๋ชฉ๋ ์ฐธ๊ณ ํ๋, ๋์ผํ๊ฒ ์์ฑํ์ง ๋ง ๊ฒ
5. ๋ฐ๋์ ๋งํฌ๋ค์ด ํ์์ด ์๋ ์์ํ ํ ์คํธ๋ก๋ง ์ถ๋ ฅํ๋ผ
6. ๋ค์ํ๋ฒ ์ฐธ๊ณ ๊ธ์ ๊ฒํ ํ์ฌ ๋ด์ฉ์ ์ถฉ๋ถํ ๋ฐ์ํ๋, ์ฐธ๊ณ ๊ธ์ ๊ธ์ ๊ทธ๋๋ก ์ฌ์์ฑํ์ง๋ ๋ง ๊ฒ
[๋ธ๋ก๊ทธ ๊ธ ์์ฑ ์ธ๋ถ ๊ท์น]
1. ์ฌ์ฉ์๊ฐ ์ ๋ ฅํ ์ฃผ์ ์ ์ฃผ์ด์ง ์ฐธ๊ณ ๊ธ 3๊ฐ๋ฅผ ๋ฐํ์ผ๋ก ์ํ๋ฆฌ๋ทฐํ ๋ธ๋ก๊ทธ ๊ธ 1๊ฐ๋ฅผ ์์ฑํ๋ผ
2. ์ฃผ์ด์ง ๋ชจ๋ ๊ธ์ ๋ถ์ํ์ฌ ํ๋์ ๋์ฃผ์ ๋ฅผ ์ ์ ํ๋ผ(1๊ฐ์ ์ฐธ๊ณ ๊ธ์ ์น์ฐ์น์ง ๋ง๊ณ ๋ค์ํ ๋ด์ฉ์ ๋ด์๊ฒ)
3. ์ฌ๋ฌ๊ฐ์ง ์ํ์ด๋ผ๋ฉด ์ํ 1๊ฐ์ ์น์ฐ์น ๋ฆฌ๋ทฐ๋ฅผ ์์ฑํ์ง ๋ง ๊ฒ.
4. ๋์ฃผ์ ์ ๋ง๊ฒ ๊ธ์ ๋งฅ๋ฝ์ ์ ์งํ๋ผ
5. ์ฐธ๊ณ ๊ธ์ ์์ฑ๋ ์ํ๊ณผ ๊ธฐ๋ฅ์ ์ง์คํ์ฌ ์์ฑํ๋ผ
6. ์ค์ ๋ด๊ฐ ์ฌ์ฉํด๋ณด๊ณ ๊ฒฝํํ ๋ด์ฉ์ ์์ฑํ ๋ฆฌ๋ทฐ ํํ๋ก ๊ธ์ ์์ฑ
7. ๋ด์ฉ์ ๊ธ์ ์ ์ผ๋ก ์์ฑํ๋, ์ํ์ด ๋๋ณด์ด๋๋ก ์์ฑ(์ ํ์ด ์ฌ๋ฌ๊ฐ์ผ ๊ฒฝ์ฐ, ํ๋์ ์ํ์ ์น์ฐ์น์ง ๋ง ๊ฒ)
8. ์ํ์ ๊ฐ์น๋ฅผ ๊ณ ๊ฐ์๊ฒ ์ดํํ๋ผ.
9. ๊ธ์ ์, ๋ค ๋ฌธ์ฅ์ด ์์ฐ์ค๋ฝ๊ฒ ์ด์ด์ง๋๋ก ์์ฑ
10. ์ดํฌ๋ ์ฃผ์ด์ง ์ฐธ๊ณ ๊ธ 3๊ฐ์ง์ ์ดํฌ๋ฅผ ์ ์ ํ ๋ฐ์ํ๋ผ
- ํนํ ๋ฌธ์ฅ์ ๋ ๋ถ๋ถ์ ์ ์ ํ ๋ฐ์(๊ฐ๊ธ์ '~์'๋ก ๋๋๋๋ก ์์ฑ)
- ๋๋ฌด ๋ฑ๋ฑํ์ง ์๊ฒ ํธ์ํ๊ฒ ์ฝ์ ์ ์๋๋ก ์์ฐ์ค๋ฌ์ด ๋ํ์ฒด๋ฅผ ๋ฐ์
- ๋จ์ด ์ ํ์ ์ฌ์ด ํ๊ตญ์ด ์ดํ๋ฅผ ์ฌ์ฉํ๊ณ ์ฌ์ ์ํํ, ์ค๋๋ ํํ์ ์ ์ธํ๋ผ
[์ ์ธ ๊ท์น]
1. ๋ฐ๋์ ์ฐธ๊ณ ๊ธ์ ํฌํจ๋ ๋งํฌ(URL)๋ ์ ์ธ
2. ์ฐธ๊ณ ๊ธ์์ '๋งํฌ๋ฅผ ํ์ธํด์ฃผ์ธ์'์ ๊ฐ์ ๋งํฌ ์ด๋์ ๋ฌธ๊ตฌ๋ ์ ์ธ
3. ์ฐธ๊ณ ๊ธ์ ์๋ ์์ฑ์, ํ์, ์ ํ๋ฒ, ๊ธฐ์(Writer, speaker, YouTuber, reporter)์ ์ด๋ฆ, ์ ์นญ, ๋๋ค์(Name, Nkickname)์ ๋ฐ๋์ ์ ์ธ
4. '์ ์ฒด๋ก ๋ถํฐ ์ ๊ณต ๋ฐ์์ ์์ฑ', '์ฟ ํก ํํธ๋์ค'๋ฑ์ ํํ์ ๋ฐ๋์ ์ ์ธํ๋ผ.
5. ๊ธ์ ๊ตฌ์กฐ๊ฐ ๋๋ฌ๋๊ฒ ์์ฑํ์ง ๋ง ๊ฒ(์์, ๋์ ๋ํ ํํ)
- ์ฌ๋ฌ๋ถ,
- ๋ง์ง๋ง์ผ๋ก, ๊ฒฐ๋ก ์ ์ผ๋ก, ๊ฒฐ๊ตญ, ์ข ํฉ์ ์ผ๋ก, ๋ฐ๋ผ์, ๋ง๋ฌด๋ฆฌ, ์์ฝ,
"""
def _generate_for_ui(query, prompt_template):
    """Adapter between generate_blog_post and the Gradio click handler.

    BUG FIX: generate_blog_post returns FIVE values (the last being the
    character count) while the click handler binds only FOUR output
    components, so Gradio fails on the arity mismatch.  The count is
    dropped here instead of changing generate_blog_post's interface.
    """
    post, ref1, ref2, ref3, _char_count = generate_blog_post(query, prompt_template)
    return post, ref1, ref2, ref3

# Build the Gradio UI.
with gr.Blocks() as iface:
    gr.Markdown("# ๋ธ๋ก๊ทธ ๊ธ ์์ฑ๊ธฐ_๋ฆฌ๋ทฐ_๊ธฐ๋ฅ์ง์คํ")
    gr.Markdown("์ฃผ์ ๋ฅผ ์ ๋ ฅํ๊ณ ๋ธ๋ก๊ทธ ๊ธ ์์ฑ ๋ฒํผ์ ๋๋ฅด๋ฉด ์๋์ผ๋ก ๋ธ๋ก๊ทธ ๊ธ์ ์์ฑํฉ๋๋ค.")
    query_input = gr.Textbox(lines=1, placeholder="๋ธ๋ก๊ทธ ๊ธ์ ์ฃผ์ ๋ฅผ ์ ๋ ฅํด์ฃผ์ธ์...", label="์ฃผ์ ")
    prompt_input = gr.Textbox(lines=10, value=DEFAULT_PROMPT_TEMPLATE, label="ํ๋กฌํํธ ํ ํ๋ฆฟ", visible=True)
    generate_button = gr.Button("๋ธ๋ก๊ทธ ๊ธ ์์ฑ")
    output_text = gr.Textbox(label="์์ฑ๋ ๋ธ๋ก๊ทธ ๊ธ")
    ref1_text = gr.Textbox(label="์ฐธ๊ณ ๊ธ 1", lines=10, visible=True)
    ref2_text = gr.Textbox(label="์ฐธ๊ณ ๊ธ 2", lines=10, visible=True)
    ref3_text = gr.Textbox(label="์ฐธ๊ณ ๊ธ 3", lines=10, visible=True)
    save_pdf_button = gr.Button("PDF๋ก ์ ์ฅ")
    pdf_output = gr.File(label="์์ฑ๋ PDF ํ์ผ")

    generate_button.click(
        _generate_for_ui,
        inputs=[query_input, prompt_input],
        outputs=[output_text, ref1_text, ref2_text, ref3_text],
        show_progress=True
    )
    save_pdf_button.click(
        save_content_to_pdf,
        inputs=[output_text, query_input],
        outputs=[pdf_output],
        show_progress=True
    )

# Launch the Gradio app only when run as a script.
if __name__ == "__main__":
    iface.launch()