File size: 9,920 Bytes
faa9726
 
 
e566133
 
faa9726
 
8fde8ae
f2772b4
 
 
 
faa9726
f2772b4
faa9726
 
f2772b4
faa9726
 
 
 
1268592
faa9726
3f29913
e566133
a88b8c8
e566133
a88b8c8
 
 
 
dc6febb
a88b8c8
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
26741bb
a88b8c8
e566133
 
a88b8c8
 
 
 
 
 
 
 
 
 
e566133
 
 
 
a88b8c8
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
e566133
a88b8c8
 
 
 
 
 
 
 
 
26741bb
a88b8c8
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
e566133
a88b8c8
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
e566133
a88b8c8
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
import asyncio
import logging
import math
import os
import time
import traceback
from typing import AsyncGenerator, Dict, Optional, Union

from aiohttp import web
from aiohttp.http_exceptions import BadStatusLine
from pyrogram import Client, utils, raw
from pyrogram.errors import AuthBytesInvalid
from pyrogram.file_id import FileId, FileType, ThumbnailSource
from pyrogram.session import Session, Auth
from pyrogram.types import Message

#---------------------Local Upload---------------------#
from FileStream.config import Telegram
from .file_properties import get_file_ids
from FileStream.bot import req_client, FileStream
# NOTE(review): the next line rebinds `utils` (previously pyrogram.utils) to
# FileStream.utils — kept in this order to preserve the original binding.
# Verify FileStream.utils provides get_channel_id used in get_location().
from FileStream import utils, StartTime, __version__
from FileStream.Tools import mime_identifier, Time_ISTKolNow
from FileStream.utils.FileProcessors.custom_ul import TeleUploader
from FileStream.Exceptions import FileNotFound, InvalidHash
from FileStream.bot import MULTI_CLIENTS, WORK_LOADS, ACTIVE_CLIENTS



class ByteStreamer:
    """
    Chunked byte-streamer for Telegram media served through a Pyrogram client.

    Responsibilities:
      * resolve and cache ``db_id -> FileId`` media properties,
      * create (and cache) per-DC media ``Session`` objects, importing
        authorization when the file lives on a foreign DC,
      * asynchronously yield a requested byte range of a file in fixed-size
        chunks via ``yield_file``.

    A background task started in ``__init__`` periodically clears the
    ``FileId`` cache to bound memory usage.
    """

    def __init__(self, client: Client):
        # Interval between cache purges, in seconds (30 minutes).
        self.clean_timer = 30 * 60
        self.client: Client = client
        # db_id -> FileId cache so repeated requests skip property regeneration.
        self.cached_file_ids: Dict[str, FileId] = {}
        # Loop-relative timestamp of the most recent use of this client.
        self.last_activity: float = asyncio.get_event_loop().time()
        # NOTE: requires a running event loop at construction time.
        asyncio.create_task(self.clean_cache())

    def update_last_activity(self) -> None:
        """Record the current event-loop time as this client's last activity."""
        self.last_activity = asyncio.get_event_loop().time()

    def get_last_activity(self) -> float:
        """Return the loop-relative time of this client's last activity."""
        return self.last_activity

    async def get_file_properties(self, db_id: str, MULTI_CLIENTS) -> FileId:
        """
        Return the ``FileId`` properties for ``db_id``, generating and caching
        them on first access.

        Args:
            db_id: Database identifier of the stored file.
            MULTI_CLIENTS: Client pool, forwarded to the generation step.
        """
        if db_id not in self.cached_file_ids:
            logging.debug("File properties not cached. Generating properties.")
            await self.generate_file_properties(db_id, MULTI_CLIENTS)
            logging.debug(f"Cached file properties for file with ID {db_id}")
        return self.cached_file_ids[db_id]

    async def generate_file_properties(self, db_id: str, MULTI_CLIENTS) -> FileId:
        """
        Generate the ``FileId`` properties for ``db_id`` and cache them.

        NOTE(review): the ``Message`` *class* (not an instance) is passed to
        ``get_file_ids`` here, mirroring the original call — confirm that is
        what ``get_file_ids`` expects.
        """
        logging.debug("Generating file properties.")
        file_id = await get_file_ids(self.client, db_id, Message)
        logging.debug(f"Generated file ID and Unique ID for file with ID {db_id}")
        self.cached_file_ids[db_id] = file_id
        logging.debug(f"Cached media file with ID {db_id}")
        return file_id

    async def generate_media_session(self, client: Client, file_id: FileId) -> Session:
        """
        Return a media ``Session`` for the DC hosting ``file_id``.

        Reuses ``client.media_sessions`` when possible. For a foreign DC, a
        fresh auth key is created and authorization is exported from the main
        session and imported into the media session (retried up to 6 times).

        Raises:
            AuthBytesInvalid: If authorization import fails on every attempt.
        """
        media_session = client.media_sessions.get(file_id.dc_id, None)

        if media_session is None:
            if file_id.dc_id != await client.storage.dc_id():
                # Foreign DC: build a new session with a freshly created auth key.
                media_session = Session(
                    client,
                    file_id.dc_id,
                    await Auth(client, file_id.dc_id, await client.storage.test_mode()).create(),
                    await client.storage.test_mode(),
                    is_media=True,
                )
                await media_session.start()

                # Export authorization from the main DC and import it into the
                # media session; Telegram may transiently reject the bytes.
                for _ in range(6):
                    exported_auth = await client.invoke(
                        raw.functions.auth.ExportAuthorization(dc_id=file_id.dc_id))

                    try:
                        await media_session.invoke(
                            raw.functions.auth.ImportAuthorization(
                                id=exported_auth.id, bytes=exported_auth.bytes))
                        break
                    except AuthBytesInvalid:
                        logging.debug(f"Invalid authorization bytes for DC {file_id.dc_id}")
                        continue
                else:
                    # All attempts failed — tear the session down before raising.
                    await media_session.stop()
                    raise AuthBytesInvalid
            else:
                # Same DC: reuse the stored auth key directly.
                media_session = Session(
                    client,
                    file_id.dc_id,
                    await client.storage.auth_key(),
                    await client.storage.test_mode(),
                    is_media=True,
                )
                await media_session.start()

            logging.debug(f"Created media session for DC {file_id.dc_id}")
            client.media_sessions[file_id.dc_id] = media_session
        else:
            logging.debug(f"Using cached media session for DC {file_id.dc_id}")
        return media_session

    @staticmethod
    async def get_location(file_id: FileId) -> Union[
            raw.types.InputPhotoFileLocation,
            raw.types.InputDocumentFileLocation,
            raw.types.InputPeerPhotoFileLocation,
    ]:
        """
        Build the raw input-file location for ``file_id`` based on its type
        (chat photo, regular photo, or document).
        """
        file_type = file_id.file_type

        if file_type == FileType.CHAT_PHOTO:
            # Chat photos need a peer: positive chat_id -> user, else channel.
            if file_id.chat_id > 0:
                peer = raw.types.InputPeerUser(user_id=file_id.chat_id, access_hash=file_id.chat_access_hash)
            else:
                peer = raw.types.InputPeerChannel(
                    channel_id=utils.get_channel_id(file_id.chat_id),
                    access_hash=file_id.chat_access_hash,
                )

            location = raw.types.InputPeerPhotoFileLocation(
                peer=peer,
                volume_id=file_id.volume_id,
                local_id=file_id.local_id,
                big=file_id.thumbnail_source == ThumbnailSource.CHAT_PHOTO_BIG,
            )
        elif file_type == FileType.PHOTO:
            location = raw.types.InputPhotoFileLocation(
                id=file_id.media_id,
                access_hash=file_id.access_hash,
                file_reference=file_id.file_reference,
                thumb_size=file_id.thumbnail_size,
            )
        else:
            # Anything that is not a photo is addressed as a document.
            location = raw.types.InputDocumentFileLocation(
                id=file_id.media_id,
                access_hash=file_id.access_hash,
                file_reference=file_id.file_reference,
                thumb_size=file_id.thumbnail_size,
            )
        return location

    async def yield_file(
            self,
            file_id: FileId,
            index: int,
            offset: int,
            first_part_cut: int,
            last_part_cut: int,
            part_count: int,
            chunk_size: int,
    ) -> AsyncGenerator[bytes, None]:
        """
        Asynchronously yield ``part_count`` chunks of the file starting at
        ``offset``, trimming the first chunk at ``first_part_cut`` and the
        last at ``last_part_cut`` so the caller receives an exact byte range.

        FIX: the original return annotation was ``Union[str, None]``; this is
        an async generator that yields ``bytes``.

        Args:
            file_id: Resolved media properties of the file to stream.
            index: Index into ``WORK_LOADS`` for the client serving this range.
            offset: Starting byte offset (aligned to ``chunk_size``).
            first_part_cut: Slice start within the first chunk.
            last_part_cut: Slice end within the last chunk.
            part_count: Total number of chunks to yield.
            chunk_size: Telegram GetFile limit per request.
        """
        client = self.client
        WORK_LOADS[index] += 1  # Track workload while this stream is active.
        logging.debug(f"Starting to yield file with client {index}.")
        media_session = await self.generate_media_session(client, file_id)

        current_part = 1
        location = await self.get_location(file_id)

        try:
            r = await media_session.invoke(
                raw.functions.upload.GetFile(location=location, offset=offset, limit=chunk_size), )

            if isinstance(r, raw.types.upload.File):
                while True:
                    chunk = r.bytes
                    if not chunk:
                        break
                    elif part_count == 1:
                        # Single-chunk range: trim both ends.
                        yield chunk[first_part_cut:last_part_cut]
                    elif current_part == 1:
                        yield chunk[first_part_cut:]
                    elif current_part == part_count:
                        yield chunk[:last_part_cut]
                    else:
                        yield chunk

                    current_part += 1
                    offset += chunk_size

                    if current_part > part_count:
                        break

                    r = await media_session.invoke(
                        raw.functions.upload.GetFile(location=location, offset=offset, limit=chunk_size), )
        except (TimeoutError, AttributeError) as e:
            # Best-effort: client disconnects mid-stream land here; keep the
            # original swallow-and-finish behavior but leave a trace.
            logging.debug(f"Stream interrupted at part {current_part}: {e}")
        except Exception as e:
            logging.info(f"Error at Bytestreamer Generating Chunk : {e}")
        finally:
            logging.debug(f"Finished yielding file with {current_part} parts.")
            WORK_LOADS[index] -= 1  # Always release the workload slot.

    async def clean_cache(self) -> None:
        """
        Periodically clear the cached file properties to reduce memory usage.

        FIX: the original slept once and cleared the cache a single time; the
        docstring promised a periodic cleanup, so this now loops for the
        lifetime of the task, purging every ``clean_timer`` seconds.
        """
        while True:
            await asyncio.sleep(self.clean_timer)
            logging.info("*** Cleaning cached file IDs...")
            self.cached_file_ids.clear()
            logging.debug("Cache cleaned.")