import re import time import pymongo import motor.motor_asyncio from bson.objectid import ObjectId from bson.errors import InvalidId from bson.json_util import dumps #----------------------Local Imports-----------------------# from FileStream.server.exceptions import FIleNotFound from FileStream.Tools import Time_ISTKolNow class Database: def __init__(self, uri, database_name): self._client = motor.motor_asyncio.AsyncIOMotorClient(uri) self.db = self._client[database_name] self.users = self.db.Users self.files = self.db.Public_Files self.pfile = self.db.Private_Files self.web_upload = self.db.Web_Files #---------------------[ SCHEMAS ]------------------------------# #---------------------[ NEW USER ]---------------------# def new_user(self, id): return dict( telegram_id=id, access="USER", tele_status={ "status": "ACTIVE", "activity": None, "joined": Time_ISTKolNow() }, file={ "links": 0, "private_files": 0, "public_files": 0, }, site_id="None", site_status={ "status": None, "activity": None, "password": None, "links": 0, "joined": "None" }, ) #------------------------------------------------ def NewTG_Files(self, details): return { "user_id": details['user_id'] if details['user_id'] else None, "user_type": details['user_type'] if details['user_type'] else None, "message_id": details['message_id'] if details['message_id'] else None, "location": details['location'] if details['location'] else None, "file": { "file_id": details['file']['file_id'] if details['file']['file_id'] else None, "file_unique_id": details['file']['file_unique_id'] if details['file']['file_unique_id'] else None, "file_name": details['file']['file_name'] if details['file']['file_name'] else None, "file_size": details['file']['file_size'] if details['file']['file_size'] else None, "mime_type": details['file']['mime_type'] if details['file']['mime_type'] else None, "taged_users": {} }, "time": details['time'] if details['time'] else None, "privacy_type": details['privacy_type'] if details['privacy_type'] else None, } # ---------------------[ ADD USER ]---------------------# async def add_user(self, id): user = self.new_user(id) await self.users.insert_one(user) async def add_admin(self, id): user= await self.get_user(id) if user: await self.users.update_one({"_id": user['_id']}, {"$set": {"access":"ADMIN" }}) else: user = self.new_user(id) user['access']="ADMIN" await self.users.insert_one(user) # ---------------------[ GET USER ]---------------------# async def get_user(self, id): user = await self.users.find_one({'telegram_id': int(id)}) return user # ---------------------[ CHECK USER ]---------------------# async def total_users_count(self): count = await self.users.count_documents({}) return count async def get_all_users(self): all_users = self.users.find({}) return all_users async def is_admin(self, user_id): user = await self.users.find_one({'telegram_id': int(user_id)}) return True if user['access'] == "ADMIN" else False # ---------------------[ REMOVE USER ]---------------------# async def delete_user(self, user_id): await self.users.delete_many({'telegram_id': int(user_id)}) # ---------------------[ BAN, UNBAN USER ]---------------------# async def ban_user(self, id): await self.users.update_one({"_id": ObjectId(_id)}, { "$set": { "tele_status": { "status": "BANNED", "activity": Time_ISTKolNow() }, } }) async def unban_user(self, id): await self.users.update_one({"_id": ObjectId(_id)}, { "$set": { "tele_status": { "status": "ACTIVE", "activity": Time_ISTKolNow(), } } }) async def is_user_banned(self, id): if await self.users.find_one({'telegram_id': int(id)}): return True else: return False # ---------------------[ ADD FILE TO DB ]---------------------# async def add_file(self, file_info): file_info["time"] = Time_ISTKolNow() fetch_old = await self.get_file_by_fileuniqueid(file_info["user_id"], file_info['file']["file_unique_id"]) if fetch_old: return fetch_old["_id"] await self.count_links(file_info["user_id"], "+") return (await self.files.insert_one(file_info)).inserted_id # ---------------------[ FIND FILE IN DB for Bot and APIs]---------------------# async def get_file(self, _id): try: file_info = await self.files.find_one({"_id": ObjectId(_id)}) if not file_info: print('file not found') #raise FIleNotFound return file_info except InvalidId: raise FIleNotFound async def get_all_files_api(self,range=None): #files = self.files.find({}) files= await self.files.find().to_list(length=None) #json_result = dumps(cursor)[row for row in files] print("\n get_all_files_api : Return Type : ", type(files)) return files async def get_all_files(self,range=None): user_files = self.files.find({}) if range : user_files.skip(range[0] - 1) user_files.limit(range[1] - range[0] + 1) user_files.sort('_id', pymongo.DESCENDING) return user_files async def find_files(self, user_id, range): user_files = self.files.find( {f"file.tagged_users.{user_id}": { "$exists": True }}) user_files.skip(range[0] - 1) user_files.limit(range[1] - range[0] + 1) user_files.sort('_id', pymongo.DESCENDING) total_files = await self.files.count_documents( {f"file.tagged_users.{user_id}": { "$exists": True }}) return user_files, total_files async def find_all_public_files(self, range): user_files = self.files.find({"privacy_type": "PUBLIC"}) user_files.skip(range[0] - 1) user_files.limit(range[1] - range[0] + 1) user_files.sort('_id', pymongo.DESCENDING) total_files = await self.files.count_documents({"privacy_type": "PUBLIC"}) return user_files, total_files async def find_all_files(self, range): user_files = self.files.find({}) user_files.skip(range[0] - 1) user_files.limit(range[1] - range[0] + 1) user_files.sort('_id', pymongo.DESCENDING) total_files = await self.files.count_documents({}) return user_files, total_files async def find_private_files(self, user_id, range): #search_string = "file.tagged_user." + str(user_id) user_files = self.files.find({f"file.tagged_users.{user_id}": "PRIVATE"}) user_files.skip(range[0] - 1) user_files.limit(range[1] - range[0] + 1) user_files.sort('_id', pymongo.DESCENDING) total_files = await self.files.count_documents( {"file.tagged_users." + str(user_id): "PRIVATE"}) return user_files, total_files async def get_file_by_fileuniqueid_only(self, file_unique_id): return await self.files.find_one({"file.file_unique_id": file_unique_id}) async def get_file_by_fileuniqueid(self, id, file_unique_id): count = await self.files.count_documents({"user_id":id,"file.file_unique_id":file_unique_id}) if count == 0: return False elif count == 1: return await self.files.find_one({"user_id": id,"file.file_unique_id": file_unique_id}) else: return self.files.find({"user_id": id,"file.file_unique_id": file_unique_id}) # ---------------------[ UPDATE FILE IN DB ]---------------------# async def update_privacy(self, file_details: dict): file = await self.get_file_by_fileuniqueid_only(file_details['file']['file_unique_id']) # Merge the tagged_user dictionaries updated_tagged_users = file['file']['tagged_users'].copy() updated_tagged_users.update(file_details['file']['tagged_users']) #for value in updated_tagged_users.values(): # if value == "PRIVATE": # file_details['privacy_type']=="PRIVATE" file_details['privacy_type'] = "PRIVATE" if any(value == "PRIVATE" for value in updated_tagged_users.values()) else file_details['privacy_type'] await self.files.update_one({"_id": file['_id']}, { "$set": { "privacy_type": file_details['privacy_type'], "file.tagged_users": updated_tagged_users } }) return await self.get_file_by_fileuniqueid_only(file_details['file']['file_unique_id']) async def update_file_ids(self, _id, file_ids: dict): await self.files.update_one({"_id": ObjectId(_id)}, {"$set": { "file_ids": file_ids }}) async def update_file_info(self, _id, file_info: dict): await self.files.update_one({"_id": ObjectId(_id)}, { "$set": { "message_id": file_info['message_id'], "location": file_info['location'], "file": file_info['file'] } }) #--------------------------PrivateFiles------------------- async def get_private_file(self, _id): try: file_info = await self.pfile.find_one({"_id": ObjectId(_id)}) if not file_info: raise FIleNotFound return file_info except InvalidId: raise FIleNotFound async def add_private_file(self, file_info): file_info["time"] = Time_ISTKolNow() fetch_old = await self.get_private_file_by_fileuniqueid_only(file_info['file']["file_unique_id"]) if fetch_old: return fetch_old["_id"] return (await self.pfile.insert_one(file_info)) async def get_private_file_by_fileuniqueid_only(self, file_unique_id): return await self.pfile.find_one({"file.file_unique_id": file_unique_id}) async def update_private_file_ids(self, _id, file_ids: dict): await self.pfile.update_one({"_id": ObjectId(_id)}, {"$set": { "file_ids": file_ids }}) async def update_private_privacy(self, file_details: dict, instruction: dict): file = await self.get_file_by_fileuniqueid_only(file_details['file']['file_unique_id']) await self.pfile.insert_one(file_details) #####################-------search for inline query ------------############### async def get_search_results(self,query=None, file_type=None, max_results=10, offset=0): regex = re.compile(re.escape(query), re.IGNORECASE) filter = {'$or': [{'file.file_name': {"$regex": regex}}, {'file.caption': {"$regex": regex}}]} if file_type: filter['mime_type'] = file_type total_results = await self.files.count_documents(filter) next_offset = offset + max_results if next_offset > total_results: next_offset = '' cursor = self.files.find(filter) # Sort by recent cursor.sort('$natural', -1) # Slice files according to offset and max results cursor.skip(offset).limit(max_results) # Get list of files files = await cursor.to_list(length=max_results) return files, next_offset # ---------------------[ TOTAL FILES ]---------------------# async def total_files(self, id=None): if id: return await self.files.count_documents({"user_id": id}) return await self.files.count_documents({}) async def total_privfiles(self, id=None): if id: return await self.pfile.count_documents({"user_id": id}) return await self.pfile.count_documents({}) # ---------------------[ DELETE FILES ]---------------------# async def delete_one_file(self, _id): await self.files.delete_one({'_id': ObjectId(_id)}) # ---------------------[ PAID SYS ]---------------------# # async def link_available(self, id): # user = await self.col.find_one({"id": id}) # if user.get("Plan") == "Plus": # return "Plus" # elif user.get("Plan") == "Free": # files = await self.file.count_documents({"user_id": id}) # if files < 11: # return True # return False async def count_links(self, id, operation: str): if operation == "-": await self.users.update_one({"id": id}, {"$inc": {"file.links": -1}}) elif operation == "+": await self.users.update_one({"id": id}, {"$inc": {"file.links": 1}}) #------------------------------For Web Files -----------------------------------------# async def add_webfile(self, upload_info): fetch_old = await self.get_web_file(upload_info["dropzone_id"]) if fetch_old: return fetch_old else: await self.web_upload.insert_one(upload_info) return await self.get_web_file(upload_info["dropzone_id"]) #async def update_web_file(self, dropzone_id): async def get_web_file(self, upload_id): file_info = await self.web_upload.find_one({"dropzone_id": upload_id}) if not file_info: return None return file_info async def uploaded_web_file(self, upload_id): await self.web_upload.delete_one({"dropzone_id": upload_id})