# fsb / FileStream / Database / database.py
# BinaryONe
# Image Enhancement
# 400d8b3
# raw
# history blame
# 16.3 kB
import re
import time
import pymongo
import motor.motor_asyncio
from bson.objectid import ObjectId
from bson.errors import InvalidId
from bson.json_util import dumps
#----------------------Local Imports-----------------------#
from FileStream.Exceptions import FileNotFound
from FileStream.Tools import Time_ISTKolNow
from .Elements import UserSchema, NewTG_Files
class Database:
    """Async MongoDB access layer for users, Telegram files and web uploads.

    Every public method is a coroutine that issues one or more Motor calls
    against the collections bound in ``__init__``.
    """

    def __init__(self, uri, database_name):
        """Create the Motor client for *uri* and bind the collections used below."""
        self._client = motor.motor_asyncio.AsyncIOMotorClient(uri)
        self.db = self._client[database_name]
        self.users = self.db.Users
        self.tfiles = self.db.Temp_Files
        self.files = self.db.Public_Files
        self.pfile = self.db.Private_Files
        self.web_upload = self.db.Web_Files

    # ---------------------[ INTERNAL HELPERS ]---------------------#
    @staticmethod
    def _to_object_id(_id):
        """Return ``ObjectId(_id)``; raise FileNotFound when *_id* is malformed.

        ``TypeError`` is caught as well as ``InvalidId`` because ``ObjectId``
        rejects non-str/bytes values (e.g. an int) with ``TypeError``.
        """
        try:
            return ObjectId(_id)
        except (InvalidId, TypeError):
            raise FileNotFound

    @staticmethod
    def _paginate(cursor, window):
        """Restrict *cursor* to the 1-based inclusive ``(first, last)`` window,
        newest ``_id`` first, and return it."""
        first, last = window[0], window[1]
        # A negative skip is rejected by the driver; clamp so first=0 behaves like first=1.
        cursor.skip(max(first - 1, 0))
        cursor.limit(last - first + 1)
        cursor.sort('_id', pymongo.DESCENDING)
        return cursor

    async def _set_tele_status(self, id, status):
        """Overwrite the user's ``tele_status`` sub-document with *status* and a timestamp."""
        # NOTE(review): this keys on the document ``_id`` while the other user
        # methods key on ``telegram_id`` — confirm what callers pass here.
        await self.users.update_one({"_id": ObjectId(id)}, {
            "$set": {
                "tele_status": {
                    "status": status,
                    "activity": Time_ISTKolNow(),
                }
            }
        })

    # ---------------------[ ADD USER ]---------------------#
    async def add_user(self, id, details=None):
        """Insert the document built by ``UserSchema(id, details)`` into ``users``."""
        user = UserSchema(id, details)
        await self.users.insert_one(user)

    async def add_admin(self, id, details=None):
        """Give *id* ``access == "ADMIN"``, creating the user document if it is missing."""
        user = await self.get_user(id)
        if user:
            await self.users.update_one({"_id": user['_id']}, {"$set": {"access": "ADMIN"}})
            return
        user = UserSchema(id, details)
        user['access'] = "ADMIN"
        await self.users.insert_one(user)

    # ---------------------[ GET USER ]---------------------#
    async def get_user(self, id):
        """Return the user document whose ``telegram_id`` equals ``int(id)``, or None."""
        return await self.users.find_one({'telegram_id': int(id)})

    # ---------------------[ CHECK USER ]---------------------#
    async def total_users_count(self):
        """Return the number of documents in ``users``."""
        return await self.users.count_documents({})

    async def get_all_users(self):
        """Return an (un-awaited) cursor over every user document."""
        return self.users.find({})

    async def is_admin(self, user_id):
        """Return True only when the user exists and has ``access == "ADMIN"``.

        An unknown user yields False instead of raising on ``None``.
        """
        user = await self.users.find_one({'telegram_id': int(user_id)})
        if not user:
            return False
        return user.get('access') == "ADMIN"

    # ---------------------[ REMOVE USER ]---------------------#
    async def delete_user(self, user_id):
        """Delete every user document whose ``telegram_id`` equals ``int(user_id)``."""
        await self.users.delete_many({'telegram_id': int(user_id)})

    # ---------------------[ BAN, UNBAN USER ]---------------------#
    async def ban_user(self, id):
        """Set the user's ``tele_status.status`` to ``"BANNED"``."""
        await self._set_tele_status(id, "BANNED")

    async def unban_user(self, id):
        """Set the user's ``tele_status.status`` to ``"ACTIVE"``."""
        await self._set_tele_status(id, "ACTIVE")

    async def is_user_banned(self, id):
        """Return True when the user's ``tele_status.status`` is ``"BANNED"``.

        This mirrors what ``ban_user`` writes. A missing user, or a user
        without a ``tele_status`` sub-document, is reported as not banned.
        """
        user = await self.users.find_one({'telegram_id': int(id)})
        if not user:
            return False
        tele_status = user.get('tele_status')
        if not isinstance(tele_status, dict):
            return False
        return tele_status.get('status') == "BANNED"

    # ---------------------[ ADD FILE TO DB ]---------------------#
    async def add_file(self, file_info: dict, db_type: str):
        """Store *file_info* in the collection selected by *db_type*.

        ``"PUBLIC"`` and ``"TEMPORARY"`` de-duplicate on
        ``(user_id, file.file_unique_id)``: an existing document's ``_id`` is
        returned, otherwise the link counter is incremented and the new
        ``inserted_id`` is returned. ``"PRIVATE"`` is not implemented and
        returns None; any other value returns ``{}``. ``file_info["time"]`` is
        stamped in every case.
        """
        file_info["time"] = Time_ISTKolNow()
        if db_type == "PUBLIC":
            collection = self.files
        elif db_type == "TEMPORARY":
            collection = self.tfiles
        elif db_type == "PRIVATE":
            return None
        else:
            return {}

        # find_one always yields a single document (or None), so the ``_id``
        # lookup below is safe even if several duplicates already exist.
        existing = await collection.find_one({
            "user_id": file_info["user_id"],
            "file.file_unique_id": file_info['file']["file_unique_id"],
        })
        if existing:
            return existing["_id"]
        await self.count_links(file_info["user_id"], "+")
        return (await collection.insert_one(file_info)).inserted_id

    # ---------------------[ ADD FILE TO Temp DB ]---------------------#
    async def add_temp_file(self, file_info):
        """Timestamp *file_info*, insert it into ``tfiles`` and return its ``inserted_id``."""
        file_info["time"] = Time_ISTKolNow()
        return (await self.tfiles.insert_one(file_info)).inserted_id

    # ---------------------[ FIND FILE IN DB for Bot and APIs]---------------------#
    async def get_file(self, _id):
        """Look *_id* up in ``files`` and then ``tfiles``; return the document or None.

        Raises:
            FileNotFound: when *_id* cannot be converted to an ObjectId.
        """
        object_id = self._to_object_id(_id)
        file_info = await self.files.find_one({"_id": object_id})
        if not file_info:
            file_info = await self.tfiles.find_one({"_id": object_id})
        return file_info

    async def GetFileByDBName(self, _id, privacy_type: str):
        """Return the document with *_id* from the collection chosen by *privacy_type*.

        Returns None for an unknown *privacy_type* or when nothing matches
        (a message is printed in the latter case).

        Raises:
            FileNotFound: when *_id* cannot be converted to an ObjectId.
        """
        # NOTE(review): "PRIVATE" reads Temp_Files rather than Private_Files;
        # kept exactly as the original routed it — confirm this is intended.
        collections = {
            "PUBLIC": self.files,
            "TEMPORARY": self.tfiles,
            "PRIVATE": self.tfiles,
        }
        collection = collections.get(privacy_type)
        if collection is None:
            return None
        file_info = await collection.find_one({"_id": self._to_object_id(_id)})
        if not file_info:
            print('file not found')
        return file_info

    async def get_all_files_api(self, range=None):
        """Return every document in ``files`` as a list (*range* is unused)."""
        return await self.files.find().to_list(length=None)

    async def get_all_files(self, range=None):
        """Return a cursor over ``files``; when *range* is given, apply that
        1-based inclusive window sorted newest-first."""
        user_files = self.files.find({})
        if range:
            self._paginate(user_files, range)
        return user_files

    async def find_files(self, user_id, range):
        """Return ``(cursor, total)`` for files that have a ``tagged_users`` entry for *user_id*."""
        query = {f"file.tagged_users.{user_id}": {"$exists": True}}
        user_files = self._paginate(self.files.find(query), range)
        total_files = await self.files.count_documents(query)
        return user_files, total_files

    async def find_all_public_files(self, range):
        """Return ``(cursor, total)`` for files whose ``privacy_type`` is ``"PUBLIC"``."""
        query = {"privacy_type": "PUBLIC"}
        user_files = self._paginate(self.files.find(query), range)
        total_files = await self.files.count_documents(query)
        return user_files, total_files

    async def find_all_files(self, range):
        """Return ``(cursor, total)`` over every document in ``files``."""
        user_files = self._paginate(self.files.find({}), range)
        total_files = await self.files.count_documents({})
        return user_files, total_files

    async def find_private_files(self, user_id, range):
        """Return ``(cursor, total)`` for files tagged ``"PRIVATE"`` for *user_id*."""
        query = {f"file.tagged_users.{user_id}": "PRIVATE"}
        user_files = self._paginate(self.files.find(query), range)
        total_files = await self.files.count_documents(query)
        return user_files, total_files

    async def get_file_by_fileuniqueid_only(self, file_unique_id, privacy_type: str):
        """Return the first document matching *file_unique_id* in the collection
        selected by *privacy_type*, None when absent, or ``{}`` for an unknown type."""
        if privacy_type == "TEMPORARY":
            collection = self.tfiles
        elif privacy_type == "PUBLIC":
            collection = self.files
        elif privacy_type == "PRIVATE":
            # The attribute bound in __init__ is ``pfile`` (singular).
            collection = self.pfile
        else:
            return {}
        return await collection.find_one({"file.file_unique_id": file_unique_id})

    async def get_file_by_fileuniqueid(self, id, file_unique_id, privacy_type: str):
        """Find the file(s) owned by user *id* with *file_unique_id*.

        For ``"TEMPORARY"`` / ``"PUBLIC"`` the result is False when nothing
        matches, the document when exactly one matches, or a cursor when
        several match. ``"PRIVATE"`` ignores *id* and returns the first match
        in ``pfile`` (or None). Any other *privacy_type* returns ``{}``.
        """
        if privacy_type == "PRIVATE":
            return await self.pfile.find_one({"file.file_unique_id": file_unique_id})
        if privacy_type == "TEMPORARY":
            collection = self.tfiles
        elif privacy_type == "PUBLIC":
            collection = self.files
        else:
            return {}

        query = {"user_id": id, "file.file_unique_id": file_unique_id}
        count = await collection.count_documents(query)
        if count == 0:
            return False
        if count == 1:
            return await collection.find_one(query)
        return collection.find(query)

    # ---------------------[ UPDATE FILE IN DB ]---------------------#
    async def update_privacy(self, file_details: dict):
        """Merge ``file_details['file']['tagged_users']`` into the stored file
        and return the refreshed document.

        ``"TEMPORARY"`` files are updated in ``tfiles`` with their privacy type
        unchanged. Everything else is updated in ``files``, and
        ``file_details['privacy_type']`` is rewritten in place to ``"PRIVATE"``
        when any merged tag is ``"PRIVATE"``.

        Raises:
            FileNotFound: when no stored file matches the unique id.
        """
        unique_id = file_details['file']['file_unique_id']
        file = await self.get_file_by_fileuniqueid_only(unique_id, file_details['privacy_type'])
        if not file:
            raise FileNotFound

        # Incoming tags win over stored ones for the same user key.
        updated_tagged_users = file['file']['tagged_users'].copy()
        updated_tagged_users.update(file_details['file']['tagged_users'])

        if file_details['privacy_type'] == "TEMPORARY":
            collection = self.tfiles
        else:
            collection = self.files
            if any(value == "PRIVATE" for value in updated_tagged_users.values()):
                file_details['privacy_type'] = "PRIVATE"

        await collection.update_one({"_id": file['_id']}, {
            "$set": {
                "privacy_type": file_details['privacy_type'],
                "file.tagged_users": updated_tagged_users
            }
        })
        # Re-read from the collection that was just written, by ``_id``: the
        # privacy type may have changed above and would otherwise select a
        # different collection than the one holding the document.
        return await collection.find_one({"_id": file['_id']})

    async def update_file_ids(self, _id, file_ids: dict, privacy_type: str):
        """Replace ``file_ids`` on document *_id*; unknown *privacy_type* is a no-op."""
        # NOTE(review): "PRIVATE" writes to Temp_Files, as in the original — confirm.
        collections = {
            "PUBLIC": self.files,
            "TEMPORARY": self.tfiles,
            "PRIVATE": self.tfiles,
        }
        collection = collections.get(privacy_type)
        if collection is None:
            return None
        await collection.update_one({"_id": ObjectId(_id)},
                                    {"$set": {"file_ids": file_ids}})

    async def update_file_info(self, _id, file_info: dict, privacy_type: str):
        """Overwrite ``message_id``, ``location`` and ``file`` on document *_id*.

        ``"PUBLIC"`` targets ``files``; every other *privacy_type* targets ``tfiles``.
        """
        collection = self.files if privacy_type == "PUBLIC" else self.tfiles
        await collection.update_one({"_id": ObjectId(_id)}, {
            "$set": {
                "message_id": file_info['message_id'],
                "location": file_info['location'],
                "file": file_info['file']
            }
        })

    #--------------------------PrivateFiles-------------------
    async def get_private_file(self, _id):
        """Return the ``pfile`` document with *_id*.

        Raises:
            FileNotFound: when *_id* is malformed or no document matches.
        """
        file_info = await self.pfile.find_one({"_id": self._to_object_id(_id)})
        if not file_info:
            raise FileNotFound
        return file_info

    async def add_private_file(self, file_info):
        """Insert *file_info* into ``pfile`` unless its ``file_unique_id`` already
        exists; return the ``_id`` of the stored document either way."""
        file_info["time"] = Time_ISTKolNow()
        fetch_old = await self.get_private_file_by_fileuniqueid_only(file_info['file']["file_unique_id"])
        if fetch_old:
            return fetch_old["_id"]
        # Return the id (not the raw InsertOneResult) so both paths agree.
        return (await self.pfile.insert_one(file_info)).inserted_id

    async def get_private_file_by_fileuniqueid_only(self, file_unique_id):
        """Return the first ``pfile`` document with *file_unique_id*, or None."""
        return await self.pfile.find_one({"file.file_unique_id": file_unique_id})

    async def update_private_file_ids(self, _id, file_ids: dict):
        """Replace ``file_ids`` on the ``pfile`` document *_id*."""
        await self.pfile.update_one({"_id": ObjectId(_id)},
                                    {"$set": {"file_ids": file_ids}})

    async def update_private_privacy(self, file_details: dict, instruction: dict):
        """Insert *file_details* into ``pfile``.

        *instruction* is accepted but not used.
        """
        # NOTE(review): the original looked the file up first and discarded the
        # result; that dead read was dropped. The method looks unfinished.
        await self.pfile.insert_one(file_details)

    #####################-------search for inline query ------------###############
    async def get_search_results(self, query=None, file_type=None, max_results=10, offset=0):
        """Case-insensitive literal search over file name, caption, title and release date.

        Returns:
            ``(files, next_offset)`` — at most *max_results* documents starting
            at *offset*, most recently inserted first. ``next_offset`` is the
            offset of the following page, or ``''`` when there is none.
        """
        # A missing query matches everything instead of crashing in re.escape.
        regex = re.compile(re.escape(query or ""), re.IGNORECASE)
        criteria = {'$or': [{'file.file_name': {"$regex": regex}}, {'file.caption': {"$regex": regex}}, {'title': {"$regex": regex}}, {'release_date': {"$regex": regex}}]}
        if file_type:
            # NOTE(review): top-level ``mime_type`` key, as in the original — confirm schema.
            criteria['mime_type'] = file_type

        total_results = await self.files.count_documents(criteria)
        next_offset = offset + max_results
        # ``>=``: when this page ends exactly at the last result there is no next page.
        if next_offset >= total_results:
            next_offset = ''

        cursor = self.files.find(criteria)
        cursor.sort('$natural', -1)
        cursor.skip(offset).limit(max_results)
        files = await cursor.to_list(length=max_results)
        return files, next_offset

    # ---------------------[ TOTAL FILES ]---------------------#
    async def total_files(self, id=None):
        """Count documents in ``files``, restricted to ``user_id == id`` when *id* is truthy."""
        if id:
            return await self.files.count_documents({"user_id": id})
        return await self.files.count_documents({})

    async def total_privfiles(self, id=None):
        """Count documents in ``pfile``, restricted to ``user_id == id`` when *id* is truthy."""
        if id:
            return await self.pfile.count_documents({"user_id": id})
        return await self.pfile.count_documents({})

    # ---------------------[ DELETE FILES ]---------------------#
    async def delete_one_file(self, _id):
        """Delete the ``files`` document with *_id*."""
        await self.files.delete_one({'_id': ObjectId(_id)})

    # ---------------------[ PAID SYS ]---------------------#
    async def count_links(self, id, operation: str):
        """Increment (``"+"``) or decrement (``"-"``) ``file.links`` on the user
        matched by ``{"id": id}``; any other *operation* does nothing."""
        # NOTE(review): this filters on ``id`` while the other user queries use
        # ``telegram_id`` — verify against the UserSchema document layout.
        delta = {"+": 1, "-": -1}.get(operation)
        if delta is None:
            return
        await self.users.update_one({"id": id}, {"$inc": {"file.links": delta}})

    #------------------------------For Web Files -----------------------------------------#
    async def add_webfile(self, upload_info):
        """Return the ``web_upload`` document for ``upload_info["dropzone_id"]``,
        inserting *upload_info* first when none exists."""
        fetch_old = await self.get_web_file(upload_info["dropzone_id"])
        if fetch_old:
            return fetch_old
        await self.web_upload.insert_one(upload_info)
        return await self.get_web_file(upload_info["dropzone_id"])

    async def get_web_file(self, upload_id):
        """Return the ``web_upload`` document with ``dropzone_id == upload_id``,
        or None when absent (or when the stored document is empty)."""
        file_info = await self.web_upload.find_one({"dropzone_id": upload_id})
        return file_info if file_info else None

    async def uploaded_web_file(self, upload_id):
        """Delete the ``web_upload`` document with ``dropzone_id == upload_id``."""
        await self.web_upload.delete_one({"dropzone_id": upload_id})