# src/streamlit_app.py
import os
import streamlit as st
import pandas as pd
import requests
from bs4 import BeautifulSoup
import re
import time
import nltk
from nltk.tokenize import word_tokenize
from nltk.corpus import stopwords
from collections import Counter
import json
from datetime import datetime, timedelta
import openai
import schedule
import threading
import matplotlib.pyplot as plt
from wordcloud import WordCloud
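# Third-party dependencies used below (not in the standard library):
# streamlit, pandas, requests, beautifulsoup4, nltk, openai, schedule,
# matplotlib, wordcloud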
# ─── Setup: temp directory and NLTK data ─────────────────────────────────────
# Create a writable temp directory
TMP = "/tmp"
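# NOTE: /tmp is ephemeral storage; saved articles and scheduled results are lost
# whenever the container restarts.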
NLP_DATA = os.path.join(TMP, "nltk_data")
os.makedirs(NLP_DATA, exist_ok=True)
# Add it to NLTK's data search path
nltk.data.path.insert(0, NLP_DATA)
# Download the required NLTK resources if they are missing
# ("punkt_tab" is needed by newer NLTK releases; "stopwords" lives under corpora/)
for pkg, locator in [
    ("punkt", "tokenizers/punkt"),
    ("punkt_tab", "tokenizers/punkt_tab"),
    ("stopwords", "corpora/stopwords"),
]:
    try:
        nltk.data.find(locator)
    except LookupError:
        nltk.download(pkg, download_dir=NLP_DATA)
# ─── Page config: must run before any other Streamlit command ────────────────
st.set_page_config(page_title="πŸ“° News Tool", layout="wide")
# ─── Load the OpenAI API key: env var, then st.secrets, then sidebar input ───
try:
    OPENAI_KEY = os.getenv("OPENAI_API_KEY") or st.secrets.get("OPENAI_API_KEY")
except Exception:  # st.secrets raises when no secrets.toml is configured
    OPENAI_KEY = os.getenv("OPENAI_API_KEY")
if not OPENAI_KEY:
    # Fall back to a sidebar prompt while the app is running
with st.sidebar:
st.markdown("### πŸ”‘ OpenAI API Key")
key_input = st.text_input("Enter your OpenAI API Key:", type="password")
if key_input:
OPENAI_KEY = key_input
if OPENAI_KEY:
openai.api_key = OPENAI_KEY
else:
st.sidebar.error("OpenAI API Keyκ°€ μ„€μ •λ˜μ§€ μ•Šμ•˜μŠ΅λ‹ˆλ‹€.")
# ─── Streamlit νŽ˜μ΄μ§€ & 메뉴 ꡬ성 ─────────────────────────────────────────────
st.set_page_config(page_title="πŸ“° News Tool", layout="wide")
with st.sidebar:
st.title("λ‰΄μŠ€ 기사 도ꡬ")
menu = st.radio("메뉴 선택", [
"λ‰΄μŠ€ 기사 크둀링", "기사 λΆ„μ„ν•˜κΈ°", "μƒˆ 기사 μƒμ„±ν•˜κΈ°", "λ‰΄μŠ€ 기사 μ˜ˆμ•½ν•˜κΈ°"
])
# ─── File path helper ────────────────────────────────────────────────────────
def _tmp_path(*paths):
    """Join path components under /tmp and create the parent directory."""
full = os.path.join(TMP, *paths)
os.makedirs(os.path.dirname(full), exist_ok=True)
return full
# ─── Load / save collected articles ──────────────────────────────────────────
def load_saved_articles():
path = _tmp_path("saved_articles", "articles.json")
if os.path.exists(path):
with open(path, "r", encoding="utf-8") as f:
return json.load(f)
return []
def save_articles(articles):
path = _tmp_path("saved_articles", "articles.json")
with open(path, "w", encoding="utf-8") as f:
json.dump(articles, f, ensure_ascii=False, indent=2)
# ─── Naver News crawler ──────────────────────────────────────────────────────
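# st.cache_data memoizes the result per (keyword, num_articles), so repeating the
# same search within a session returns the cached list instead of re-crawling.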
@st.cache_data
def crawl_naver_news(keyword, num_articles=5):
url = f"https://search.naver.com/search.naver?where=news&query={keyword}"
results = []
try:
resp = requests.get(url, timeout=5)
soup = BeautifulSoup(resp.text, "html.parser")
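        # NOTE: these class names come from Naver's generated search markup and
        # change frequently; if they go stale the crawler silently returns nothing.
        # Naver may also block requests that lack a browser User-Agent header.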
items = soup.select("div.sds-comps-base-layout.sds-comps-full-layout")
for i, it in enumerate(items):
if i >= num_articles: break
title_el = it.select_one("a.X0fMYp2dHd0TCUS2hjww span")
link_el = it.select_one("a.X0fMYp2dHd0TCUS2hjww")
src_el = it.select_one("div.sds-comps-profile-info-title span")
date_el = it.select_one("span.r0VOr")
desc_el = it.select_one("a.X0fMYp2dHd0TCUS2hjww.IaKmSOGPdofdPwPE6cyU > span")
if not title_el or not link_el: continue
results.append({
"title": title_el.text.strip(),
"link": link_el["href"],
"source": src_el.text.strip() if src_el else "μ•Œ 수 μ—†μŒ",
"date": date_el.text.strip() if date_el else "μ•Œ 수 μ—†μŒ",
"description": desc_el.text.strip() if desc_el else "",
"content": ""
})
except Exception as e:
st.error(f"크둀링 였λ₯˜: {e}")
return results
# ─── Fetch the article body ──────────────────────────────────────────────────
def get_article_content(url):
try:
resp = requests.get(url, timeout=5)
soup = BeautifulSoup(resp.text, "html.parser")
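        # "#dic_area" is the article body container on news.naver.com pages; the
        # other selectors are fallbacks for other layouts.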
cont = soup.select_one("#dic_area") or soup.select_one(".article_body, .news-content-inner")
if cont:
text = re.sub(r"\s+", " ", cont.text.strip())
return text
except Exception:
pass
return "본문을 κ°€μ Έμ˜¬ 수 μ—†μŠ΅λ‹ˆλ‹€."
# ─── Keyword analysis & word cloud ───────────────────────────────────────────
def analyze_keywords(text, top_n=10):
stop_kr = ["이","κ·Έ","μ €","것","및","λ“±","λ₯Ό","을","에","μ—μ„œ","의","으둜","둜"]
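    # NOTE: word_tokenize relies on NLTK's English-oriented punkt model, so Korean
    # text is only split on whitespace/punctuation; a morphological analyzer
    # (e.g. KoNLPy) would tokenize Korean more accurately.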
tokens = [w for w in word_tokenize(text) if w.isalnum() and len(w)>1 and w not in stop_kr]
freq = Counter(tokens)
return freq.most_common(top_n)
def extract_for_wordcloud(text, top_n=50):
tokens = [w for w in word_tokenize(text.lower()) if w.isalnum()]
stop_en = set(stopwords.words("english"))
korea_sw = {"및","λ“±","λ₯Ό","이","의","κ°€","에","λŠ”"}
sw = stop_en.union(korea_sw)
filtered = [w for w in tokens if w not in sw and len(w)>1]
freq = Counter(filtered)
return dict(freq.most_common(top_n))
def generate_wordcloud(freq_dict):
try:
wc = WordCloud(width=800, height=400, background_color="white")\
.generate_from_frequencies(freq_dict)
return wc
except Exception as e:
st.error(f"μ›Œλ“œν΄λΌμš°λ“œ 생성 였λ₯˜: {e}")
return None
# ─── OpenAI-based article & image generation ─────────────────────────────────
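# NOTE: openai.ChatCompletion.create and openai.Image.create are the pre-1.0
# openai-python interfaces; openai>=1.0 requires the new client-based API.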
def generate_article(orig, prompt_text):
if not openai.api_key:
return "API Keyκ°€ μ„€μ •λ˜μ§€ μ•Šμ•˜μŠ΅λ‹ˆλ‹€."
try:
resp = openai.ChatCompletion.create(
model="gpt-3.5-turbo",
messages=[
{"role":"system","content":"당신은 μ „λ¬Έ λ‰΄μŠ€ κΈ°μžμž…λ‹ˆλ‹€."},
{"role":"user", "content":f"{prompt_text}\n\n{orig[:1000]}"}
],
max_tokens=1500
)
return resp.choices[0].message["content"]
except Exception as e:
return f"기사 생성 였λ₯˜: {e}"
def generate_image(prompt):
if not openai.api_key:
return None
try:
resp = openai.Image.create(prompt=prompt, n=1, size="512x512")
return resp["data"][0]["url"]
except Exception as e:
st.error(f"이미지 생성 였λ₯˜: {e}")
return None
# ─── Scheduler state ─────────────────────────────────────────────────────────
class SchedulerState:
    def __init__(self):
        self.is_running = False
        self.thread = None
        self.last_run = None
        self.next_run = None
        self.jobs = []
        self.results = []

# Keep a single SchedulerState per server process so that it survives Streamlit
# reruns (module-level globals are re-created every time the script re-executes).
@st.cache_resource
def _get_scheduler_state():
    return SchedulerState()

global_scheduler = _get_scheduler_state()
def perform_news_task(task_type, kw, n, prefix):
arts = crawl_naver_news(kw, n)
for a in arts:
a["content"] = get_article_content(a["link"])
time.sleep(0.5)
fname = _tmp_path("scheduled_news", f"{prefix}_{task_type}_{datetime.now():%Y%m%d_%H%M%S}.json")
with open(fname,"w",encoding="utf-8") as f:
json.dump(arts, f, ensure_ascii=False, indent=2)
global_scheduler.last_run = datetime.now()
global_scheduler.results.append({
"type":task_type, "keyword":kw,
"count":len(arts), "file":fname,
"timestamp":global_scheduler.last_run
})
def run_scheduler():
while global_scheduler.is_running:
schedule.run_pending()
time.sleep(1)
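# NOTE: the scheduler loop runs in a daemon thread inside the Streamlit server
# process, so scheduled jobs only fire while the app process stays alive.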
def start_scheduler(daily, interval):
if global_scheduler.is_running: return
schedule.clear(); global_scheduler.jobs=[]
    # Daily jobs
for t in daily:
hh, mm = t["hour"], t["minute"]
tag = f"d_{t['keyword']}_{hh}{mm}"
schedule.every().day.at(f"{hh:02d}:{mm:02d}")\
.do(perform_news_task,"daily",t["keyword"],t["num_articles"],tag).tag(tag)
global_scheduler.jobs.append(tag)
    # Interval jobs
for t in interval:
tag = f"i_{t['keyword']}_{t['interval']}"
if t["immediate"]:
perform_news_task("interval", t["keyword"], t["num_articles"], tag)
schedule.every(t["interval"]).minutes\
.do(perform_news_task,"interval",t["keyword"],t["num_articles"],tag).tag(tag)
global_scheduler.jobs.append(tag)
global_scheduler.next_run = schedule.next_run()
global_scheduler.is_running = True
th = threading.Thread(target=run_scheduler, daemon=True)
th.start(); global_scheduler.thread = th
def stop_scheduler():
global_scheduler.is_running = False
schedule.clear()
global_scheduler.jobs=[]
# ─── Render the UI: one view per menu entry ──────────────────────────────────
if menu == "λ‰΄μŠ€ 기사 크둀링":
st.header("λ‰΄μŠ€ 기사 크둀링")
kw = st.text_input("πŸ” 검색어", "인곡지λŠ₯")
num = st.slider("κ°€μ Έμ˜¬ 기사 수", 1, 20, 5)
if st.button("기사 κ°€μ Έμ˜€κΈ°"):
arts = crawl_naver_news(kw, num)
for i,a in enumerate(arts):
st.progress((i+1)/len(arts))
a["content"] = get_article_content(a["link"])
time.sleep(0.3)
save_articles(arts)
st.success(f"{len(arts)}개 기사 μ €μž₯됨")
for a in arts:
with st.expander(a["title"]):
st.write(f"좜처: {a['source']} | λ‚ μ§œ: {a['date']}")
st.write(a["description"])
st.write(a["content"][:300]+"…")
elif menu == "기사 λΆ„μ„ν•˜κΈ°":
st.header("기사 λΆ„μ„ν•˜κΈ°")
arts = load_saved_articles()
if not arts:
st.warning("λ¨Όμ € β€˜λ‰΄μŠ€ 기사 크둀링’ λ©”λ‰΄μ—μ„œ 기사λ₯Ό μˆ˜μ§‘ν•˜μ„Έμš”.")
else:
titles = [a["title"] for a in arts]
sel = st.selectbox("뢄석할 기사 선택", titles)
art = next(a for a in arts if a["title"]==sel)
st.subheader(art["title"])
with st.expander("본문 보기"):
st.write(art["content"])
mode = st.radio("뢄석 방식", ["ν‚€μ›Œλ“œ 뢄석", "ν…μŠ€νŠΈ 톡계"])
if mode=="ν‚€μ›Œλ“œ 뢄석" and st.button("μ‹€ν–‰"):
kw_list = analyze_keywords(art["content"])
df = pd.DataFrame(kw_list, columns=["단어","λΉˆλ„"])
st.bar_chart(df.set_index("단어"))
st.write("μƒμœ„ ν‚€μ›Œλ“œ:")
for w,c in kw_list: st.write(f"- {w}: {c}")
# μ›Œλ“œν΄λΌμš°λ“œ
wc_data = extract_for_wordcloud(art["content"])
wc = generate_wordcloud(wc_data)
if wc:
fig,ax = plt.subplots(figsize=(8,4))
                ax.imshow(wc, interpolation="bilinear"); ax.axis("off")
st.pyplot(fig)
if mode=="ν…μŠ€νŠΈ 톡계" and st.button("μ‹€ν–‰"):
txt=art["content"]
            wcnt = len(re.findall(r"\w+", txt))
scnt=len(re.split(r"[.!?]+",txt))
st.metric("단어 수",wcnt); st.metric("λ¬Έμž₯ 수",scnt)
elif menu == "μƒˆ 기사 μƒμ„±ν•˜κΈ°":
st.header("μƒˆ 기사 μƒμ„±ν•˜κΈ°")
arts = load_saved_articles()
if not arts:
st.warning("λ¨Όμ € 기사λ₯Ό μˆ˜μ§‘ν•΄μ£Όμ„Έμš”.")
else:
sel = st.selectbox("원본 기사 선택", [a["title"] for a in arts])
art = next(a for a in arts if a["title"]==sel)
st.write(art["content"][:200]+"…")
prompt = st.text_area("기사 μž‘μ„± μ§€μΉ¨", "기사 ν˜•μ‹μ— 맞좰 μƒˆλ‘œ μž‘μ„±ν•΄ μ£Όμ„Έμš”.")
gen_img = st.checkbox("이미지도 생성", value=True)
if st.button("생성"):
new = generate_article(art["content"], prompt)
st.subheader("μƒμ„±λœ 기사")
st.write(new)
if gen_img:
url = generate_image(f"기사 제λͺ©: {art['title']}\n\n{prompt}")
if url: st.image(url)
elif menu == "λ‰΄μŠ€ 기사 μ˜ˆμ•½ν•˜κΈ°":
st.header("λ‰΄μŠ€ 기사 μ˜ˆμ•½ν•˜κΈ°")
tab1,tab2,tab3 = st.tabs(["일별 μ˜ˆμ•½","간격 μ˜ˆμ•½","μƒνƒœ"])
    # Daily schedule tab
with tab1:
dkw = st.text_input("ν‚€μ›Œλ“œ(일별)", "인곡지λŠ₯", key="dk")
dnum = st.number_input("기사 수",1,20,5,key="dn")
dhh = st.number_input("μ‹œ",0,23,9,key="dh")
dmm = st.number_input("λΆ„",0,59,0,key="dm")
if st.button("μΆ”κ°€",key="addd"):
st.session_state.setdefault("daily",[]).append({
"keyword":dkw,"num_articles":dnum,
"hour":dhh,"minute":dmm
})
if st.session_state.get("daily"):
st.write(st.session_state["daily"])
    # Interval schedule tab
with tab2:
ikw = st.text_input("ν‚€μ›Œλ“œ(간격)", "빅데이터", key="ik")
inum = st.number_input("기사 수",1,20,5,key="in")
inter= st.number_input("간격(λΆ„)",1,1440,60,key="ii")
imm = st.checkbox("μ¦‰μ‹œ μ‹€ν–‰",True,key="im")
if st.button("μΆ”κ°€",key="addi"):
st.session_state.setdefault("interval",[]).append({
"keyword":ikw,"num_articles":inum,
"interval":inter,"immediate":imm
})
if st.session_state.get("interval"):
st.write(st.session_state["interval"])
# μƒνƒœ
with tab3:
if not global_scheduler.is_running and st.button("μ‹œμž‘"):
start_scheduler(st.session_state.get("daily",[]),
st.session_state.get("interval",[]))
if global_scheduler.is_running and st.button("쀑지"):
stop_scheduler()
st.write("싀행쀑:", global_scheduler.is_running)
st.write("λ§ˆμ§€λ§‰ μ‹€ν–‰:", global_scheduler.last_run)
st.write("λ‹€μŒ μ‹€ν–‰:", global_scheduler.next_run)
st.write("작 수:", global_scheduler.jobs)
st.dataframe(pd.DataFrame(global_scheduler.results))
# ─── Footer ──────────────────────────────────────────────────────────────────
st.markdown("---")
st.markdown("Β© 2025 News Tool @conanssam")