BinaryONe
Enabled Debugging - APi Changes
a180de7
raw
history blame
13.4 kB
import re
import time
import pymongo
import motor.motor_asyncio
from bson.objectid import ObjectId
from bson.errors import InvalidId
from bson.json_util import dumps
#----------------------Local Imports-----------------------#
from FileStream.server.exceptions import FIleNotFound
from FileStream.Tools import Time_ISTKolNow
class Database:
def __init__(self, uri, database_name):
    """Open a Motor client and bind the collections this class works with.

    Args:
        uri: Connection string passed to ``AsyncIOMotorClient``.
        database_name: Name of the database selected on the client.
    """
    client = motor.motor_asyncio.AsyncIOMotorClient(uri)
    database = client[database_name]
    self._client = client
    self.db = database
    # Collection handles, one attribute per MongoDB collection.
    self.users = database.Users
    self.files = database.Public_Files
    self.pfile = database.Private_Files
    self.web_upload = database.Web_Files
#---------------------[ SCHEMAS ]------------------------------#
#---------------------[ NEW USER ]---------------------#
def new_user(self, id):
    """Build the default document for a new user.

    Args:
        id: Value stored unchanged under ``telegram_id``.

    Returns:
        dict: A user document with ``access`` set to ``"USER"``, an
        ``ACTIVE`` Telegram status stamped with ``Time_ISTKolNow()``,
        zeroed file counters and an empty site profile.
    """
    telegram_state = {
        "status": "ACTIVE",
        "activity": None,
        "joined": Time_ISTKolNow(),
    }
    file_counters = {
        "links": 0,
        "private_files": 0,
        "public_files": 0,
    }
    site_state = {
        "status": None,
        "activity": None,
        "password": None,
        "links": 0,
        "joined": "None",
    }
    return {
        "telegram_id": id,
        "access": "USER",
        "tele_status": telegram_state,
        "file": file_counters,
        # Stored as the literal string "None", not the None singleton.
        "site_id": "None",
        "site_status": site_state,
    }
#------------------------------------------------
def NewTG_Files(self, details):
    """Normalise raw file details into the document shape used for files.

    Each scalar field is copied from ``details``. A key that is missing
    or holds a falsy value (``0``, ``""``, ``None``) is stored as ``None``.

    Args:
        details: Mapping with the top-level keys ``user_id``,
            ``user_type``, ``message_id``, ``location``, ``time`` and
            ``privacy_type``, plus a nested ``file`` mapping holding
            ``file_id``, ``file_unique_id``, ``file_name``, ``file_size``
            and ``mime_type``.

    Returns:
        dict: The normalised document. ``file.tagged_users`` always starts
        as an empty dict.
    """
    def pick(mapping, key):
        # Absent keys and falsy values both collapse to None.
        return mapping.get(key) or None

    file_details = details.get("file") or {}
    return {
        "user_id": pick(details, "user_id"),
        "user_type": pick(details, "user_type"),
        "message_id": pick(details, "message_id"),
        "location": pick(details, "location"),
        "file": {
            "file_id": pick(file_details, "file_id"),
            "file_unique_id": pick(file_details, "file_unique_id"),
            "file_name": pick(file_details, "file_name"),
            "file_size": pick(file_details, "file_size"),
            "mime_type": pick(file_details, "mime_type"),
            # Spelled "tagged_users" so it matches the key that the
            # find_* queries and update_privacy read.
            "tagged_users": {},
        },
        "time": pick(details, "time"),
        "privacy_type": pick(details, "privacy_type"),
    }
# ---------------------[ ADD USER ]---------------------#
async def add_user(self, id):
    """Insert the default user document for ``id`` into the users collection."""
    await self.users.insert_one(self.new_user(id))
async def add_admin(self, id):
    """Give ``id`` the ``ADMIN`` access level.

    An existing user document has its ``access`` field updated. Otherwise
    a new default document is inserted with ``access`` set to ``ADMIN``.
    """
    existing = await self.get_user(id)
    if not existing:
        admin_doc = self.new_user(id)
        admin_doc['access'] = "ADMIN"
        await self.users.insert_one(admin_doc)
        return
    await self.users.update_one(
        {"_id": existing['_id']},
        {"$set": {"access": "ADMIN"}},
    )
# ---------------------[ GET USER ]---------------------#
async def get_user(self, id):
    """Return the user document whose ``telegram_id`` equals ``int(id)``.

    The result of ``find_one`` is passed through unchanged.
    """
    return await self.users.find_one({'telegram_id': int(id)})
# ---------------------[ CHECK USER ]---------------------#
async def total_users_count(self):
    """Return the number of documents in the users collection."""
    return await self.users.count_documents({})
async def get_all_users(self):
    """Return the cursor produced by an unfiltered ``find`` on users.

    The cursor is returned as-is; nothing is fetched here.
    """
    return self.users.find({})
async def is_admin(self, user_id):
    """Tell whether the user with Telegram id ``user_id`` has admin access.

    Args:
        user_id: Telegram id; converted with ``int()`` before the lookup.

    Returns:
        bool: ``True`` only when a matching document exists and its
        ``access`` field equals ``"ADMIN"``. Unknown users and documents
        without an ``access`` field yield ``False``.
    """
    user = await self.users.find_one({'telegram_id': int(user_id)})
    if not user:
        # An unknown user cannot be an admin; avoid subscripting None.
        return False
    return user.get('access') == "ADMIN"
# ---------------------[ REMOVE USER ]---------------------#
async def delete_user(self, user_id):
    """Delete every user document whose ``telegram_id`` equals ``int(user_id)``."""
    selector = {'telegram_id': int(user_id)}
    await self.users.delete_many(selector)
# ---------------------[ BAN, UNBAN USER ]---------------------#
async def ban_user(self, id):
    """Mark the user with Telegram id ``id`` as banned.

    Sets ``tele_status.status`` to ``"BANNED"`` and stamps
    ``tele_status.activity`` with ``Time_ISTKolNow()``.

    Args:
        id: Telegram id; converted with ``int()`` for the lookup, as the
            other user methods in this class do.
    """
    await self.users.update_one(
        # Match on telegram_id: the parameter is ``id`` (``_id`` was never
        # defined) and sibling lookups all key on this field.
        {'telegram_id': int(id)},
        {
            "$set": {
                # Dotted paths change only these two fields, so
                # tele_status.joined is kept.
                "tele_status.status": "BANNED",
                "tele_status.activity": Time_ISTKolNow(),
            }
        },
    )
async def unban_user(self, id):
    """Restore the user with Telegram id ``id`` to the active state.

    Sets ``tele_status.status`` to ``"ACTIVE"`` and stamps
    ``tele_status.activity`` with ``Time_ISTKolNow()``.

    Args:
        id: Telegram id; converted with ``int()`` for the lookup, as the
            other user methods in this class do.
    """
    await self.users.update_one(
        # Match on telegram_id: the parameter is ``id`` (``_id`` was never
        # defined) and sibling lookups all key on this field.
        {'telegram_id': int(id)},
        {
            "$set": {
                # Dotted paths change only these two fields, so
                # tele_status.joined is kept.
                "tele_status.status": "ACTIVE",
                "tele_status.activity": Time_ISTKolNow(),
            }
        },
    )
async def is_user_banned(self, id):
    """Tell whether the user with Telegram id ``id`` is currently banned.

    Args:
        id: Telegram id; converted with ``int()`` before the lookup.

    Returns:
        bool: ``True`` when a user document exists for ``id`` and its
        ``tele_status.status`` is ``"BANNED"``; ``False`` otherwise.
    """
    # Existence alone is not enough: every registered user has a document,
    # so the status must be part of the query.
    banned = await self.users.find_one(
        {'telegram_id': int(id), 'tele_status.status': "BANNED"}
    )
    return bool(banned)
# ---------------------[ ADD FILE TO DB ]---------------------#
async def add_file(self, file_info):
    """Store ``file_info`` in the files collection unless it already exists.

    ``file_info["time"]`` is overwritten with ``Time_ISTKolNow()``. If
    ``get_file_by_fileuniqueid`` finds a match for the same user and
    ``file_unique_id``, that match's ``_id`` is returned and nothing is
    inserted. Otherwise the link counter is incremented through
    ``count_links`` and the document is inserted.

    Returns:
        The ``_id`` of the existing match, or the ``inserted_id`` of the
        new document.
    """
    file_info["time"] = Time_ISTKolNow()
    owner = file_info["user_id"]
    unique_id = file_info['file']["file_unique_id"]
    existing = await self.get_file_by_fileuniqueid(owner, unique_id)
    if existing:
        return existing["_id"]
    await self.count_links(owner, "+")
    outcome = await self.files.insert_one(file_info)
    return outcome.inserted_id
# ---------------------[ FIND FILE IN DB for Bot and APIs]---------------------#
async def get_file(self, _id):
    """Look up a document in the files collection by its ``_id``.

    Args:
        _id: Value passed to ``ObjectId``.

    Returns:
        The ``find_one`` result. A falsy result is returned unchanged
        after printing ``'file not found'``.

    Raises:
        FIleNotFound: When an ``InvalidId`` error occurs.
    """
    try:
        document = await self.files.find_one({"_id": ObjectId(_id)})
    except InvalidId:
        raise FIleNotFound
    if not document:
        # A miss is only reported; no exception is raised for it.
        print('file not found')
    return document
async def get_all_files_api(self,range=None):
    """Return every document of the files collection as a list.

    Args:
        range: Accepted but not used by this method.

    Returns:
        The result of ``to_list(length=None)`` on an unfiltered ``find``.
    """
    cursor = self.files.find()
    documents = await cursor.to_list(length=None)
    # Debug output showing the type handed back to the caller.
    print("\n get_all_files_api : Return Type : ", type(documents))
    return documents
async def get_all_files(self,range=None):
    """Return a cursor over all files, newest ``_id`` first.

    Args:
        range: Optional two-item sequence ``(first, last)``. When truthy,
            the cursor skips ``first - 1`` documents and is limited to
            ``last - first + 1`` documents.

    Returns:
        The configured cursor; nothing is fetched here.
    """
    cursor = self.files.find({})
    if range:
        cursor.skip(range[0] - 1).limit(range[1] - range[0] + 1)
    cursor.sort('_id', pymongo.DESCENDING)
    return cursor
async def find_files(self, user_id, range):
    """Page through files that have an entry for ``user_id`` in ``file.tagged_users``.

    Args:
        user_id: Interpolated into the field path ``file.tagged_users.<user_id>``.
        range: Two-item sequence ``(first, last)``; the cursor skips
            ``first - 1`` documents and is limited to ``last - first + 1``.

    Returns:
        tuple: ``(cursor, total)`` where ``cursor`` is sorted by ``_id``
        descending and ``total`` counts all matching documents.
    """
    selector = {f"file.tagged_users.{user_id}": {"$exists": True}}
    cursor = self.files.find(selector)
    cursor.skip(range[0] - 1).limit(range[1] - range[0] + 1)
    cursor.sort('_id', pymongo.DESCENDING)
    matched = await self.files.count_documents(selector)
    return cursor, matched
async def find_all_public_files(self, range):
    """Page through files whose ``privacy_type`` is ``"PUBLIC"``.

    Args:
        range: Two-item sequence ``(first, last)``; the cursor skips
            ``first - 1`` documents and is limited to ``last - first + 1``.

    Returns:
        tuple: ``(cursor, total)`` where ``cursor`` is sorted by ``_id``
        descending and ``total`` counts all public documents.
    """
    selector = {"privacy_type": "PUBLIC"}
    cursor = self.files.find(selector)
    cursor.skip(range[0] - 1).limit(range[1] - range[0] + 1)
    cursor.sort('_id', pymongo.DESCENDING)
    matched = await self.files.count_documents(selector)
    return cursor, matched
async def find_all_files(self, range):
    """Page through every document of the files collection.

    Args:
        range: Two-item sequence ``(first, last)``; the cursor skips
            ``first - 1`` documents and is limited to ``last - first + 1``.

    Returns:
        tuple: ``(cursor, total)`` where ``cursor`` is sorted by ``_id``
        descending and ``total`` is the collection's document count.
    """
    cursor = self.files.find({})
    cursor.skip(range[0] - 1).limit(range[1] - range[0] + 1)
    cursor.sort('_id', pymongo.DESCENDING)
    matched = await self.files.count_documents({})
    return cursor, matched
async def find_private_files(self, user_id, range):
    """Page through files tagged ``"PRIVATE"`` for ``user_id``.

    Args:
        user_id: Interpolated into the field path ``file.tagged_users.<user_id>``.
        range: Two-item sequence ``(first, last)``; the cursor skips
            ``first - 1`` documents and is limited to ``last - first + 1``.

    Returns:
        tuple: ``(cursor, total)`` where ``cursor`` is sorted by ``_id``
        descending and ``total`` counts all matching documents.
    """
    selector = {f"file.tagged_users.{user_id}": "PRIVATE"}
    cursor = self.files.find(selector)
    cursor.skip(range[0] - 1).limit(range[1] - range[0] + 1)
    cursor.sort('_id', pymongo.DESCENDING)
    matched = await self.files.count_documents(selector)
    return cursor, matched
async def get_file_by_fileuniqueid_only(self, file_unique_id):
    """Return one files document whose ``file.file_unique_id`` matches, regardless of owner."""
    selector = {"file.file_unique_id": file_unique_id}
    return await self.files.find_one(selector)
async def get_file_by_fileuniqueid(self, id, file_unique_id):
    """Find the file(s) of user ``id`` that carry ``file_unique_id``.

    Returns:
        ``False`` when nothing matches, the single document when exactly
        one matches, and an un-awaited ``find`` cursor when more than one
        document matches.
    """
    selector = {"user_id": id, "file.file_unique_id": file_unique_id}
    matches = await self.files.count_documents(selector)
    if matches == 0:
        return False
    if matches == 1:
        return await self.files.find_one(selector)
    # Several duplicates: hand back the cursor rather than a document.
    return self.files.find(selector)
# ---------------------[ UPDATE FILE IN DB ]---------------------#
async def update_privacy(self, file_details: dict):
    """Merge new tagged users into a stored file and refresh its privacy.

    The stored document is located by ``file_details['file']['file_unique_id']``.
    Its ``file.tagged_users`` map is merged with the incoming one, with
    incoming entries winning. If any merged value equals ``"PRIVATE"``,
    ``file_details['privacy_type']`` is overwritten with ``"PRIVATE"``
    (the caller's dict is mutated). The merged map and the privacy type
    are then written back.

    Args:
        file_details: Mapping with ``privacy_type`` and a nested ``file``
            mapping holding ``file_unique_id`` and, optionally,
            ``tagged_users``.

    Returns:
        The document as re-read after the update.

    Raises:
        FIleNotFound: When no stored document has the given ``file_unique_id``.
    """
    unique_id = file_details['file']['file_unique_id']
    stored = await self.get_file_by_fileuniqueid_only(unique_id)
    if not stored:
        # Fail with the project's own error instead of subscripting None.
        raise FIleNotFound

    # Either side may lack the map (e.g. documents written without a
    # "tagged_users" key); treat that as "no tagged users".
    merged = dict((stored.get('file') or {}).get('tagged_users') or {})
    merged.update(file_details['file'].get('tagged_users') or {})

    if "PRIVATE" in merged.values():
        file_details['privacy_type'] = "PRIVATE"

    await self.files.update_one({"_id": stored['_id']}, {
        "$set": {
            "privacy_type": file_details['privacy_type'],
            "file.tagged_users": merged
        }
    })
    return await self.get_file_by_fileuniqueid_only(unique_id)
async def update_file_ids(self, _id, file_ids: dict):
    """Set the ``file_ids`` field of the files document identified by ``_id``.

    Args:
        _id: Value passed to ``ObjectId`` to build the filter.
        file_ids: Stored unchanged under ``file_ids``.
    """
    target = {"_id": ObjectId(_id)}
    change = {"$set": {"file_ids": file_ids}}
    await self.files.update_one(target, change)
async def update_file_info(self, _id, file_info: dict):
    """Overwrite ``message_id``, ``location`` and ``file`` on a stored file.

    Args:
        _id: Value passed to ``ObjectId`` to build the filter.
        file_info: Mapping that must contain ``message_id``, ``location``
            and ``file``; those three values are copied into the document.
    """
    target = {"_id": ObjectId(_id)}
    changes = {key: file_info[key] for key in ("message_id", "location", "file")}
    await self.files.update_one(target, {"$set": changes})
#--------------------------PrivateFiles-------------------
async def get_private_file(self, _id):
    """Look up a document in the private-files collection by its ``_id``.

    Args:
        _id: Value passed to ``ObjectId``.

    Returns:
        The matching document.

    Raises:
        FIleNotFound: When the lookup result is falsy or an ``InvalidId``
            error occurs.
    """
    try:
        document = await self.pfile.find_one({"_id": ObjectId(_id)})
    except InvalidId:
        raise FIleNotFound
    if document:
        return document
    raise FIleNotFound
async def add_private_file(self, file_info):
    """Store ``file_info`` in the private-files collection unless it already exists.

    ``file_info["time"]`` is overwritten with ``Time_ISTKolNow()``. A
    document with the same ``file.file_unique_id`` counts as already stored.

    Args:
        file_info: Document to insert; must contain
            ``file_info['file']['file_unique_id']``.

    Returns:
        The ``_id`` of the existing document, or the ``inserted_id`` of the
        newly inserted one.
    """
    file_info["time"] = Time_ISTKolNow()
    existing = await self.get_private_file_by_fileuniqueid_only(
        file_info['file']["file_unique_id"])
    if existing:
        return existing["_id"]
    outcome = await self.pfile.insert_one(file_info)
    # Return the id, not the InsertOneResult, so both paths yield the same
    # kind of value (as add_file does).
    return outcome.inserted_id
async def get_private_file_by_fileuniqueid_only(self, file_unique_id):
    """Return one private-files document whose ``file.file_unique_id`` matches."""
    selector = {"file.file_unique_id": file_unique_id}
    return await self.pfile.find_one(selector)
async def update_private_file_ids(self, _id, file_ids: dict):
    """Set the ``file_ids`` field of the private-files document identified by ``_id``.

    Args:
        _id: Value passed to ``ObjectId`` to build the filter.
        file_ids: Stored unchanged under ``file_ids``.
    """
    target = {"_id": ObjectId(_id)}
    change = {"$set": {"file_ids": file_ids}}
    await self.pfile.update_one(target, change)
async def update_private_privacy(self, file_details: dict, instruction: dict):
    """Insert ``file_details`` into the private-files collection.

    Args:
        file_details: Document to insert; must contain
            ``file_details['file']['file_unique_id']``.
        instruction: Accepted but not used by this method.
    """
    unique_id = file_details['file']['file_unique_id']
    # The lookup result is not used; the call is kept so the same query
    # still runs before the insert.
    await self.get_file_by_fileuniqueid_only(unique_id)
    await self.pfile.insert_one(file_details)
#####################-------search for inline query ------------###############
async def get_search_results(self,query=None, file_type=None, max_results=10, offset=0):
    """Search files by name or caption and return one page of results.

    The query text is escaped and matched case-insensitively against
    ``file.file_name`` and ``file.caption``.

    Args:
        query: Text to search for. ``None`` or ``""`` matches every document.
        file_type: Optional value that ``file.mime_type`` must equal.
        max_results: Page size.
        offset: Number of matching documents to skip.

    Returns:
        tuple: ``(files, next_offset)``. ``files`` is the list for this
        page. ``next_offset`` is the offset of the following page, or
        ``''`` when this page reaches the end of the results.
    """
    # ``query`` defaults to None, which re.escape cannot handle.
    pattern = re.compile(re.escape(query or ""), re.IGNORECASE)
    criteria = {
        '$or': [
            {'file.file_name': {"$regex": pattern}},
            {'file.caption': {"$regex": pattern}},
        ]
    }
    if file_type:
        # The schema built in this class keeps mime_type inside the
        # nested "file" mapping, so the filter uses the dotted path.
        criteria['file.mime_type'] = file_type

    total_results = await self.files.count_documents(criteria)
    next_offset = offset + max_results
    # ">=": when the page ends exactly at the last match there is no
    # further page to offer.
    if next_offset >= total_results:
        next_offset = ''

    cursor = self.files.find(criteria)
    # Most recently stored first.
    cursor.sort('$natural', -1)
    cursor.skip(offset).limit(max_results)
    files = await cursor.to_list(length=max_results)
    return files, next_offset
# ---------------------[ TOTAL FILES ]---------------------#
async def total_files(self, id=None):
    """Count documents in the files collection.

    Args:
        id: When truthy, only documents whose ``user_id`` equals ``id``
            are counted; otherwise every document is counted.
    """
    selector = {"user_id": id} if id else {}
    return await self.files.count_documents(selector)
async def total_privfiles(self, id=None):
    """Count documents in the private-files collection.

    Args:
        id: When truthy, only documents whose ``user_id`` equals ``id``
            are counted; otherwise every document is counted.
    """
    selector = {"user_id": id} if id else {}
    return await self.pfile.count_documents(selector)
# ---------------------[ DELETE FILES ]---------------------#
async def delete_one_file(self, _id):
    """Delete the files document whose ``_id`` equals ``ObjectId(_id)``."""
    target = {'_id': ObjectId(_id)}
    await self.files.delete_one(target)
# ---------------------[ PAID SYS ]---------------------#
# async def link_available(self, id):
# user = await self.col.find_one({"id": id})
# if user.get("Plan") == "Plus":
# return "Plus"
# elif user.get("Plan") == "Free":
# files = await self.file.count_documents({"user_id": id})
# if files < 11:
# return True
# return False
async def count_links(self, id, operation: str):
    """Increment or decrement a user's ``file.links`` counter by one.

    Args:
        id: Telegram id of the user, matched against ``telegram_id``.
        operation: ``"+"`` adds one, ``"-"`` subtracts one. Any other
            value leaves the database untouched.
    """
    step = {"+": 1, "-": -1}.get(operation)
    if step is None:
        return
    # User documents are keyed by "telegram_id" (see new_user); they have
    # no "id" field, so filtering on "id" would never match.
    await self.users.update_one({"telegram_id": id}, {"$inc": {"file.links": step}})
#------------------------------For Web Files -----------------------------------------#
async def add_webfile(self, upload_info):
    """Store a web upload record unless one with the same ``dropzone_id`` exists.

    Args:
        upload_info: Document to insert; must contain ``dropzone_id``.

    Returns:
        The already stored record when ``get_web_file`` finds one,
        otherwise the record as read back after the insert.
    """
    dropzone_id = upload_info["dropzone_id"]
    existing = await self.get_web_file(dropzone_id)
    if existing:
        return existing
    await self.web_upload.insert_one(upload_info)
    return await self.get_web_file(dropzone_id)
#async def update_web_file(self, dropzone_id):
async def get_web_file(self, upload_id):
    """Return the web upload record whose ``dropzone_id`` equals ``upload_id``.

    Returns:
        The matching document, or ``None`` when the lookup result is falsy.
    """
    record = await self.web_upload.find_one({"dropzone_id": upload_id})
    return record if record else None
async def uploaded_web_file(self, upload_id):
    """Delete the web upload record whose ``dropzone_id`` equals ``upload_id``."""
    selector = {"dropzone_id": upload_id}
    await self.web_upload.delete_one(selector)