|
import re |
|
import time |
|
import pymongo |
|
import motor.motor_asyncio |
|
from bson.objectid import ObjectId |
|
from bson.errors import InvalidId |
|
from bson.json_util import dumps |
|
|
|
|
|
from FileStream.Exceptions import FileNotFound |
|
from FileStream.Tools import Time_ISTKolNow |
|
|
|
from .Elements import UserSchema, NewTG_Files |
|
|
|
|
|
class Database: |
|
|
|
def __init__(self, uri, database_name): |
|
self._client = motor.motor_asyncio.AsyncIOMotorClient(uri) |
|
self.db = self._client[database_name] |
|
self.users = self.db.Users |
|
self.tfiles = self.db.Temp_Files |
|
self.files = self.db.Public_Files |
|
self.pfile = self.db.Private_Files |
|
self.web_upload = self.db.Web_Files |
|
|
|
|
|
|
|
|
|
|
|
async def add_user(self, id, details=None): |
|
user = UserSchema(id, details) |
|
await self.users.insert_one(user) |
|
|
|
async def add_admin(self, id, details=None): |
|
user= await self.get_user(id) |
|
if user: |
|
await self.users.update_one({"_id": user['_id']}, {"$set": {"access":"ADMIN" }}) |
|
else: |
|
user = UserSchema(id,details) |
|
user['access']="ADMIN" |
|
await self.users.insert_one(user) |
|
|
|
|
|
|
|
async def get_user(self, id): |
|
user = await self.users.find_one({'telegram_id': int(id)}) |
|
return user |
|
|
|
|
|
|
|
async def total_users_count(self): |
|
count = await self.users.count_documents({}) |
|
return count |
|
|
|
async def get_all_users(self): |
|
all_users = self.users.find({}) |
|
return all_users |
|
|
|
async def is_admin(self, user_id): |
|
user = await self.users.find_one({'telegram_id': int(user_id)}) |
|
return True if user['access'] == "ADMIN" else False |
|
|
|
|
|
|
|
async def delete_user(self, user_id): |
|
await self.users.delete_many({'telegram_id': int(user_id)}) |
|
|
|
|
|
|
|
async def ban_user(self, id): |
|
await self.users.update_one({"_id": ObjectId(_id)}, { |
|
"$set": { |
|
"tele_status": { |
|
"status": "BANNED", |
|
"activity": Time_ISTKolNow() |
|
}, |
|
} |
|
}) |
|
|
|
async def unban_user(self, id): |
|
await self.users.update_one({"_id": ObjectId(_id)}, { |
|
"$set": { |
|
"tele_status": { |
|
"status": "ACTIVE", |
|
"activity": Time_ISTKolNow(), |
|
} |
|
} |
|
}) |
|
|
|
async def is_user_banned(self, id): |
|
if await self.users.find_one({'telegram_id': int(id)}): |
|
return True |
|
else: |
|
return False |
|
|
|
|
|
async def add_file(self, file_info:dict, db_type:str): |
|
file_info["time"] = Time_ISTKolNow() |
|
if db_type =="PUBLIC": |
|
fetch_old = await self.get_file_by_fileuniqueid(file_info["user_id"], file_info['file']["file_unique_id"], privacy_type="PUBLIC") |
|
if fetch_old: |
|
return fetch_old["_id"] |
|
await self.count_links(file_info["user_id"], "+") |
|
return (await self.files.insert_one(file_info)).inserted_id |
|
elif db_type=="PRIVATE": |
|
pass |
|
elif db_type == "TEMPORARY": |
|
fetch_old = await self.get_file_by_fileuniqueid(file_info["user_id"], file_info['file']["file_unique_id"], privacy_type="TEMPORARY") |
|
if fetch_old: |
|
return fetch_old["_id"] |
|
await self.count_links(file_info["user_id"], "+") |
|
return (await self.tfiles.insert_one(file_info)).inserted_id |
|
else: |
|
return {} |
|
|
|
|
|
|
|
async def add_temp_file(self, file_info): |
|
file_info["time"] = Time_ISTKolNow() |
|
return (await self.tfiles.insert_one(file_info)).inserted_id |
|
|
|
|
|
|
|
async def get_file(self, _id): |
|
try: |
|
file_info = await self.files.find_one({"_id": ObjectId(_id)}) |
|
if not file_info: |
|
file_info = await self.tfiles.find_one({"_id": ObjectId(_id)}) |
|
|
|
|
|
return file_info |
|
except InvalidId: |
|
raise FileNotFound |
|
|
|
async def GetFileByDBName(self, _id, privacy_type:str ): |
|
if privacy_type == "PUBLIC": |
|
try: |
|
file_info = await self.files.find_one({"_id": ObjectId(_id)}) |
|
if not file_info: |
|
print('file not found') |
|
|
|
return file_info |
|
except InvalidId: |
|
raise FileNotFound |
|
elif privacy_type == "TEMPORARY": |
|
try: |
|
file_info = await self.tfiles.find_one({"_id": ObjectId(_id)}) |
|
if not file_info: |
|
print('file not found') |
|
|
|
return file_info |
|
except InvalidId: |
|
raise FileNotFound |
|
elif privacy_type == "PRIVATE": |
|
try: |
|
file_info = await self.tfiles.find_one({"_id": ObjectId(_id)}) |
|
if not file_info: |
|
print('file not found') |
|
|
|
return file_info |
|
except InvalidId: |
|
raise FileNotFound |
|
else: |
|
return None |
|
|
|
async def get_all_files_api(self,range=None): |
|
|
|
files= await self.files.find().to_list(length=None) |
|
|
|
print("\n get_all_files_api : Return Type : ", type(files)) |
|
return files |
|
|
|
async def get_all_files(self,range=None): |
|
user_files = self.files.find({}) |
|
if range : |
|
user_files.skip(range[0] - 1) |
|
user_files.limit(range[1] - range[0] + 1) |
|
user_files.sort('_id', pymongo.DESCENDING) |
|
return user_files |
|
|
|
async def find_files(self, user_id, range): |
|
user_files = self.files.find( |
|
{f"file.tagged_users.{user_id}": { |
|
"$exists": True |
|
}}) |
|
user_files.skip(range[0] - 1) |
|
user_files.limit(range[1] - range[0] + 1) |
|
user_files.sort('_id', pymongo.DESCENDING) |
|
total_files = await self.files.count_documents( |
|
{f"file.tagged_users.{user_id}": { |
|
"$exists": True |
|
}}) |
|
return user_files, total_files |
|
|
|
async def find_all_public_files(self, range): |
|
user_files = self.files.find({"privacy_type": "PUBLIC"}) |
|
user_files.skip(range[0] - 1) |
|
user_files.limit(range[1] - range[0] + 1) |
|
user_files.sort('_id', pymongo.DESCENDING) |
|
total_files = await self.files.count_documents({"privacy_type": "PUBLIC"}) |
|
return user_files, total_files |
|
|
|
async def find_all_files(self, range): |
|
user_files = self.files.find({}) |
|
user_files.skip(range[0] - 1) |
|
user_files.limit(range[1] - range[0] + 1) |
|
user_files.sort('_id', pymongo.DESCENDING) |
|
total_files = await self.files.count_documents({}) |
|
return user_files, total_files |
|
|
|
async def find_private_files(self, user_id, range): |
|
|
|
user_files = self.files.find({f"file.tagged_users.{user_id}": "PRIVATE"}) |
|
user_files.skip(range[0] - 1) |
|
user_files.limit(range[1] - range[0] + 1) |
|
user_files.sort('_id', pymongo.DESCENDING) |
|
total_files = await self.files.count_documents( |
|
{"file.tagged_users." + str(user_id): "PRIVATE"}) |
|
return user_files, total_files |
|
|
|
async def get_file_by_fileuniqueid_only(self, file_unique_id, privacy_type:str): |
|
if privacy_type=="TEMPORARY": |
|
return await self.tfiles.find_one({"file.file_unique_id": file_unique_id}) |
|
elif privacy_type=="PUBLIC": |
|
return await self.files.find_one({"file.file_unique_id": file_unique_id}) |
|
elif privacy_type=="PRIVATE": |
|
return await self.pfiles.find_one({"file.file_unique_id": file_unique_id}) |
|
else: |
|
return {} |
|
|
|
async def get_file_by_fileuniqueid(self, id, file_unique_id, privacy_type:str): |
|
if privacy_type=="TEMPORARY": |
|
count = await self.tfiles.count_documents({"user_id":id,"file.file_unique_id":file_unique_id}) |
|
if count == 0: |
|
return False |
|
elif count == 1: |
|
return await self.tfiles.find_one({"user_id": id,"file.file_unique_id": file_unique_id}) |
|
else: |
|
return self.tfiles.find({"user_id": id,"file.file_unique_id": file_unique_id}) |
|
|
|
elif privacy_type=="PUBLIC": |
|
count = await self.files.count_documents({"user_id":id,"file.file_unique_id":file_unique_id}) |
|
if count == 0: |
|
return False |
|
elif count == 1: |
|
return await self.files.find_one({"user_id": id,"file.file_unique_id": file_unique_id}) |
|
else: |
|
return self.files.find({"user_id": id,"file.file_unique_id": file_unique_id}) |
|
|
|
elif privacy_type=="PRIVATE": |
|
return await self.pfiles.find_one({"file.file_unique_id": file_unique_id}) |
|
else: |
|
return {} |
|
|
|
|
|
|
|
|
|
async def update_privacy(self, file_details: dict): |
|
file = await self.get_file_by_fileuniqueid_only(file_details['file']['file_unique_id'],file_details['privacy_type']) |
|
|
|
updated_tagged_users = file['file']['tagged_users'].copy() |
|
updated_tagged_users.update(file_details['file']['tagged_users']) |
|
|
|
|
|
|
|
if file_details['privacy_type']=="TEMPORARY": |
|
|
|
await self.tfiles.update_one({"_id": file['_id']}, { |
|
"$set": { |
|
"privacy_type": file_details['privacy_type'], |
|
"file.tagged_users": updated_tagged_users |
|
} |
|
}) |
|
return await self.get_file_by_fileuniqueid_only(file_details['file']['file_unique_id'],file_details['privacy_type']) |
|
else: |
|
file_details['privacy_type'] = "PRIVATE" if any(value == "PRIVATE" for value in updated_tagged_users.values()) else file_details['privacy_type'] |
|
|
|
await self.files.update_one({"_id": file['_id']}, { |
|
"$set": { |
|
"privacy_type": file_details['privacy_type'], |
|
"file.tagged_users": updated_tagged_users |
|
} |
|
}) |
|
return await self.get_file_by_fileuniqueid_only(file_details['file']['file_unique_id'],file_details['privacy_type']) |
|
|
|
async def update_file_ids(self, _id, file_ids: dict, privacy_type:str): |
|
if privacy_type=="PUBLIC": |
|
await self.files.update_one({"_id": ObjectId(_id)}, |
|
{"$set": { |
|
"file_ids": file_ids |
|
}}) |
|
|
|
elif privacy_type=="TEMPORARY": |
|
await self.tfiles.update_one({"_id": ObjectId(_id)}, |
|
{"$set": { |
|
"file_ids": file_ids |
|
}}) |
|
elif privacy_type=="PRIVATE": |
|
await self.tfiles.update_one({"_id": ObjectId(_id)}, |
|
{"$set": { |
|
"file_ids": file_ids |
|
}}) |
|
else: |
|
return None |
|
|
|
async def update_file_info(self, _id, file_info: dict, privacy_type:str): |
|
if privacy_type=="PUBLIC": |
|
await self.files.update_one({"_id": ObjectId(_id)}, { |
|
"$set": { |
|
"message_id": file_info['message_id'], |
|
"location": file_info['location'], |
|
"file": file_info['file'] |
|
} |
|
}) |
|
elif privacy_type=="TEMPORARY": |
|
await self.tfiles.update_one({"_id": ObjectId(_id)}, { |
|
"$set": { |
|
"message_id": file_info['message_id'], |
|
"location": file_info['location'], |
|
"file": file_info['file'] |
|
} |
|
}) |
|
else: |
|
await self.tfiles.update_one({"_id": ObjectId(_id)}, { |
|
"$set": { |
|
"message_id": file_info['message_id'], |
|
"location": file_info['location'], |
|
"file": file_info['file'] |
|
} |
|
}) |
|
|
|
|
|
|
|
|
|
async def get_private_file(self, _id): |
|
try: |
|
file_info = await self.pfile.find_one({"_id": ObjectId(_id)}) |
|
if not file_info: |
|
raise FileNotFound |
|
return file_info |
|
except InvalidId: |
|
raise FileNotFound |
|
|
|
async def add_private_file(self, file_info): |
|
file_info["time"] = Time_ISTKolNow() |
|
fetch_old = await self.get_private_file_by_fileuniqueid_only(file_info['file']["file_unique_id"]) |
|
if fetch_old: |
|
return fetch_old["_id"] |
|
return (await self.pfile.insert_one(file_info)) |
|
|
|
async def get_private_file_by_fileuniqueid_only(self, file_unique_id): |
|
return await self.pfile.find_one({"file.file_unique_id": file_unique_id}) |
|
|
|
async def update_private_file_ids(self, _id, file_ids: dict): |
|
await self.pfile.update_one({"_id": ObjectId(_id)}, |
|
{"$set": { |
|
"file_ids": file_ids |
|
}}) |
|
|
|
async def update_private_privacy(self, file_details: dict, instruction: dict): |
|
file = await self.get_file_by_fileuniqueid_only(file_details['file']['file_unique_id'],file_details['privacy_type']) |
|
await self.pfile.insert_one(file_details) |
|
|
|
|
|
|
|
async def get_search_results(self,query=None, file_type=None, max_results=10, offset=0): |
|
|
|
regex = re.compile(re.escape(query), re.IGNORECASE) |
|
filter = {'$or': [{'file.file_name': {"$regex": regex}}, {'file.caption': {"$regex": regex}}, {'title': {"$regex": regex}}, {'release_date': {"$regex": regex}}]} |
|
|
|
if file_type: |
|
filter['mime_type'] = file_type |
|
|
|
total_results = await self.files.count_documents(filter) |
|
next_offset = offset + max_results |
|
|
|
if next_offset > total_results: |
|
next_offset = '' |
|
|
|
cursor = self.files.find(filter) |
|
|
|
cursor.sort('$natural', -1) |
|
|
|
cursor.skip(offset).limit(max_results) |
|
|
|
files = await cursor.to_list(length=max_results) |
|
return files, next_offset |
|
|
|
|
|
async def total_files(self, id=None): |
|
if id: |
|
return await self.files.count_documents({"user_id": id}) |
|
return await self.files.count_documents({}) |
|
|
|
async def total_privfiles(self, id=None): |
|
if id: |
|
return await self.pfile.count_documents({"user_id": id}) |
|
return await self.pfile.count_documents({}) |
|
|
|
|
|
|
|
async def delete_one_file(self, _id): |
|
await self.files.delete_one({'_id': ObjectId(_id)}) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
async def count_links(self, id, operation: str): |
|
if operation == "-": |
|
await self.users.update_one({"id": id}, {"$inc": {"file.links": -1}}) |
|
elif operation == "+": |
|
await self.users.update_one({"id": id}, {"$inc": {"file.links": 1}}) |
|
|
|
|
|
async def add_webfile(self, upload_info): |
|
fetch_old = await self.get_web_file(upload_info["dropzone_id"]) |
|
if fetch_old: |
|
return fetch_old |
|
else: |
|
await self.web_upload.insert_one(upload_info) |
|
return await self.get_web_file(upload_info["dropzone_id"]) |
|
|
|
|
|
|
|
async def get_web_file(self, upload_id): |
|
file_info = await self.web_upload.find_one({"dropzone_id": upload_id}) |
|
if not file_info: |
|
return None |
|
return file_info |
|
|
|
async def uploaded_web_file(self, upload_id): |
|
await self.web_upload.delete_one({"dropzone_id": upload_id}) |
|
|