from fastapi import FastAPI, Query
from fastapi.responses import Response
import requests
import xml.etree.ElementTree as ET
from datetime import datetime, timezone
from typing import Optional
from urllib.parse import quote
from xml.sax.saxutils import escape

from bs4 import BeautifulSoup

app = FastAPI()
|
def html_to_fb2(title: str, body: str) -> str:
    """Convert scraped HTML into a minimal FB2 (FictionBook 2) document."""
    clean_text = BeautifulSoup(body, "html.parser").get_text(separator="\n")
    # Escape both values so stray <, > or & in scraped pages cannot break the XML.
    title = escape(title)
    clean_text = escape(clean_text)
    # FB2 readers expect the FictionBook 2.0 default namespace on the root element.
    return f"""<?xml version='1.0' encoding='utf-8'?>
<FictionBook xmlns='http://www.gribuser.ru/xml/fictionbook/2.0' xmlns:xlink='http://www.w3.org/1999/xlink'>
<description>
<title-info>
<genre>nonfiction</genre>
<author><first-name>OPDS</first-name><last-name>DuckScraper</last-name></author>
<book-title>{title}</book-title>
<lang>en</lang>
</title-info>
</description>
<body>
<section>
<title><p>{title}</p></title>
<p>{clean_text}</p>
</section>
</body>
</FictionBook>"""
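# Illustrative call: html_to_fb2("Example", "<p>Tom &amp; Jerry</p>") returns a
# one-section FB2 document whose body text is the re-escaped "Tom &amp; Jerry".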
|
|
|
|
|
def duckduckgo_search(query: str):
    """Return up to 10 (title, url) pairs from the DuckDuckGo Instant Answer API.

    Note that this endpoint serves instant-answer topics related to the query,
    not full web-search results.
    """
    api_url = "https://api.duckduckgo.com/"
    params = {
        "q": query,
        "format": "json",
        "no_html": 1,
        "skip_disambig": 1,
    }
    res = requests.get(api_url, params=params,
                       headers={"User-Agent": "Mozilla/5.0"}, timeout=10)
    res.raise_for_status()
    data = res.json()
    results = []

    def extract_topics(topics):
        # RelatedTopics mixes plain results with named groups that nest more
        # results under a "Topics" key, so recurse into the groups.
        for item in topics:
            if "FirstURL" in item and "Text" in item:
                results.append((item["Text"], item["FirstURL"]))
            elif "Topics" in item:
                extract_topics(item["Topics"])

    extract_topics(data.get("RelatedTopics", []))
    return results[:10]
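# For reference, a trimmed sketch of the RelatedTopics payload the recursion
# above expects; field names per the Instant Answer API, values illustrative:
#
#   "RelatedTopics": [
#       {"Text": "Topic one", "FirstURL": "https://duckduckgo.com/..."},
#       {"Name": "Category", "Topics": [
#           {"Text": "Nested topic", "FirstURL": "https://duckduckgo.com/..."},
#       ]},
#   ]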
|
|
|
|
|
def create_feed(entries: list, q: Optional[str]) -> bytes:
    """Serialise the given entries into an OPDS (Atom) feed."""
    ns = "http://www.w3.org/2005/Atom"
    ET.register_namespace("", ns)
    # The xmlns attribute is set literally so the serialised feed carries the
    # Atom default namespace without prefixing every element name.
    feed = ET.Element("feed", xmlns=ns)
    ET.SubElement(feed, "id").text = "urn:uuid:duckopds-catalog"
    # Reflect the query in the feed title so result feeds are distinguishable.
    feed_title = f"DuckDuckGo OPDS Catalog: {q}" if q else "DuckDuckGo OPDS Catalog"
    ET.SubElement(feed, "title").text = feed_title
    ET.SubElement(feed, "updated").text = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")

    ET.SubElement(feed, "link", {
        "rel": "search",
        "type": "application/atom+xml;profile=opds-catalog;kind=search",
        "href": "/opds?q={searchTerms}",
        "templated": "true",
    })

    for entry_info in entries:
        entry = ET.SubElement(feed, "entry")
        ET.SubElement(entry, "id").text = entry_info["id"]
        ET.SubElement(entry, "title").text = entry_info["title"]
        ET.SubElement(entry, "updated").text = entry_info["updated"]
        ET.SubElement(entry, "link", entry_info["link"])

    return ET.tostring(feed, encoding="utf-8", xml_declaration=True)
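# The feed above serialises roughly as follows (illustrative; ElementTree
# actually emits it unindented):
#
#   <?xml version='1.0' encoding='utf-8'?>
#   <feed xmlns="http://www.w3.org/2005/Atom">
#     <id>urn:uuid:duckopds-catalog</id>
#     <title>DuckDuckGo OPDS Catalog</title>
#     <updated>2025-01-01T00:00:00Z</updated>
#     <link rel="search" type="application/atom+xml;profile=opds-catalog;kind=search"
#           href="/opds?q={searchTerms}" templated="true" />
#     <entry>...</entry>
#   </feed>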
|
|
|
|
|
@app.get("/opds")
def opds(q: Optional[str] = Query(None, description="Search query")) -> Response:
    entries = []
    kind = "search"
    if q:
        results = duckduckgo_search(q)
        for title, url in results:
            entries.append({
                "id": url,
                "title": title,
                "updated": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"),
                "link": {
                    "rel": "http://opds-spec.org/acquisition",
                    "href": f"/download?url={quote(url, safe='')}",
                    "type": "application/fb2+xml",
                },
            })
        # Only a populated results feed is an acquisition feed.
        kind = "acquisition"
    xml_data = create_feed(entries, q)
    return Response(content=xml_data,
                    media_type=f"application/atom+xml;profile=opds-catalog;kind={kind}")
|
|
|
@app.get("/download")
def download_fb2(url: str) -> Response:
    res = requests.get(url, headers={"User-Agent": "Mozilla/5.0"}, timeout=10)
    res.raise_for_status()
    soup = BeautifulSoup(res.text, "html.parser")
    title = soup.title.string.strip() if soup.title and soup.title.string else "article"
    # Fall back to the whole document when the page has no <body> element.
    fb2 = html_to_fb2(title, str(soup.body or soup))
    filename = f"{quote(title, safe='').replace('%20', '_')[:30]}.fb2"
    return Response(
        content=fb2,
        media_type="application/fb2+xml",
        headers={"Content-Disposition": f'attachment; filename="{filename}"'},
    )
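# Local dev entry point; assumes uvicorn is installed (the usual ASGI server
# for FastAPI). Example requests once running:
#   curl 'http://127.0.0.1:8000/opds?q=python'
#   curl -OJ 'http://127.0.0.1:8000/download?url=https%3A%2F%2Fexample.com%2F'
if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="127.0.0.1", port=8000)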