Spaces:
Running
Running
File size: 6,801 Bytes
8289369 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 |
import requests
import feedparser
# from dataclasses import dataclass, field # 已移除
from typing import Optional # , List, Dict # List 和 Dict 不再需要
from datetime import datetime
import time
from ..schemas import PodcastEpisode, PodcastChannel
def _parse_date(date_str: Optional[str]) -> Optional[datetime]:
if not date_str:
return None
try:
# feedparser 已经将日期解析为 time.struct_time 类型
# 我们将其转换为 datetime 类型
if isinstance(date_str, time.struct_time):
return datetime.fromtimestamp(time.mktime(date_str))
# 如果 feedparser 解析失败或返回字符串,则回退使用其他字符串格式解析
# 这是一种常见的 RSS 日期格式
return datetime.strptime(date_str, '%a, %d %b %Y %H:%M:%S %z')
except (ValueError, TypeError):
try:
return datetime.strptime(date_str, '%a, %d %b %Y %H:%M:%S %Z') # 处理 GMT, EST 等时区
except (ValueError, TypeError):
# 如果时区缺失或无法解析,则尝试不带时区解析
try:
return datetime.strptime(date_str[:-6], '%a, %d %b %Y %H:%M:%S')
except (ValueError, TypeError):
print(f"Warning: Could not parse date string: {date_str}")
return None
def fetch_rss_content(rss_url: str) -> Optional[bytes]:
"""
通过 HTTP 请求获取 RSS feed 的内容。
参数:
rss_url: 播客 RSS feed 的 URL。
返回:
bytes 类型的 RSS 内容,如果获取失败则返回 None。
"""
headers = {
'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.114 Safari/537.36'
}
try:
response = requests.get(rss_url, headers=headers, timeout=30)
response.raise_for_status() # 针对 HTTP 错误抛出异常
return response.content
except requests.exceptions.RequestException as e:
print(f"获取 RSS feed 时出错: {e}")
return None
def parse_rss_xml_content(rss_content: bytes) -> Optional[PodcastChannel]:
"""
解析播客 RSS XML 内容,并返回其主要信息和剧集详情。
参数:
rss_content: bytes 类型的 RSS XML 内容。
返回:
一个包含已解析信息的 PodcastChannel 对象,如果解析失败则返回 None。
"""
feed = feedparser.parse(rss_content)
if feed.bozo:
# 如果 feed 格式不正确,bozo 为 True
# feed.bozo_exception 包含异常信息
print(f"警告: RSS feed 可能格式不正确。Bozo 异常: {feed.bozo_exception}")
# 即使格式不完全正确,feedparser 通常仍会尝试解析,所以我们不在此处直接返回 None
# 但如果关键的 feed 或 channel_info 缺失,后续会自然失败
channel_info = feed.get('feed', {})
if not channel_info: # 如果连基本的 feed 结构都没有,则认为解析失败
print("错误: RSS 内容无法解析为有效的 feed 结构。")
return None
podcast_channel = PodcastChannel(
title=channel_info.get('title'),
link=channel_info.get('link'),
description=channel_info.get('subtitle') or channel_info.get('description'),
language=channel_info.get('language'),
image_url=channel_info.get('image', {}).get('href') if channel_info.get('image') else None,
author=channel_info.get('author') or channel_info.get('itunes_author'),
last_build_date=_parse_date(channel_info.get('updated_parsed') or channel_info.get('published_parsed'))
)
for entry in feed.entries:
# 确定 shownotes:优先使用 content:encoded,然后是 itunes:summary,其次是 description/summary
shownotes = None
# 1. 优先尝试 <content:encoded>
# entry.content 是一个 FeedParserDict 对象列表
if 'content' in entry and entry.content:
for content_item in entry.content:
# 检查 content_item 是否有 value 属性并且该值非空
if hasattr(content_item, 'value') and content_item.value:
shownotes = content_item.value
break # 找到第一个有效的 content:encoded,停止查找
# 2. 如果没有从 content:encoded 获得,尝试 itunes:summary
if not shownotes and 'itunes_summary' in entry:
shownotes = entry.itunes_summary
# 3. 最后回退到 summary 或 description
if not shownotes: # 回退到 summary 或 description
shownotes = entry.get('summary') or entry.get('description')
# 从 enclosures 获取音频 URL
audio_url = None
if 'enclosures' in entry:
for enc in entry.enclosures:
if enc.get('type', '').startswith('audio/'):
audio_url = enc.get('href')
break
# 解析特定于剧集的 iTunes 标签
itunes_season = None
try:
itunes_season_str = entry.get('itunes_season')
if itunes_season_str:
itunes_season = int(itunes_season_str)
except (ValueError, TypeError):
pass # 如果不是有效整数则忽略
itunes_episode_number = None
try:
itunes_episode_number_str = entry.get('itunes_episode')
if itunes_episode_number_str:
itunes_episode_number = int(itunes_episode_number_str)
except (ValueError, TypeError):
pass # 如果不是有效整数则忽略
episode = PodcastEpisode(
title=entry.get('title'),
link=entry.get('link'),
published_date=_parse_date(entry.get('published_parsed')),
summary=entry.get('summary'), # 这通常是较短的版本
shownotes=shownotes, # 这是我们尝试获取的更详细版本
audio_url=audio_url,
guid=entry.get('id') or entry.get('guid'),
duration=entry.get('itunes_duration'),
episode_type=entry.get('itunes_episodetype'),
season=itunes_season,
episode_number=itunes_episode_number
)
podcast_channel.episodes.append(episode)
return podcast_channel
def parse_podcast_rss(rss_url: str) -> Optional[PodcastChannel]:
"""
从给定的 RSS URL 获取并解析播客数据。
参数:
rss_url: 播客 RSS feed 的 URL。
返回:
一个包含已解析信息的 PodcastChannel 对象,如果获取或解析失败则返回 None。
"""
rss_content = fetch_rss_content(rss_url)
if rss_content:
return parse_rss_xml_content(rss_content)
return None |