Spaces:
Running
Running
File size: 6,801 Bytes
8289369 |
|
import requests
import feedparser
# from dataclasses import dataclass, field # 已移除
from typing import Optional # , List, Dict # List 和 Dict 不再需要
from datetime import datetime
import time
from ..schemas import PodcastEpisode, PodcastChannel
def _parse_date(date_str: Optional[str]) -> Optional[datetime]:
if not date_str:
return None
try:
# feedparser 已经将日期解析为 time.struct_time 类型
# 我们将其转换为 datetime 类型
if isinstance(date_str, time.struct_time):
return datetime.fromtimestamp(time.mktime(date_str))
# 如果 feedparser 解析失败或返回字符串,则回退使用其他字符串格式解析
# 这是一种常见的 RSS 日期格式
return datetime.strptime(date_str, '%a, %d %b %Y %H:%M:%S %z')
except (ValueError, TypeError):
try:
return datetime.strptime(date_str, '%a, %d %b %Y %H:%M:%S %Z') # 处理 GMT, EST 等时区
except (ValueError, TypeError):
# 如果时区缺失或无法解析,则尝试不带时区解析
try:
return datetime.strptime(date_str[:-6], '%a, %d %b %Y %H:%M:%S')
except (ValueError, TypeError):
print(f"Warning: Could not parse date string: {date_str}")
return None
def fetch_rss_content(rss_url: str) -> Optional[bytes]:
"""
通过 HTTP 请求获取 RSS feed 的内容。
参数:
rss_url: 播客 RSS feed 的 URL。
返回:
bytes 类型的 RSS 内容,如果获取失败则返回 None。
"""
headers = {
'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.114 Safari/537.36'
}
try:
response = requests.get(rss_url, headers=headers, timeout=30)
response.raise_for_status() # 针对 HTTP 错误抛出异常
return response.content
except requests.exceptions.RequestException as e:
print(f"获取 RSS feed 时出错: {e}")
return None
def parse_rss_xml_content(rss_content: bytes) -> Optional[PodcastChannel]:
"""
解析播客 RSS XML 内容,并返回其主要信息和剧集详情。
参数:
rss_content: bytes 类型的 RSS XML 内容。
返回:
一个包含已解析信息的 PodcastChannel 对象,如果解析失败则返回 None。
"""
feed = feedparser.parse(rss_content)
if feed.bozo:
# 如果 feed 格式不正确,bozo 为 True
# feed.bozo_exception 包含异常信息
print(f"警告: RSS feed 可能格式不正确。Bozo 异常: {feed.bozo_exception}")
# 即使格式不完全正确,feedparser 通常仍会尝试解析,所以我们不在此处直接返回 None
# 但如果关键的 feed 或 channel_info 缺失,后续会自然失败
channel_info = feed.get('feed', {})
if not channel_info: # 如果连基本的 feed 结构都没有,则认为解析失败
print("错误: RSS 内容无法解析为有效的 feed 结构。")
return None
podcast_channel = PodcastChannel(
title=channel_info.get('title'),
link=channel_info.get('link'),
description=channel_info.get('subtitle') or channel_info.get('description'),
language=channel_info.get('language'),
image_url=channel_info.get('image', {}).get('href') if channel_info.get('image') else None,
author=channel_info.get('author') or channel_info.get('itunes_author'),
last_build_date=_parse_date(channel_info.get('updated_parsed') or channel_info.get('published_parsed'))
)
for entry in feed.entries:
# 确定 shownotes:优先使用 content:encoded,然后是 itunes:summary,其次是 description/summary
shownotes = None
# 1. 优先尝试 <content:encoded>
# entry.content 是一个 FeedParserDict 对象列表
if 'content' in entry and entry.content:
for content_item in entry.content:
# 检查 content_item 是否有 value 属性并且该值非空
if hasattr(content_item, 'value') and content_item.value:
shownotes = content_item.value
break # 找到第一个有效的 content:encoded,停止查找
# 2. 如果没有从 content:encoded 获得,尝试 itunes:summary
if not shownotes and 'itunes_summary' in entry:
shownotes = entry.itunes_summary
# 3. 最后回退到 summary 或 description
if not shownotes: # 回退到 summary 或 description
shownotes = entry.get('summary') or entry.get('description')
# 从 enclosures 获取音频 URL
audio_url = None
if 'enclosures' in entry:
for enc in entry.enclosures:
if enc.get('type', '').startswith('audio/'):
audio_url = enc.get('href')
break
# 解析特定于剧集的 iTunes 标签
itunes_season = None
try:
itunes_season_str = entry.get('itunes_season')
if itunes_season_str:
itunes_season = int(itunes_season_str)
except (ValueError, TypeError):
pass # 如果不是有效整数则忽略
itunes_episode_number = None
try:
itunes_episode_number_str = entry.get('itunes_episode')
if itunes_episode_number_str:
itunes_episode_number = int(itunes_episode_number_str)
except (ValueError, TypeError):
pass # 如果不是有效整数则忽略
episode = PodcastEpisode(
title=entry.get('title'),
link=entry.get('link'),
published_date=_parse_date(entry.get('published_parsed')),
summary=entry.get('summary'), # 这通常是较短的版本
shownotes=shownotes, # 这是我们尝试获取的更详细版本
audio_url=audio_url,
guid=entry.get('id') or entry.get('guid'),
duration=entry.get('itunes_duration'),
episode_type=entry.get('itunes_episodetype'),
season=itunes_season,
episode_number=itunes_episode_number
)
podcast_channel.episodes.append(episode)
return podcast_channel
def parse_podcast_rss(rss_url: str) -> Optional[PodcastChannel]:
"""
从给定的 RSS URL 获取并解析播客数据。
参数:
rss_url: 播客 RSS feed 的 URL。
返回:
一个包含已解析信息的 PodcastChannel 对象,如果获取或解析失败则返回 None。
"""
rss_content = fetch_rss_content(rss_url)
if rss_content:
return parse_rss_xml_content(rss_content)
return None |