File size: 6,801 Bytes
8289369
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
import requests
import feedparser
# from dataclasses import dataclass, field # 已移除
from typing import Optional # , List, Dict # List 和 Dict 不再需要
from datetime import datetime
import time

from ..schemas import PodcastEpisode, PodcastChannel

def _parse_date(date_str: Optional[str]) -> Optional[datetime]:
    if not date_str:
        return None
    try:
        # feedparser 已经将日期解析为 time.struct_time 类型
        # 我们将其转换为 datetime 类型
        if isinstance(date_str, time.struct_time):
            return datetime.fromtimestamp(time.mktime(date_str))
        # 如果 feedparser 解析失败或返回字符串,则回退使用其他字符串格式解析
        # 这是一种常见的 RSS 日期格式
        return datetime.strptime(date_str, '%a, %d %b %Y %H:%M:%S %z')
    except (ValueError, TypeError):
        try:
            return datetime.strptime(date_str, '%a, %d %b %Y %H:%M:%S %Z') # 处理 GMT, EST 等时区
        except (ValueError, TypeError):
            # 如果时区缺失或无法解析,则尝试不带时区解析
            try:
                return datetime.strptime(date_str[:-6], '%a, %d %b %Y %H:%M:%S')
            except (ValueError, TypeError):
                print(f"Warning: Could not parse date string: {date_str}")
                return None

def fetch_rss_content(rss_url: str) -> Optional[bytes]:
    """
    通过 HTTP 请求获取 RSS feed 的内容。

    参数:
        rss_url: 播客 RSS feed 的 URL。

    返回:
        bytes 类型的 RSS 内容,如果获取失败则返回 None。
    """
    headers = {
        'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.114 Safari/537.36'
    }
    try:
        response = requests.get(rss_url, headers=headers, timeout=30)
        response.raise_for_status() # 针对 HTTP 错误抛出异常
        return response.content
    except requests.exceptions.RequestException as e:
        print(f"获取 RSS feed 时出错: {e}")
        return None

def parse_rss_xml_content(rss_content: bytes) -> Optional[PodcastChannel]:
    """
    解析播客 RSS XML 内容,并返回其主要信息和剧集详情。

    参数:
        rss_content: bytes 类型的 RSS XML 内容。

    返回:
        一个包含已解析信息的 PodcastChannel 对象,如果解析失败则返回 None。
    """
    feed = feedparser.parse(rss_content)

    if feed.bozo:
        # 如果 feed 格式不正确,bozo 为 True
        # feed.bozo_exception 包含异常信息
        print(f"警告: RSS feed 可能格式不正确。Bozo 异常: {feed.bozo_exception}")
        # 即使格式不完全正确,feedparser 通常仍会尝试解析,所以我们不在此处直接返回 None
        # 但如果关键的 feed 或 channel_info 缺失,后续会自然失败

    channel_info = feed.get('feed', {})
    if not channel_info: # 如果连基本的 feed 结构都没有,则认为解析失败
        print("错误: RSS 内容无法解析为有效的 feed 结构。")
        return None

    podcast_channel = PodcastChannel(
        title=channel_info.get('title'),
        link=channel_info.get('link'),
        description=channel_info.get('subtitle') or channel_info.get('description'),
        language=channel_info.get('language'),
        image_url=channel_info.get('image', {}).get('href') if channel_info.get('image') else None,
        author=channel_info.get('author') or channel_info.get('itunes_author'),
        last_build_date=_parse_date(channel_info.get('updated_parsed') or channel_info.get('published_parsed'))
    )

    for entry in feed.entries:
        # 确定 shownotes:优先使用 content:encoded,然后是 itunes:summary,其次是 description/summary
        shownotes = None
        # 1. 优先尝试 <content:encoded>
        # entry.content 是一个 FeedParserDict 对象列表
        if 'content' in entry and entry.content:
            for content_item in entry.content:
                # 检查 content_item 是否有 value 属性并且该值非空
                if hasattr(content_item, 'value') and content_item.value:
                    shownotes = content_item.value
                    break # 找到第一个有效的 content:encoded,停止查找
        
        # 2. 如果没有从 content:encoded 获得,尝试 itunes:summary
        if not shownotes and 'itunes_summary' in entry:
            shownotes = entry.itunes_summary
        
        # 3. 最后回退到 summary 或 description
        if not shownotes: # 回退到 summary 或 description
            shownotes = entry.get('summary') or entry.get('description')

        # 从 enclosures 获取音频 URL
        audio_url = None
        if 'enclosures' in entry:
            for enc in entry.enclosures:
                if enc.get('type', '').startswith('audio/'):
                    audio_url = enc.get('href')
                    break
        
        # 解析特定于剧集的 iTunes 标签
        itunes_season = None
        try:
            itunes_season_str = entry.get('itunes_season')
            if itunes_season_str:
                itunes_season = int(itunes_season_str)
        except (ValueError, TypeError):
            pass # 如果不是有效整数则忽略

        itunes_episode_number = None
        try:
            itunes_episode_number_str = entry.get('itunes_episode')
            if itunes_episode_number_str:
                itunes_episode_number = int(itunes_episode_number_str)
        except (ValueError, TypeError):
            pass # 如果不是有效整数则忽略

        episode = PodcastEpisode(
            title=entry.get('title'),
            link=entry.get('link'),
            published_date=_parse_date(entry.get('published_parsed')),
            summary=entry.get('summary'), # 这通常是较短的版本
            shownotes=shownotes, # 这是我们尝试获取的更详细版本
            audio_url=audio_url,
            guid=entry.get('id') or entry.get('guid'),
            duration=entry.get('itunes_duration'),
            episode_type=entry.get('itunes_episodetype'),
            season=itunes_season,
            episode_number=itunes_episode_number
        )
        podcast_channel.episodes.append(episode)

    return podcast_channel

def parse_podcast_rss(rss_url: str) -> Optional[PodcastChannel]:
    """
    从给定的 RSS URL 获取并解析播客数据。

    参数:
        rss_url: 播客 RSS feed 的 URL。

    返回:
        一个包含已解析信息的 PodcastChannel 对象,如果获取或解析失败则返回 None。
    """
    rss_content = fetch_rss_content(rss_url)
    if rss_content:
        return parse_rss_xml_content(rss_content)
    return None