Spaces:
Running
Running
import re | |
import time | |
import pymongo | |
import motor.motor_asyncio | |
from bson.objectid import ObjectId | |
from bson.errors import InvalidId | |
from bson.json_util import dumps | |
#----------------------Local Imports-----------------------# | |
from FileStream.Exceptions import FileNotFound | |
from FileStream.Tools import Time_ISTKolNow | |
from .Elements import UserSchema, NewTG_Files | |
class Database:
    """Async MongoDB access layer for users, stored files and web uploads.

    Every query goes through Motor, so all public methods are coroutines.
    Methods that return several documents hand back a Motor cursor which the
    caller iterates (``async for``) or materialises itself.
    """

    def __init__(self, uri, database_name):
        """Open a Motor client on ``uri`` and bind the collections used below."""
        self._client = motor.motor_asyncio.AsyncIOMotorClient(uri)
        self.db = self._client[database_name]
        self.users = self.db.Users
        self.tfiles = self.db.Temp_Files
        self.files = self.db.Public_Files
        self.pfile = self.db.Private_Files
        self.web_upload = self.db.Web_Files

    # ---------------------[ INTERNAL HELPERS ]---------------------#
    @staticmethod
    def _paginate(cursor, page_range):
        """Apply a 1-based inclusive ``(first, last)`` window, newest first."""
        first, last = page_range[0], page_range[1]
        cursor.skip(first - 1)
        cursor.limit(last - first + 1)
        cursor.sort('_id', pymongo.DESCENDING)
        return cursor

    async def _find_by_object_id(self, collection, _id):
        """Fetch one document by ``_id``; raise FileNotFound on a malformed id."""
        try:
            return await collection.find_one({"_id": ObjectId(_id)})
        except InvalidId:
            raise FileNotFound

    async def _set_tele_status(self, id, status):
        """Overwrite ``tele_status`` for the user with this Telegram id."""
        await self.users.update_one({"telegram_id": int(id)}, {
            "$set": {
                "tele_status": {
                    "status": status,
                    "activity": Time_ISTKolNow(),
                },
            }
        })

    # ---------------------[ ADD USER ]---------------------#
    async def add_user(self, id, details=None):
        """Insert a new user document built by ``UserSchema(id, details)``."""
        await self.users.insert_one(UserSchema(id, details))

    async def add_admin(self, id, details=None):
        """Grant ADMIN access to a user, creating the user if necessary."""
        user = await self.get_user(id)
        if user:
            await self.users.update_one({"_id": user['_id']}, {"$set": {"access": "ADMIN"}})
            return
        user = UserSchema(id, details)
        user['access'] = "ADMIN"
        await self.users.insert_one(user)

    # ---------------------[ GET USER ]---------------------#
    async def get_user(self, id):
        """Return the user document for this Telegram id, or None."""
        return await self.users.find_one({'telegram_id': int(id)})

    # ---------------------[ CHECK USER ]---------------------#
    async def total_users_count(self):
        """Return the number of documents in the users collection."""
        return await self.users.count_documents({})

    async def get_all_users(self):
        """Return a cursor over every user document."""
        return self.users.find({})

    async def is_admin(self, user_id):
        """Return True only if the user exists and has ``access == "ADMIN"``."""
        user = await self.users.find_one({'telegram_id': int(user_id)})
        # An unknown user is simply "not an admin" rather than a TypeError.
        return bool(user) and user.get('access') == "ADMIN"

    # ---------------------[ REMOVE USER ]---------------------#
    async def delete_user(self, user_id):
        """Delete every user document carrying this Telegram id."""
        await self.users.delete_many({'telegram_id': int(user_id)})

    # ---------------------[ BAN, UNBAN USER ]---------------------#
    async def ban_user(self, id):
        """Mark the user as BANNED and stamp the change time.

        NOTE(review): ``id`` is treated as the Telegram id, like the other
        user methods in this class; the previous body referenced an undefined
        ``_id`` name and could never run. Confirm against callers.
        """
        await self._set_tele_status(id, "BANNED")

    async def unban_user(self, id):
        """Mark the user as ACTIVE and stamp the change time.

        NOTE(review): ``id`` is treated as the Telegram id (see ``ban_user``).
        """
        await self._set_tele_status(id, "ACTIVE")

    async def is_user_banned(self, id):
        """Return True if the user's ``tele_status.status`` is "BANNED"."""
        banned = await self.users.find_one({
            'telegram_id': int(id),
            'tele_status.status': "BANNED",
        })
        return banned is not None

    # ---------------------[ ADD FILE TO DB ]---------------------#
    async def add_file(self, file_info: dict, db_type: str):
        """Store ``file_info`` in the collection selected by ``db_type``.

        Returns the ``_id`` of the already-stored document when the same user
        has the same ``file_unique_id`` there, otherwise the new ``_id``.
        "PRIVATE" is not handled here and yields None; any unrecognised
        ``db_type`` yields ``{}``. ``file_info["time"]`` is always stamped.
        """
        file_info["time"] = Time_ISTKolNow()
        if db_type == "PRIVATE":
            return None
        if db_type == "PUBLIC":
            collection = self.files
        elif db_type == "TEMPORARY":
            collection = self.tfiles
        else:
            return {}

        # A single find_one is enough and stays correct even when duplicates
        # already exist (a cursor cannot be subscripted with "_id").
        existing = await collection.find_one({
            "user_id": file_info["user_id"],
            "file.file_unique_id": file_info['file']["file_unique_id"],
        })
        if existing:
            return existing["_id"]
        await self.count_links(file_info["user_id"], "+")
        return (await collection.insert_one(file_info)).inserted_id

    # ---------------------[ ADD FILE TO Temp DB ]---------------------#
    async def add_temp_file(self, file_info):
        """Stamp and insert ``file_info`` into the temporary collection."""
        file_info["time"] = Time_ISTKolNow()
        return (await self.tfiles.insert_one(file_info)).inserted_id

    # ---------------------[ FIND FILE IN DB for Bot and APIs ]---------------------#
    async def get_file(self, _id):
        """Look a file up by ``_id`` in the public, then temporary collection.

        Returns None when neither collection has it.

        Raises:
            FileNotFound: ``_id`` is not a valid ObjectId.
        """
        try:
            object_id = ObjectId(_id)
        except InvalidId:
            raise FileNotFound
        file_info = await self.files.find_one({"_id": object_id})
        if not file_info:
            file_info = await self.tfiles.find_one({"_id": object_id})
        return file_info

    async def GetFileByDBName(self, _id, privacy_type: str):
        """Look a file up by ``_id`` in the collection for ``privacy_type``.

        Returns the document, or None when it is missing or the privacy type
        is unrecognised.

        Raises:
            FileNotFound: ``_id`` is not a valid ObjectId.
        """
        if privacy_type == "PUBLIC":
            collection = self.files
        elif privacy_type in ("TEMPORARY", "PRIVATE"):
            # NOTE(review): PRIVATE reads the temporary collection, exactly as
            # before; confirm this is intended rather than self.pfile.
            collection = self.tfiles
        else:
            return None
        file_info = await self._find_by_object_id(collection, _id)
        if not file_info:
            print('file not found')
        return file_info

    async def get_all_files_api(self, range=None):
        """Return every public file as a list (``range`` is currently unused)."""
        files = await self.files.find().to_list(length=None)
        print("\n get_all_files_api : Return Type : ", type(files))
        return files

    async def get_all_files(self, range=None):
        """Return a cursor over public files, newest first.

        When ``range`` (1-based inclusive ``(first, last)``) is given, only
        that window is returned.
        """
        user_files = self.files.find({})
        if range:
            return self._paginate(user_files, range)
        user_files.sort('_id', pymongo.DESCENDING)
        return user_files

    async def find_files(self, user_id, range):
        """Return ``(cursor, total)`` for files that tag ``user_id``."""
        query = {f"file.tagged_users.{user_id}": {"$exists": True}}
        user_files = self._paginate(self.files.find(query), range)
        total_files = await self.files.count_documents(query)
        return user_files, total_files

    async def find_all_public_files(self, range):
        """Return ``(cursor, total)`` for files whose privacy_type is PUBLIC."""
        query = {"privacy_type": "PUBLIC"}
        user_files = self._paginate(self.files.find(query), range)
        total_files = await self.files.count_documents(query)
        return user_files, total_files

    async def find_all_files(self, range):
        """Return ``(cursor, total)`` for every document in the public collection."""
        user_files = self._paginate(self.files.find({}), range)
        total_files = await self.files.count_documents({})
        return user_files, total_files

    async def find_private_files(self, user_id, range):
        """Return ``(cursor, total)`` for files tagged PRIVATE for ``user_id``."""
        query = {f"file.tagged_users.{user_id}": "PRIVATE"}
        user_files = self._paginate(self.files.find(query), range)
        total_files = await self.files.count_documents(query)
        return user_files, total_files

    async def get_file_by_fileuniqueid_only(self, file_unique_id, privacy_type: str):
        """Return the first file with this ``file_unique_id``.

        The collection is chosen by ``privacy_type``; an unrecognised type
        yields ``{}`` and a miss yields None.
        """
        if privacy_type == "TEMPORARY":
            collection = self.tfiles
        elif privacy_type == "PUBLIC":
            collection = self.files
        elif privacy_type == "PRIVATE":
            collection = self.pfile
        else:
            return {}
        return await collection.find_one({"file.file_unique_id": file_unique_id})

    async def get_file_by_fileuniqueid(self, id, file_unique_id, privacy_type: str):
        """Find a user's file(s) by ``file_unique_id``.

        For TEMPORARY/PUBLIC the result is False (no match), the document
        (exactly one match) or a cursor (several matches). PRIVATE ignores
        ``id`` and returns the first match or None. Any other type gives ``{}``.
        """
        if privacy_type == "PRIVATE":
            return await self.pfile.find_one({"file.file_unique_id": file_unique_id})
        if privacy_type == "TEMPORARY":
            collection = self.tfiles
        elif privacy_type == "PUBLIC":
            collection = self.files
        else:
            return {}

        query = {"user_id": id, "file.file_unique_id": file_unique_id}
        count = await collection.count_documents(query)
        if count == 0:
            return False
        if count == 1:
            return await collection.find_one(query)
        return collection.find(query)

    # ---------------------[ UPDATE FILE IN DB ]---------------------#
    async def update_privacy(self, file_details: dict):
        """Merge ``tagged_users`` into the stored file and update its privacy.

        For non-TEMPORARY files ``file_details['privacy_type']`` is rewritten
        in place to "PRIVATE" when any merged tag is "PRIVATE". Returns the
        stored document re-read after the update.

        Raises:
            FileNotFound: no stored file matches the given unique id.
        """
        unique_id = file_details['file']['file_unique_id']
        # Remember where the file was found: privacy_type may be rewritten
        # below, and the re-read must hit the same collection.
        lookup_type = file_details['privacy_type']
        file = await self.get_file_by_fileuniqueid_only(unique_id, lookup_type)
        if not file:
            raise FileNotFound

        updated_tagged_users = {
            **file['file']['tagged_users'],
            **file_details['file']['tagged_users'],
        }
        if lookup_type == "TEMPORARY":
            collection = self.tfiles
        else:
            # NOTE(review): PRIVATE input is looked up in self.pfile but
            # updated in self.files, as before; confirm the intended target.
            collection = self.files
            if any(value == "PRIVATE" for value in updated_tagged_users.values()):
                file_details['privacy_type'] = "PRIVATE"

        await collection.update_one({"_id": file['_id']}, {
            "$set": {
                "privacy_type": file_details['privacy_type'],
                "file.tagged_users": updated_tagged_users,
            }
        })
        return await self.get_file_by_fileuniqueid_only(unique_id, lookup_type)

    async def update_file_ids(self, _id, file_ids: dict, privacy_type: str):
        """Replace ``file_ids`` on the document with this ``_id``.

        Does nothing and returns None for an unrecognised ``privacy_type``.
        """
        if privacy_type == "PUBLIC":
            collection = self.files
        elif privacy_type in ("TEMPORARY", "PRIVATE"):
            # NOTE(review): PRIVATE writes to the temporary collection, as before.
            collection = self.tfiles
        else:
            return None
        await collection.update_one({"_id": ObjectId(_id)}, {"$set": {"file_ids": file_ids}})

    async def update_file_info(self, _id, file_info: dict, privacy_type: str):
        """Overwrite ``message_id``, ``location`` and ``file`` on a stored file.

        PUBLIC targets the public collection; every other ``privacy_type``
        targets the temporary collection.
        """
        collection = self.files if privacy_type == "PUBLIC" else self.tfiles
        await collection.update_one({"_id": ObjectId(_id)}, {
            "$set": {
                "message_id": file_info['message_id'],
                "location": file_info['location'],
                "file": file_info['file'],
            }
        })

    # --------------------------[ PRIVATE FILES ]--------------------------#
    async def get_private_file(self, _id):
        """Return the private file with this ``_id``.

        Raises:
            FileNotFound: the id is malformed or no such document exists.
        """
        file_info = await self._find_by_object_id(self.pfile, _id)
        if not file_info:
            raise FileNotFound
        return file_info

    async def add_private_file(self, file_info):
        """Insert a private file unless its ``file_unique_id`` is already stored.

        Returns the document ``_id`` in both cases.
        """
        file_info["time"] = Time_ISTKolNow()
        fetch_old = await self.get_private_file_by_fileuniqueid_only(file_info['file']["file_unique_id"])
        if fetch_old:
            return fetch_old["_id"]
        # Return the id, not the InsertOneResult, so both paths agree.
        return (await self.pfile.insert_one(file_info)).inserted_id

    async def get_private_file_by_fileuniqueid_only(self, file_unique_id):
        """Return the first private file with this ``file_unique_id``, or None."""
        return await self.pfile.find_one({"file.file_unique_id": file_unique_id})

    async def update_private_file_ids(self, _id, file_ids: dict):
        """Replace ``file_ids`` on the private file with this ``_id``."""
        await self.pfile.update_one({"_id": ObjectId(_id)}, {"$set": {"file_ids": file_ids}})

    async def update_private_privacy(self, file_details: dict, instruction: dict):
        """Insert ``file_details`` into the private collection.

        ``instruction`` is accepted for interface compatibility and is unused.
        """
        await self.pfile.insert_one(file_details)

    # ---------------------[ SEARCH FOR INLINE QUERY ]---------------------#
    async def get_search_results(self, query=None, file_type=None, max_results=10, offset=0):
        """Case-insensitive literal search over public files.

        Matches ``query`` against file name, caption, title and release date.
        A ``query`` of None is treated as the empty string. Returns
        ``(files, next_offset)`` where ``next_offset`` is ``''`` once no
        further results remain.
        """
        pattern = re.compile(re.escape(query or ""), re.IGNORECASE)
        criteria = {'$or': [
            {'file.file_name': {"$regex": pattern}},
            {'file.caption': {"$regex": pattern}},
            {'title': {"$regex": pattern}},
            {'release_date': {"$regex": pattern}},
        ]}
        if file_type:
            criteria['mime_type'] = file_type

        total_results = await self.files.count_documents(criteria)
        next_offset = offset + max_results
        # ">=" so an exactly-full last page does not advertise an empty one.
        if next_offset >= total_results:
            next_offset = ''

        cursor = self.files.find(criteria)
        cursor.sort('$natural', -1)  # most recently stored first
        cursor.skip(offset).limit(max_results)
        files = await cursor.to_list(length=max_results)
        return files, next_offset

    # ---------------------[ TOTAL FILES ]---------------------#
    async def total_files(self, id=None):
        """Count public files, restricted to ``user_id == id`` when id is truthy."""
        query = {"user_id": id} if id else {}
        return await self.files.count_documents(query)

    async def total_privfiles(self, id=None):
        """Count private files, restricted to ``user_id == id`` when id is truthy."""
        query = {"user_id": id} if id else {}
        return await self.pfile.count_documents(query)

    # ---------------------[ DELETE FILES ]---------------------#
    async def delete_one_file(self, _id):
        """Delete the public file with this ``_id``."""
        await self.files.delete_one({'_id': ObjectId(_id)})

    # ---------------------[ PAID SYS ]---------------------#
    async def count_links(self, id, operation: str):
        """Increment ("+") or decrement ("-") the user's ``file.links`` counter.

        Any other ``operation`` is ignored.
        NOTE(review): the filter uses the ``id`` field while the other user
        queries use ``telegram_id`` - verify against UserSchema.
        """
        step = {"+": 1, "-": -1}.get(operation)
        if step is None:
            return
        await self.users.update_one({"id": id}, {"$inc": {"file.links": step}})

    # ---------------------[ WEB FILES ]---------------------#
    async def add_webfile(self, upload_info):
        """Return the stored upload for this ``dropzone_id``, inserting it first if new."""
        fetch_old = await self.get_web_file(upload_info["dropzone_id"])
        if fetch_old:
            return fetch_old
        await self.web_upload.insert_one(upload_info)
        return await self.get_web_file(upload_info["dropzone_id"])

    async def get_web_file(self, upload_id):
        """Return the web upload with this ``dropzone_id``, or None."""
        return await self.web_upload.find_one({"dropzone_id": upload_id})

    async def uploaded_web_file(self, upload_id):
        """Remove the web upload record with this ``dropzone_id``."""
        await self.web_upload.delete_one({"dropzone_id": upload_id})