Spaces:
Running
Running
# -------- app_final.py -------- | |
import os, json, math, pathlib, re, time, logging, requests | |
from datetime import datetime, timedelta | |
import numpy as np | |
import pandas as pd | |
import plotly.graph_objects as go | |
import gradio as gr | |
import openai | |
import torch | |
from sentence_transformers import SentenceTransformer, util | |
# ββββββββββββββββββββββββββ 0. API keys & Brave Search ββββββββββββββββββββββββββ | |
if "OPENAI_API_KEY" not in os.environ: | |
os.environ["OPENAI_API_KEY"] = input("π Enter your OpenAI API key: ").strip() | |
openai.api_key = os.environ["OPENAI_API_KEY"] | |
BRAVE_KEY = os.getenv("BRAVE_KEY", "") | |
BRAVE_ENDPOINT = "https://api.search.brave.com/res/v1/web/search" | |
logging.basicConfig(level=logging.INFO) | |
# ββββββββββββββββββββββββββ 1. Cycle config ββββββββββββββββββββββββββ | |
CENTER = 2025 | |
CYCLES = { "K-Wave": 50, "Business": 9, "Finance": 80, "Hegemony": 250 } | |
ORDERED_PERIODS = sorted(CYCLES.values()) | |
COLOR = {9:"#66ff66", 50:"#ff3333", 80:"#ffcc00", 250:"#66ccff"} | |
AMPL = {9:0.6, 50:1.0, 80:1.6, 250:4.0} | |
PERIOD_BY_CYCLE = {k:v for k,v in CYCLES.items()} | |
# ββββββββββββββββββββββββββ 2. Load events JSON & embeddings βββββββββββββββββββ | |
EVENTS_PATH = pathlib.Path(__file__).with_name("cycle_events.json") | |
with open(EVENTS_PATH, encoding="utf-8") as f: | |
RAW_EVENTS = json.load(f) | |
EVENTS = {int(item["year"]): item["events"] for item in RAW_EVENTS} | |
logging.info("Embedding historical eventsβ¦") | |
_embed_model = SentenceTransformer("sentence-transformers/all-MiniLM-L6-v2") | |
_all_sentences = [(yr, ev["event_en"]) for yr, evs in EVENTS.items() for ev in evs] | |
_embeddings = _embed_model.encode([s for _, s in _all_sentences], convert_to_tensor=True) | |
# μ μ¬ μ¬κ±΄ top-3 year μ¬μ | |
SIMILAR_MAP = {} | |
for idx, (yr, _) in enumerate(_all_sentences): | |
scores = util.cos_sim(_embeddings[idx], _embeddings)[0] | |
top_idx = torch.topk(scores, 4).indices.tolist() | |
sims = [_all_sentences[i][0] for i in top_idx if _all_sentences[i][0] != yr][:3] | |
SIMILAR_MAP.setdefault(yr, sims) | |
# ββββββββββββββββββββββββββ 3. Brave Search helpers ββββββββββββββββββββββββββ | |
def brave_search(query: str, count: int = 8, freshness_days: int | None = None): | |
if not BRAVE_KEY: | |
return [] | |
params = {"q": query, "count": str(count)} | |
if freshness_days: | |
dt_from = (datetime.utcnow() - timedelta(days=freshness_days)).strftime("%Y-%m-%d") | |
params["freshness"] = dt_from | |
try: | |
r = requests.get( | |
BRAVE_ENDPOINT, | |
headers={"Accept": "application/json", "X-Subscription-Token": BRAVE_KEY}, | |
params=params, | |
timeout=15 | |
) | |
raw = r.json().get("web", {}).get("results") or [] | |
return [{ | |
"title": r.get("title", ""), | |
"url": r.get("url", r.get("link", "")), | |
"snippet": r.get("description", r.get("text", "")), | |
"host": re.sub(r"https?://(www\.)?", "", r.get("url", "")).split("/")[0] | |
} for r in raw[:count]] | |
except Exception as e: | |
logging.error(f"Brave error: {e}") | |
return [] | |
def format_search_results(query: str) -> str: | |
rows = brave_search(query, 6, freshness_days=3) | |
if not rows: | |
return f"# [Web-Search] No live results for β{query}β.\n" | |
hdr = f"# [Web-Search] Top results for β{query}β (last 3 days)\n\n" | |
body = "\n".join( | |
f"- **{r['title']}** ({r['host']})\n {r['snippet']}\n [link]({r['url']})" | |
for r in rows | |
) | |
return hdr + body + "\n" | |
NEWS_KEYWORDS = { | |
"Business": "recession OR GDP slowdown", | |
"K-Wave": "breakthrough technology innovation", | |
"Finance": "credit cycle debt crisis", | |
"Hegemony": "great power rivalry geopolitics" | |
} | |
def fetch_cycle_news(): | |
markers = [] | |
for cyc, kw in NEWS_KEYWORDS.items(): | |
res = brave_search(kw, 1, freshness_days=2) | |
if res: | |
markers.append({ | |
"cycle": cyc, | |
"title": res[0]["title"], | |
"year": datetime.utcnow().year, | |
"url": res[0]["url"] | |
}) | |
return markers | |
NEWS_MARKERS = fetch_cycle_news() | |
# ββββββββββββββββββββββββββ 4. Chart helpers ββββββββββββββββββββββββββ | |
def half_sine(xs, period, amp): | |
phase = np.mod(xs - CENTER, period) | |
y = amp * np.sin(np.pi * phase / period) | |
y[y < 0] = 0 | |
return y | |
def build_chart(start: int, end: int, lang: str = "KO"): | |
xs = np.linspace(start, end, max(1000, (end - start) * 4)) | |
fig = go.Figure() | |
# Gradient towers | |
for period in ORDERED_PERIODS: | |
base, col = AMPL[period], COLOR[period] | |
for frac in np.linspace(base / 30, base, 30): | |
fig.add_trace(go.Scatter( | |
x=xs, y=half_sine(xs, period, frac), | |
mode="lines", line=dict(color=col, width=0.8), | |
opacity=0.6, hoverinfo="skip", showlegend=False)) | |
fig.add_trace(go.Scatter( | |
x=xs, y=half_sine(xs, period, base), | |
mode="lines", line=dict(color=col, width=1.6), | |
hoverinfo="skip", showlegend=False)) | |
# Events + similar | |
text_key = "event_ko" if lang == "KO" else "event_en" | |
for yr, evs in EVENTS.items(): | |
if start <= yr <= end: | |
cyc = evs[0]["cycle"] | |
period = PERIOD_BY_CYCLE[cyc] | |
yv = float(half_sine(np.array([yr]), period, AMPL[period])) | |
sim = ", ".join(map(str, SIMILAR_MAP.get(yr, []))) or "None" | |
txt = "<br>".join(e[text_key] for e in evs) | |
fig.add_trace(go.Scatter( | |
x=[yr], y=[yv], mode="markers", | |
marker=dict(color="white", size=6), | |
customdata=[[cyc, txt, sim]], | |
hovertemplate=( | |
"Year %{x} β’ %{customdata[0]}<br>" | |
"%{customdata[1]}<br>" | |
"Similar: %{customdata[2]}<extra></extra>" | |
), | |
showlegend=False)) | |
# Live-news markers | |
for m in NEWS_MARKERS: | |
if start <= m["year"] <= end: | |
p = PERIOD_BY_CYCLE[m["cycle"]] | |
yv = float(half_sine(np.array([m["year"]]), p, AMPL[p])) * 1.05 | |
fig.add_trace(go.Scatter( | |
x=[m["year"]], y=[yv], mode="markers+text", | |
marker=dict(color="gold", size=8, symbol="star"), | |
text=["π°"], textposition="top center", | |
customdata=[[m["cycle"], m["title"], m["url"]]], | |
hovertemplate=("Live news β’ %{customdata[0]}<br>" | |
"%{customdata[1]}<extra></extra>"), | |
showlegend=False)) | |
# Hover Year trace | |
fig.add_trace(go.Scatter( | |
x=xs, y=np.full_like(xs, -0.05), | |
mode="lines", line=dict(color="rgba(0,0,0,0)", width=1), | |
hovertemplate="Year %{x:.0f}<extra></extra>", showlegend=False)) | |
# Cosmetics | |
fig.add_vline(x=CENTER, line_dash="dash", line_color="white", opacity=0.6) | |
arrow_y = AMPL[250] * 1.05 | |
fig.add_annotation( | |
x=CENTER - 125, y=arrow_y, ax=CENTER + 125, ay=arrow_y, | |
xref="x", yref="y", axref="x", ayref="y", | |
showarrow=True, arrowhead=3, arrowsize=1, | |
arrowwidth=1.2, arrowcolor="white") | |
fig.add_annotation( | |
x=CENTER, y=arrow_y + 0.15, text="250 yr", | |
showarrow=False, font=dict(color="white", size=10)) | |
fig.update_layout( | |
template="plotly_dark", | |
paper_bgcolor="black", plot_bgcolor="black", | |
height=500, margin=dict(t=30, l=40, r=40, b=40), | |
hoverlabel=dict(bgcolor="#222", font_size=11), | |
hovermode="x") | |
fig.update_xaxes(title="Year", range=[start, end], showgrid=False) | |
fig.update_yaxes(title="Relative amplitude", showticklabels=False, showgrid=False) | |
summary = f"Range {start}-{end} | Events: {sum(1 for y in EVENTS if start <= y <= end)}" | |
return fig, summary | |
# ββββββββββββββββββββββββββ 5. GPT helper ββββββββββββββββββββββββββ | |
BASE_PROMPT = ( | |
"λΉμ μ **CycleNavigator AI**λ‘, κ²½μ μ¬Β·κ΅μ μ μΉΒ·μ₯μ£ΌκΈ°(9y Business, 50y K-Wave, " | |
"80y Finance, 250y Hegemony) λΆμμ μ ν΅ν μ λ¬Έκ°μ λλ€. " | |
"λͺ¨λ λ΅λ³μ νκ΅μ΄λ‘ νλ νμ μ μ νμ±κ³Ό μ€λ¬΄μ λͺ λ£μ±μ λμμ κ°μΆμμμ€. " | |
"β¦ λ΅λ³ ꡬ쑰 μ§μΉ¨: β μ§λ¬Έ ν΅μ¬ μμ½ β β‘ 4λ μ£ΌκΈ°μμ κ΄λ ¨μ± λͺ μ β " | |
"β’ μμ¬Β·λ°μ΄ν° κ·Όκ±° μ€λͺ β β£ μμ¬μ Β·μ λ§ μμΌλ‘ μμ νλ©°, " | |
"λ²νΈβ§κΈλ¨Έλ¦¬νΒ·μ§§μ λ¬Έλ¨μ νμ©ν΄ λ Όλ¦¬μ μΌλ‘ λ°°μ΄ν©λλ€. " | |
"β¦ μ 곡λ [Chart summary]λ λ°λμ ν΄μΒ·μΈμ©νκ³ , " | |
"κ°κ΄μ μ¬μ€Β·μ°λΒ·μ¬κ±΄μ κ·Όκ±°λ‘ ν©λλ€. " | |
"β¦ κ·Όκ±°κ° λΆμΆ©λΆν λ βνμ€νμ§ μμ΅λλ€βλΌκ³ λͺ μν΄ μΆμΈ‘μ νΌνμμμ€. " | |
"β¦ λΆνμν μ₯ν©ν¨μ μΌκ°κ³ 3κ° λ¨λ½ λλ 7κ° μ΄ν bullets λ΄λ‘ μμ½νμμμ€." | |
) | |
def chat_with_gpt(hist, msg, chart_summary): | |
msgs = [{"role": "system", "content": BASE_PROMPT}] | |
if chart_summary not in ("", "No chart yet."): | |
msgs.append({"role": "system", "content": f"[Chart summary]\n{chart_summary}"}) | |
for u, a in hist: | |
msgs.extend([{"role": "user", "content": u}, {"role": "assistant", "content": a}]) | |
msgs.append({"role": "user", "content": msg}) | |
return openai.chat.completions.create( | |
model="gpt-3.5-turbo", | |
messages=msgs, | |
max_tokens=600, | |
temperature=0.7 | |
).choices[0].message.content.strip() | |
# ββββββββββββββββββββββββββ 6. Gradio UI ββββββββββββββββββββββββββ | |
def create_app(): | |
with gr.Blocks( | |
theme=gr.themes.Soft(), | |
css=""" | |
#discord-badge{position:fixed; bottom:10px; left:50%; | |
transform:translateX(-50%);} | |
""" | |
) as demo: | |
gr.Markdown("## π **CycleNavigator (Interactive)**") | |
gr.Markdown( | |
"<sub>" | |
"<b>Interactive visual service delivering insights at a glance into the four major long cyclesβ </b>" | |
"<b>Business 9y</b> (credit-investment business cycle) β’ " | |
"<b>K-Wave 50y</b> (long technological-industrial wave) β’ " | |
"<b>Finance 80y</b> (long credit-debt cycle) β’ " | |
"<b>Hegemony 250y</b> (rise & fall of global powers cycle)" | |
"<b> βthrough dynamic charts and AI chat.</b>" | |
"</sub>" | |
) | |
# ββ μΈμ΄ μ ν βββββββββββββββββββββββββββββββββββββββββββ | |
# ββ μΈμ΄ μ ν βββββββββββββββββββββββββββββββββββββββββββ | |
lang_state = gr.State(value="EN") # β κΈ°λ³Έμ ENμΌλ‘ | |
lang_radio = gr.Radio( | |
["English", "νκ΅μ΄"], | |
value="English", | |
label="Language / μΈμ΄", | |
interactive=True | |
) | |
# μ΄κΈ° μ°¨νΈΒ·μν βββββββββββββββββββββββββββββββββββββββββ | |
chart_summary_state = gr.State(value="No chart yet.") | |
fig0, summ0 = build_chart(1500, 2500, lang_state.value) # β‘ λμΌ μνκ° μ¬μ© | |
plot = gr.Plot(value=fig0) | |
chart_summary_state.value = summ0 | |
with gr.Row(): | |
start_year = gr.Number(label="Start Year", value=1500, precision=0) | |
end_year = gr.Number(label="End Year", value=2500, precision=0) | |
zoom_in = gr.Button("π Zoom In") | |
zoom_out = gr.Button("π Zoom Out") | |
# ββ functions ββ | |
def refresh(s, e, lang_code): | |
fig, summ = build_chart(int(s), int(e), lang_code) | |
return fig, summ | |
def zoom(s, e, f, lang_code): | |
mid = (s + e) / 2 | |
span = (e - s) * f / 2 | |
ns, ne = int(mid - span), int(mid + span) | |
fig, summ = build_chart(ns, ne, lang_code) | |
return ns, ne, fig, summ | |
def change_lang(lang_label, s, e): | |
code = "KO" if lang_label == "νκ΅μ΄" else "EN" | |
fig, summ = build_chart(int(s), int(e), code) | |
return code, fig, summ | |
# ββ event wiring ββ | |
start_year.change(refresh, [start_year, end_year, lang_state], [plot, chart_summary_state]) | |
end_year.change(refresh, [start_year, end_year, lang_state], [plot, chart_summary_state]) | |
zoom_in.click( | |
lambda s, e, lc: zoom(s, e, 0.5, lc), | |
[start_year, end_year, lang_state], | |
[start_year, end_year, plot, chart_summary_state] | |
) | |
zoom_out.click( | |
lambda s, e, lc: zoom(s, e, 2.0, lc), | |
[start_year, end_year, lang_state], | |
[start_year, end_year, plot, chart_summary_state] | |
) | |
lang_radio.change( | |
change_lang, | |
[lang_radio, start_year, end_year], | |
[lang_state, plot, chart_summary_state] | |
) | |
gr.File(value=str(EVENTS_PATH), label="Download cycle_events.json") | |
with gr.Tabs(): | |
with gr.TabItem("Deep Research Chat"): | |
chatbot = gr.Chatbot(label="Assistant") | |
user_input = gr.Textbox(lines=3, placeholder="λ©μμ§λ₯Ό μ λ ₯νμΈμβ¦") | |
with gr.Row(): | |
send_btn = gr.Button("Send", variant="primary") | |
web_btn = gr.Button("π Web Search") | |
def respond(hist, msg, summ): | |
reply = chat_with_gpt(hist, msg, summ) | |
hist.append((msg, reply)) | |
return hist, gr.Textbox(value="") | |
def respond_ws(hist, msg, summ): | |
md = format_search_results(msg) | |
reply = chat_with_gpt(hist, f"{msg}\n\n{md}", summ) | |
hist.append((f"{msg}\n\n(μΉκ²μ)", reply)) | |
return hist, gr.Textbox(value="") | |
send_btn.click(respond, | |
[chatbot, user_input, chart_summary_state], | |
[chatbot, user_input]) | |
user_input.submit(respond, | |
[chatbot, user_input, chart_summary_state], | |
[chatbot, user_input]) | |
web_btn.click(respond_ws, | |
[chatbot, user_input, chart_summary_state], | |
[chatbot, user_input]) | |
# ββ fixed Discord badge ββ | |
gr.HTML( | |
'<a href="https://discord.gg/openfreeai" target="_blank" id="discord-badge">' | |
'<img src="https://img.shields.io/static/v1?label=Discord&message=Openfree%20AI&' | |
'color=%230000ff&labelColor=%23800080&logo=discord&logoColor=white&style=for-the-badge"' | |
' alt="badge"></a>' | |
) | |
return demo | |
# ββββββββββββββββββββββββββ main ββββββββββββββββββββββββββ | |
if __name__ == "__main__": | |
create_app().launch() | |