import os
import io
import json
import zipfile
import tempfile
import urllib.parse

import requests
import gradio as gr
from huggingface_hub import HfApi, login
from PyPDF2 import PdfReader
from dotenv import load_dotenv
from langchain_huggingface import HuggingFaceEmbeddings
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_groq import ChatGroq
from langchain.schema import Document

from chunk_python_code import chunk_python_code_with_metadata
from vectorstore import get_chroma_vectorstore
# Load environment variables from .env file
load_dotenv()

# Load configuration from the JSON files
with open('config.json') as config_file:
    config = json.load(config_file)

with open('config2.json') as config2_file:
    config2 = json.load(config2_file)
PERSIST_DOC_DIRECTORY = config["persist_doc_directory"]
PERSIST_CODE_DIRECTORY = config["persist_code_directory"]
CHUNK_SIZE = config["chunk_size"]
CHUNK_OVERLAP = config["chunk_overlap"]
EMBEDDING_MODEL_NAME = config["embedding_model"]
LLM_MODEL_NAME = config["llm_model"]
LLM_TEMPERATURE = config["llm_temperature"]
GITLAB_API_URL = config["gitlab_api_url"]
HF_SPACE_NAME = config["hf_space_name"]
DATA_DIR = config["data_dir"]
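# For reference, config.json is expected to provide exactly the keys read above.
# A minimal sketch (all values are illustrative assumptions, not the real config):
#
# {
#     "persist_doc_directory": "chroma_db",
#     "persist_code_directory": "chroma_db_code",
#     "chunk_size": 512,
#     "chunk_overlap": 50,
#     "embedding_model": "sentence-transformers/all-MiniLM-L6-v2",
#     "llm_model": "llama-3.1-70b-versatile",
#     "llm_temperature": 0,
#     "gitlab_api_url": "https://gitlab.com/api/v4",
#     "hf_space_name": "user/space-name",
#     "data_dir": "data"
# }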
GROQ_API_KEY = os.environ["GROQ_API_KEY"]
HF_TOKEN = os.environ["HF_Token"]

login(HF_TOKEN)
api = HfApi()
def load_project_id(json_file):
    with open(json_file, 'r') as f:
        data = json.load(f)
    return data['project_id']
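# load_project_id expects a JSON file containing a "project_id" key, roughly
# like this (the value is an illustrative assumption):
#
# {"project_id": "namespace/project-name"}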
def download_gitlab_project_by_version():
    try:
        # Extract GitLab project information from the config
        api_url = config2['gitlab']['api_url']
        project_id = urllib.parse.quote(config2['gitlab']['project']['id'], safe="")
        version = config2['gitlab']['project']['version']

        # Construct the URL for the release's zip archive
        url = f"{api_url}/projects/{project_id}/repository/archive.zip?sha={version}"

        # Send a GET request to download the zip file
        response = requests.get(url, stream=True)

        # Check if the request was successful
        if response.status_code == 200:
            archive_bytes = io.BytesIO(response.content)

            # Extract the filename from the content-disposition header,
            # falling back to a default name if the header is missing
            content_disposition = response.headers.get("content-disposition")
            if content_disposition and "filename=" in content_disposition:
                filename = content_disposition.split("filename=")[-1].strip('"')
            else:
                filename = "archive.zip"

            target_path = f"{DATA_DIR}/{filename}"
            api.upload_file(
                path_or_fileobj=archive_bytes,
                path_in_repo=target_path,
                repo_id=HF_SPACE_NAME,
                repo_type='space'
            )
            print(f"Release {version} downloaded and uploaded as {target_path}.")
        else:
            print(f"Failed to download the release: {response.status_code} - {response.reason}")
            print(response.text)
    except FileNotFoundError:
        print("The config.json file was not found. Please ensure it exists in the project directory.")
    except json.JSONDecodeError:
        print("Failed to parse the config.json file. Please ensure it contains valid JSON.")
    except Exception as e:
        print(f"An error occurred: {e}")
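# For reference, config2.json is expected to follow the key structure accessed
# above; a sketch with illustrative placeholder values:
#
# {
#     "gitlab": {
#         "api_url": "https://gitlab.com/api/v4",
#         "project": {
#             "id": "namespace/project-name",
#             "version": "1.0.0"
#         }
#     }
# }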
def download_gitlab_repo():
    print("Starting download_gitlab_repo")
    project_id = load_project_id('repository_ids.json')
    encoded_project_id = urllib.parse.quote_plus(project_id)

    # Define the URL to download the repository archive
    archive_url = f"{GITLAB_API_URL}/projects/{encoded_project_id}/repository/archive.zip"

    # Download the repository archive
    response = requests.get(archive_url)
    archive_bytes = io.BytesIO(response.content)

    # Retrieve the original file name from the response headers
    content_disposition = response.headers.get('content-disposition')
    if content_disposition:
        filename = content_disposition.split('filename=')[-1].strip('\"')
    else:
        filename = 'archive.zip'  # Fall back to a default name if not found

    # Skip the upload if the file already exists in the Space repository
    existing_files = api.list_repo_files(repo_id=HF_SPACE_NAME, repo_type='space')
    target_path = f"{DATA_DIR}/{filename}"
    print(f"Target path: '{target_path}'")

    if target_path in existing_files:
        print(f"File '{target_path}' already exists in the repository. Skipping upload.")
    else:
        # Upload the zip archive to the Hugging Face Space repository
        print(f"Uploading file to '{target_path}'")
        api.upload_file(
            path_or_fileobj=archive_bytes,
            path_in_repo=target_path,
            repo_id=HF_SPACE_NAME,
            repo_type='space'
        )
        print("Upload complete")
def get_all_files_in_folder(temp_dir, folder_path):
    # Recursively collect all file paths below temp_dir/folder_path
    all_files = []
    target_dir = os.path.join(temp_dir, folder_path)
    for root, dirs, files in os.walk(target_dir):
        for file in files:
            all_files.append(os.path.join(root, file))
    return all_files

def get_file(temp_dir, file_path):
    return os.path.join(temp_dir, file_path)
def process_directory(directory, folder_paths, file_paths):
    all_texts = []
    file_references = []

    zip_filename = next((file for file in os.listdir(directory) if file.endswith('.zip')), None)
    if zip_filename is None:
        raise FileNotFoundError(f"No zip archive found in '{directory}'.")
    zip_file_path = os.path.join(directory, zip_filename)

    with tempfile.TemporaryDirectory() as tmpdirname:
        # Unzip the archive into the temporary directory
        with zipfile.ZipFile(zip_file_path, 'r') as zip_ref:
            zip_ref.extractall(tmpdirname)

        # The archive unpacks into a single top-level directory
        unzipped_root = os.listdir(tmpdirname)
        tmpsubdirpath = os.path.join(tmpdirname, unzipped_root[0])

        files = []
        if folder_paths:
            for folder_path in folder_paths:
                files += get_all_files_in_folder(tmpsubdirpath, folder_path)
        if file_paths:
            files += [get_file(tmpsubdirpath, file_path) for file_path in file_paths]
        print(f"Total number of files: {len(files)}")

        for file_path in files:
            file_ext = os.path.splitext(file_path)[1]
            if os.path.getsize(file_path) == 0:
                print(f"Skipping empty file: {file_path}")
                continue
            if file_ext in ['.rst', '.py']:
                with open(file_path, 'rb') as f:
                    text = f.read().decode('utf-8')
                all_texts.append(text)
                # Store the path relative to the unzipped root as the reference
                relative_path = os.path.relpath(file_path, tmpsubdirpath)
                file_references.append(relative_path)

    return all_texts, file_references
def split_python_code_into_chunks(texts, file_paths):
    chunks = []
    for text, file_path in zip(texts, file_paths):
        document_chunks = chunk_python_code_with_metadata(text, file_path)
        chunks.extend(document_chunks)
    return chunks
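# chunk_python_code_with_metadata is assumed to return Document objects whose
# metadata includes "source", "class", "type", and "usage" keys: the format
# functions below read "source", "class", and "type", and rag_workflow filters
# code chunks by "usage". An illustrative (not real) chunk:
#
# Document(
#     page_content="def upload_file(self, record_id, file_path): ...",
#     metadata={"source": "kadi_apy/lib/resources/records.py",
#               "class": "Record", "type": "method",
#               "usage": "kadi-apy python library"},
# )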
# Split text into chunks
def split_into_chunks(texts, references, chunk_size, chunk_overlap):
    text_splitter = RecursiveCharacterTextSplitter(chunk_size=chunk_size, chunk_overlap=chunk_overlap)
    chunks = []
    for text, reference in zip(texts, references):
        chunks.extend([
            Document(
                page_content=chunk,
                metadata={
                    "source": reference,
                    "usage": "doc"
                }
            )
            for chunk in text_splitter.split_text(text)
        ])
    return chunks
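# Minimal usage sketch for split_into_chunks (inputs are illustrative):
#
# doc_chunks = split_into_chunks(
#     texts=["Kadi4Mat records store research data ..."],
#     references=["docs/source/usage.rst"],
#     chunk_size=512,
#     chunk_overlap=50,
# )
# # Each resulting Document carries metadata
# # {"source": "docs/source/usage.rst", "usage": "doc"}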
# Setup vectorstore
def embed_documents_into_vectorstore(chunks, model_name, persist_directory):
    print("Starting embed_documents_into_vectorstore")
    embedding_model = HuggingFaceEmbeddings(model_name=model_name)
    vectorstore = get_chroma_vectorstore(embedding_model, persist_directory)
    vectorstore.add_documents(chunks)
    return vectorstore

# Setup LLM
def setup_llm(model_name, temperature, api_key):
    llm = ChatGroq(model=model_name, temperature=temperature, api_key=api_key)
    return llm
def format_kadi_apy_library_context(docs):
    doc_context = []
    for doc in docs:
        # Extract metadata information
        class_info = doc.metadata.get("class", "Unknown Class")
        type_info = doc.metadata.get("type", "Unknown Type")
        source_info = doc.metadata.get("source", "Unknown Source")
        # Format metadata and document content
        formatted_doc = f"# source: {source_info}\n# class: {class_info}\n# type: {type_info}\n{doc.page_content}\n\n\n"
        doc_context.append(formatted_doc)
    return doc_context

def format_kadi_api_doc_context(docs):
    doc_context = []
    for doc in docs:
        source_info = doc.metadata.get("source", "Unknown Source")
        formatted_doc = f"# source: {source_info}\n{doc.page_content}\n\n\n"
        doc_context.append(formatted_doc)
    return doc_context
def rag_workflow(query):
    # Step 1: Let the LLM predict which part of the kadi-apy library the query
    # concerns, so the source-code retrieval can be filtered accordingly
    prediction_prompt = (
        f"""The query is: '{query}'.
Based on the user's query, assist them by determining which technical document they should read to interact with the software named 'Kadi4Mat'.
There are two different technical documents to choose from:
- Document 1: Provides information on how to use a Python library to interact with the HTTP API of 'Kadi4Mat'.
- Document 2: Provides information on how to use a Python library to implement custom CLI commands to interact with 'Kadi4Mat'.
Your task is to select the single most likely option.
If Document 1 is the best choice, respond with 'kadi-apy python library'.
If Document 2 is the best choice, respond with 'kadi-apy python cli library'.
Respond with only the exact corresponding option and do not include any additional comments, explanations, or text.
"""
    )
    library_usage_prediction = llm.invoke(prediction_prompt).content.strip()
    print("Library usage prediction:", library_usage_prediction)
    # Step 2: Rewrite the query so it matches the wording of the library's
    # docstrings, improving retrieval quality
    rewrite_prompt = (
        f"""You are an intelligent assistant that helps users rewrite their queries.
The vectorstore consists of the source code and documentation of a Python library, which enables users to
programmatically interact with a REST-like API of a software system. The library methods have descriptive
docstrings. Your task is to rewrite the query in a way that aligns with the language and structure of the
library's methods and documentation, ensuring optimal retrieval of relevant information.

Guidelines for rewriting the query:
1. Identify the main action the user wants to perform (e.g., "Upload a file to a record," "Get users of a group").
2. Remove conversational elements like greetings or pleasantries (e.g., "Hello Chatbot", "I need you to help me with").
3. Exclude specific variable values (e.g., "ID of my record is '31'") unless essential to the intent.
4. Rephrase the query to match the format and keywords used in the docstrings, focusing on verbs and objects relevant to the action (e.g., "Add a record to a collection").
5. The user might need more than one action to achieve their goal. In this case, the rewritten query contains more than one action.

Examples:
- User query: "Create a Python script with a method that facilitates the creation of records. This method should accept an array of identifiers as a parameter and allow metadata to be added to each record."
  Rewritten query: "create records, add metadata to record"
- User query: "Hi, can you help me write Python code to add a record to a collection? The record ID is '45', and the collection ID is '12'."
  Rewritten query: "add a record to a collection"
- User query: "I need a Python script with which I create a new record with the title 'Hello World' and then link the record to a given collection."
  Rewritten query: "create a new record with title", "link a record to a collection"

Based on these examples and guidelines, rewrite the following user query to align more effectively with the keywords used in the docstrings.
Do not include any additional comments, explanations, or text.

Original query:
{query}
"""
    )
    rewritten_query_response = llm.invoke(rewrite_prompt)
    rewritten_query = rewritten_query_response.content.strip()
    print("Rewritten query:", rewritten_query)

    # Step 3: Retrieve documentation chunks with the original query, and source
    # code chunks with the rewritten query filtered by the predicted usage
    kadi_apy_docs = vector_store.similarity_search(query, k=5, filter={"usage": "doc"})
    kadi_apy_sourcecode = vector_store.similarity_search(rewritten_query, k=5, filter={"usage": library_usage_prediction})

    doc_context = format_kadi_api_doc_context(kadi_apy_docs)
    code_context = format_kadi_apy_library_context(kadi_apy_sourcecode)

    for doc in kadi_apy_sourcecode:
        print("Retrieved code chunk from:", doc.metadata.get("source", "Unknown Source"))
    # Step 4: Answer the query based on the retrieved contexts
    prompt = f"""You are an expert Python developer. You are assisting in generating code for users who want to make use of "kadi-apy", an API library.
"Doc-context:" provides you with information on how to use this API library by giving code examples and code documentation.
"Code-context:" provides you with information on API methods and classes from the "kadi-apy" library.
Based on the retrieved contexts and the guidelines, answer the query.

General guidelines:
- If no related information is found in the contexts to answer the query, reply that you do not know.

Guidelines when generating code:
- First display the full code and then follow with a well-structured explanation of the generated code.

Doc-context:
{doc_context}

Code-context:
{code_context}

Query:
{query}
"""

    response = llm.invoke(prompt)
    return response.content
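# Usage sketch (illustrative; assumes initialize() below has already populated
# the global vector_store and llm):
#
# answer = rag_workflow("I need a method to upload a file to a record")
# print(answer)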
def initialize():
    global vector_store, llm

    download_gitlab_project_by_version()

    code_folder_paths = ['kadi_apy']
    doc_folder_paths = ['docs/source/']

    code_texts, code_references = process_directory(DATA_DIR, code_folder_paths, [])
    print("Number of code texts:", len(code_texts))
    doc_texts, doc_references = process_directory(DATA_DIR, doc_folder_paths, [])
    print("Number of doc texts:", len(doc_texts))

    code_chunks = split_python_code_into_chunks(code_texts, code_references)
    doc_chunks = split_into_chunks(doc_texts, doc_references, CHUNK_SIZE, CHUNK_OVERLAP)
    print(f"Total number of code_chunks: {len(code_chunks)}")
    print(f"Total number of doc_chunks: {len(doc_chunks)}")

    # Documentation and code chunks are embedded into a single vectorstore and
    # distinguished at query time via their "usage" metadata
    vector_store = embed_documents_into_vectorstore(
        chunks=doc_chunks + code_chunks,
        model_name=EMBEDDING_MODEL_NAME,
        persist_directory=PERSIST_DOC_DIRECTORY
    )
    llm = setup_llm(LLM_MODEL_NAME, LLM_TEMPERATURE, GROQ_API_KEY)

initialize()
# Gradio utils
def check_input_text(text):
    if not text:
        gr.Warning("Please input a question.")
        # Raising stops the event chain, so empty questions are never submitted
        raise TypeError
    return True

def add_text(history, text):
    history = history + [(text, None)]
    yield history, ""
def bot_kadi(history):
    user_query = history[-1][0]
    response = rag_workflow(user_query)
    history[-1] = (user_query, response)
    yield history
def main():
    with gr.Blocks() as demo:
        gr.Markdown("## Kadi4Mat - AI Chat-Bot")
        gr.Markdown("AI assistant for Kadi4Mat based on a RAG architecture powered by an LLM")
        with gr.Tab("Kadi4Mat - AI Assistant"):
            with gr.Row():
                with gr.Column(scale=10):
                    chatbot = gr.Chatbot([], elem_id="chatbot", label="Kadi Bot", bubble_full_width=False, show_copy_button=True, height=600)
                    user_txt = gr.Textbox(label="Question", placeholder="Type in your question and press Enter or click Submit")
                    with gr.Row():
                        with gr.Column(scale=1):
                            submit_btn = gr.Button("Submit", variant="primary")
                        with gr.Column(scale=1):
                            clear_btn = gr.Button("Clear", variant="stop")
                    gr.Examples(
                        examples=[
                            "Who is working on Kadi4Mat?",
                            "How do I install the Kadi-Apy library?",
                            "How do I install the Kadi-Apy library for development?",
                            "I need a method to upload a file to a record",
                        ],
                        inputs=user_txt,
                        outputs=chatbot,
                        fn=add_text,
                        label="Try asking...",
                        cache_examples=False,
                        examples_per_page=3,
                    )

        user_txt.submit(check_input_text, user_txt, None).success(add_text, [chatbot, user_txt], [chatbot, user_txt]).then(bot_kadi, [chatbot], [chatbot])
        submit_btn.click(check_input_text, user_txt, None).success(add_text, [chatbot, user_txt], [chatbot, user_txt]).then(bot_kadi, [chatbot], [chatbot])
        clear_btn.click(lambda: None, None, chatbot, queue=False)

    demo.launch()

if __name__ == "__main__":
    main()