import time # Record the start time start_time = time.time() from gradio_client import Client as GRCLIENT import sys import discord from discord.ext import commands import json import difflib import os import asyncio import io import requests import re from PIL import Image ## Load pre-trained model and tokenizer #model = BertForQuestionAnswering.from_pretrained('bert-large-uncased-whole-word-masking-finetuned-squad') #tokenizer = BertTokenizer.from_pretrained('bert-large-uncased-whole-word-masking-finetuned-squad') # ## Load the content from the file #with open('textfile.txt', 'r') as file: # context = file.read() def setup(): os.environ["DISCORD_BOT_TOKEN"] = "MTA0NTMwODMyMzc2MDcxMzc0OA.GsS5dR.yBjbLrDeCFVX96yG4zzVT_rqw7tOhJm5qPQAzc" os.environ["TRAINING_PASSWORD"] = "train4545" os.environ["HUGGINGFACE_TOKEN"] = "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" print("settuped") setup() global bot_has_replied bot_has_replied = False # Define the intents intents = discord.Intents.default() intents.message_content = True # Initialize a bot instance with intents and a prefix bot = commands.Bot(command_prefix="!", intents=intents) # Load settings from settings.json with open('settings.json', 'r') as settings_file: settings = json.load(settings_file) # Retrieve the values from the loaded settings developer_ids = settings.get("developer_ids", []) # Event to perform when the bot is ready async def read_and_calculate(filename): try: with open(filename, 'r') as file: content = file.read() # Assuming the content of the file is a number (e.g., 377526) number_from_file = len(content) # Perform your calculations here result = number_from_file / 5 # Just an example calculation return result except FileNotFoundError: print(f"File '{filename}' not found.") except ValueError: print(f"Error reading content from '{filename}'. Make sure it contains a valid number.") @bot.event async def on_ready(): print(f'Logged in as {bot.user.name}') # Read and calculate from the text file result = await read_and_calculate('textfile.txt') if result is not None: new_bot_name = f"ChattyBot [{result:.2f} tkn]" # Format the result to 2 decimal places await bot.user.edit(username=new_bot_name) #print(f"Calculated result: {result}. Bot name set to {new_bot_name}") await bot.change_presence(activity=discord.Game(name="!info for help.")) # Start the tasks bot.loop.create_task(print_time_left()) #bot.loop.create_task(send_sleep_message()) async def send_wake_message(): for guild in bot.guilds: for channel in guild.text_channels: if "ai-chat" in channel.name.lower(): await channel.send("I am awake and ready to chat!") # Calculate the time remaining for the countdown def get_time_remaining(): elapsed_time = time.time() - start_time desired_countdown_time = 29 * 60 # 29 minutes in seconds time_remaining = max(0, desired_countdown_time - elapsed_time) return time_remaining # Add a task to send the "going to sleep" message after the remaining time async def send_sleep_message(): time_remaining = get_time_remaining() await asyncio.sleep(time_remaining) await send_sleep_announcement() async def send_sleep_announcement(): for guild in bot.guilds: for channel in guild.text_channels: if "ai-chat" in channel.name.lower(): await channel.send("I'm going to sleep in 1 minute. Bye everyone!") # Task to print the time left every 5 minutes async def print_time_left(): while True: time_remaining = get_time_remaining() print(f"Time left: {time_remaining / 60:.1f} minutes") await asyncio.sleep(5 * 60) # Sleep for 5 minutes @bot.event async def on_disconnect(): await send_sleep_message() @bot.command() async def info(ctx): bot_name = bot.user.name response = f"**Info**\n" response += f"Hello, my name is {bot_name}!\n" response += f"You can use !cmds for a list of commands.\n" response += f"If you want to talk with me, please go to #ai_chat." await ctx.send(response) @bot.command() async def teach(ctx): def check(author): def inner_check(message): return message.author == author and message.channel == ctx.channel return inner_check await ctx.send("Quick info: The teach command creates new intents that can be used to train the bot.") await ctx.send("Quick info: Currently, it's available for everyone to use, but this might be changed if abused.") await ctx.send("Q: Would you like to continue? (yes/no)") try: response = await bot.wait_for('message', check=check(ctx.author), timeout=60.0) if response.content.lower() != 'yes': await ctx.send("Teaching canceled.") return await ctx.send("Q: What topic would you like to teach me?") topic = await bot.wait_for('message', check=check(ctx.author), timeout=60.0) await ctx.send("Q: How many responses would you like me to pick from?") num_responses = await bot.wait_for('message', check=check(ctx.author), timeout=60.0) response_count = int(num_responses.content) if response_count <= 0: await ctx.send("Please specify a valid number of responses.") return responses = [] for i in range(response_count): await ctx.send(f"Q: What would you like me to say for response {i + 1}?") response = await bot.wait_for('message', check=check(ctx.author), timeout=60.0) responses.append(response.content) await ctx.send("Q: How many patterns would you like to provide?") num_patterns = await bot.wait_for('message', check=check(ctx.author), timeout=60.0) pattern_count = int(num_patterns.content) if pattern_count <= 0: await ctx.send("Please specify a valid number of patterns.") return patterns = [] for i in range(pattern_count): await ctx.send(f"Q: Please provide question for what I should say {i + 1}:") pattern = await bot.wait_for('message', check=check(ctx.author), timeout=60.0) patterns.append(pattern.content) await ctx.send("Thank you for teaching me!") # You can use the 'topic', 'responses', and 'patterns' variables to create new intents and train the bot. result = make_intents.make_intent(topic.content, patterns, responses) #await ctx.send(result) except asyncio.TimeoutError: await ctx.send("Teaching session timed out. Please start over if you'd like to continue.") except ValueError: await ctx.send("Invalid input. Please provide valid numbers for responses and patterns.") # Command to train the model @bot.command() async def train_model(ctx): # Check if the provided password is correct password = os.getenv("TRAINING_PASSWORD") if password is None or ctx.message.content != f"!train_model {password}": await ctx.send("Invalid password!") return await ctx.send("Auth: valid password") if ctx.message.author.id not in developer_ids: await ctx.send("You must be a developer to do this.") return await ctx.send("Auth: ai developer :white_check_mark:") trainnltk.train() await ctx.send("Created a new model and trainied it.") # Command to list the current settings @bot.command() async def settings_list(ctx): if ctx.message.author.id not in developer_ids: await ctx.send("You must be a developer to do this.") return response = f"**Current Settings**\n" response += f"Developer IDs: {', '.join(map(str, developer_ids))}" await ctx.send(response) @bot.command() async def cmds(ctx): #commands_list = [] #for index, command in enumerate(bot.commands, start=1): # # Exclude the "help" command from the list # if command.name != 'help': # commands_list.append(f"{index}. {command.name}") #response = f"**Commands List**\n" + "\n".join(commands_list) response = """**Commands List**\n \n!cmds, !info, !dev <>, !restart, !generate_image <>, !ping""" await ctx.send(response) @bot.command() async def ping(ctx): latency = bot.latency * 1000 # Convert to milliseconds await ctx.send(f"Pong! Latency: {latency:.2f}ms") @bot.command() async def restart(ctx): if ctx.message.author.id not in developer_ids: await ctx.send("You must be a developer to do this.") return await ctx.send("Restarting... (developer)") os.execl(sys.executable, sys.executable, *sys.argv) # Command to change a specific setting @bot.command() async def setting(ctx, name, *, new_value): # Declare global variable at the beginning of the function global developer_ids if ctx.message.author.id not in developer_ids: await ctx.send("You must be a developer to do this.") return if name.lower() == 'developer_ids': developer_ids = json.loads(new_value) else: await ctx.send(f"Setting '{name}' does not exist.") await ctx.send(f"Setting '{name}' updated to '{new_value}'.") @bot.command() async def chat(ctx, *, msg): # Declare global variable at the beginning of the function global developer_ids # Check if the user has the developer role or is in the developer_ids list if ctx.message.author.id not in developer_ids: await ctx.send("You must be a developer to use this command.") return msgs = chatbot(msg) # Replace the following line with your desired bot response logic bot_response = f"{msgs}" # Send the bot's response await ctx.send(bot_response) @bot.command() async def dev(ctx, *, message): if ctx.message.author.id not in developer_ids: await ctx.send("You must be a developer to use this command.") return for channel in bot.get_all_channels(): if isinstance(channel, discord.TextChannel) and "ai-chat" in channel.name.lower(): await channel.send(message) print(f'Message sent in server {channel.guild.name} in channel {channel.name}') async def generate_image(text, message): # Generate the image URL image_link = f'https://image.pollinations.ai/prompt/{text}' # Download the image response = requests.get(image_link) image_data = response.content # Send the image as an attachment image_file = discord.File(io.BytesIO(image_data), filename='generated_image.png') # Remove watermark by cropping the image image = Image.open(io.BytesIO(image_data)) width, height = image.size left = 0 top = 0 right = width - 100 # Adjust to remove the watermark bottom = height - 100 # Adjust to remove the watermark cropped_image = image.crop((left, top, right, bottom)) # Save the cropped image cropped_image.save('cropped_image.png') # Send the cropped image as an attachment cropped_file = discord.File('cropped_image.png') await message.reply("Here is your requested image:") await message.reply(file=cropped_file) os.remove('cropped_image.png') # Main chatbot loop chatbot_history = [] @bot.event async def on_message(message): if message.author == bot.user: return # ------------------------------------------------------------------ content = message.content.lower() #if content.startswith("!train_model"): #await bot.process_commands(message) #elif content == "!settings_list": # await bot.process_commands(message) if content == "!cmds": await bot.process_commands(message) elif content == "!info": await bot.process_commands(message) #elif content == "!teach": # await bot.process_commands(message) #elif content.startswith("!chat "): # await bot.process_commands(message) #elif content.startswith("!setting"): # await bot.process_commands(message) elif content == "!ping": await bot.process_commands(message) elif content == "!restart": await bot.process_commands(message) elif message.content.startswith('!generate_image '): text = message.content[len('!generate_image '):] await generate_image(text, message) return elif message.content.startswith('!dev '): await bot.process_commands(message) print("dev cmd") # ------------------------------------------------------------------ # Calculate the Jaccard similarity between the channel name and "ai_chat" channel_name = message.channel.name similarity = difflib.SequenceMatcher(None, channel_name.lower(), "ai_chat").ratio() # Check if the similarity is at least 90% if similarity < 0.8: print(f"Ignored message in channel {channel_name}: {message.content}") return user_input = message.content print(f'Message received in server {message.guild.name} in channel {message.channel.name}: {user_input}') async with message.channel.typing(): pattern = r'image: (.*?)' image_descriptions = re.findall(pattern, user_input) desc_limit = 1 for description in image_descriptions: if desc_limit <= 0: return desc_limit -=1 # Generate the image URL text = message.content image_link = f'https://image.pollinations.ai/prompt/{text}' # Download the image response = requests.get(image_link) image_data = response.content # Send the image as an attachment image_file = discord.File(io.BytesIO(image_data), filename='generated_image.png') # Remove watermark by cropping the image image = Image.open(io.BytesIO(image_data)) width, height = image.size left = 0 top = 0 right = width - 100 # Adjust to remove the watermark bottom = height - 100 # Adjust to remove the watermark cropped_image = image.crop((left, top, right, bottom)) # Save the cropped image cropped_image.save('cropped_image.png') # Send the cropped image as an attachment cropped_file = discord.File('cropped_image.png') await message.reply("Here is your requested image:") await message.reply(file=cropped_file) os.remove('cropped_image.png') print(description) if not image_descriptions: ot = await message.reply("generating..") # Add a reaction to the message await ot.add_reaction("⏳") # You can use any emoji you prefer SYSTEM_PROMPT = "As a generative chatbot (you are not a GPT but your structure is 50% the same), your primary function is to provide helpful and friendly responses to user queries. Feel free to add some personality, but make sure your responses are accurate and helpful. Your owner and developer is: @Costikoooo (Discord user) other developers are unknown. Your name is Chattybot." zephyr_7b_beta = "https://api-inference.huggingface.co/models/HuggingFaceH4/zephyr-7b-beta/" HF_TOKEN = os.getenv("HF_TOKEN") HEADERS = {"Authorization": f"Bearer {HF_TOKEN}"} def build_input_prompt(message, chatbot, system_prompt): input_prompt = "\n" + system_prompt + "\n\n" for interaction in chatbot: input_prompt = input_prompt + str(interaction[0]) + "\n\n" + str(interaction[1]) + "\n\n\n" input_prompt = input_prompt + str(message) + "\n" return input_prompt def post_request_beta(payload): response = requests.post(zephyr_7b_beta, headers=HEADERS, json=payload) response.raise_for_status() return response.json() def predict_beta(message, chatbot=[], system_prompt=""): input_prompt = build_input_prompt(message, chatbot, system_prompt) data = { "inputs": input_prompt } try: response_data = post_request_beta(data) json_obj = response_data[0] if 'generated_text' in json_obj and len(json_obj['generated_text']) > 0: bot_message = json_obj['generated_text'] return bot_message elif 'error' in json_obj: raise Exception(json_obj['error'] + ' Please refresh and try again with smaller input prompt') else: warning_msg = f"Unexpected response: {json_obj}" raise Exception(warning_msg) except requests.HTTPError as e: error_msg = f"Request failed with status code {e.response.status_code}" raise Exception(error_msg) except json.JSONDecodeError as e: error_msg = f"Failed to decode response as JSON: {str(e)}" raise Exception(error_msg) #client = GRCLIENT("https://wop-open-gpt-chattybot.hf.space/--replicas/l7cr5/") async def oof(): try: a = predict_beta(message.content, chatbot_history, SYSTEM_PROMPT) return a except Exception as error: return "error" result = await oof() if not result == "error": await ot.add_reaction("✅") chatbot_history.append((message.content, result)) else: await ot.add_reaction("❌") # Split the result into messages if it's more than 2000 characters if len(result) > 2000: # Split the result into chunks of 2000 characters chunks = [result[i:i + 2000] for i in range(0, len(result), 2000)] # Send each chunk as a separate message for chunk in chunks: await message.channel.send(chunk) else: # Edit the original message with the result await ot.edit(content=result) #try: # with torch.no_grad(): # # Tokenize input # tokens = tokenizer.tokenize(message.content + " " + context) # input_ids = tokenizer.convert_tokens_to_ids(tokens) # input_ids = torch.tensor([input_ids]) # # # Get answer start and end scores # outputs = model(input_ids) # answer_start_scores = outputs.start_logits # answer_end_scores = outputs.end_logits # # # Get the most likely beginning and end of the answer # answer_start = torch.argmax(answer_start_scores) # answer_end = torch.argmax(answer_end_scores) + 1 # # # Combine the tokens in the window and append to the final answer # final_answer = ' '.join(tokens[answer_start:answer_end]) # # # Check if the reaction has been added # ot = await ot.channel.fetch_message(ot.id) # reaction = next((r for r in ot.reactions if str(r.emoji) == '🚫'), None) # if reaction and reaction.count > 1: # Reaction pressed by someone other than the bot # # Handle the interruption if needed # pass # # # Sleep for a short duration to avoid blocking the event loop # await asyncio.sleep(0.1) # # # Edit the message with the final answer # await ot.edit(content=final_answer.strip()) # #except asyncio.CancelledError: # # Handle cancellation if needed # pass # ##await sent_message.add_reaction('👍') # Adds the "1" reaction #await sent_message.add_reaction('👎') # Adds the "2" reaction # Reaction detection # def check(reaction, user): # return user == message.author and str(reaction.emoji) in ['👍', '👎'] # try: # reaction, user = await bot.wait_for('reaction_add', timeout=8, check=check) # if str(reaction.emoji) == '👍': # Perform the action when a thumbs up is added # Add your code here # fallback.operate_feedback("good", message.content, sent_message.content) # print("thumbsup") # elif str(reaction.emoji) == '👎': # Perform the action when a thumbs down is added # Add your code here # fallback.operate_feedback("bad", message.content, sent_message.content) # print("thumbsdown") # except asyncio.TimeoutError: # Remove reactions after 1 minute # await sent_message.clear_reactions() # ... if __name__ == "__main__": if os.getenv("DISCORD_BOT_TOKEN") is None: print("Please set the DISCORD_BOT_TOKEN environment variable.") else: #keep_alive.keep_alive() bot.run(os.getenv("DISCORD_BOT_TOKEN"))