import gradio as gr import time import threading import logging from gradio.themes.utils import sizes from main import run_repository_ranking # Import the repository ranking function import agent # Import the test.py module for chat agent # --------------------------- # Global Logging Buffer Setup # --------------------------- LOG_BUFFER = [] LOG_BUFFER_LOCK = threading.Lock() class BufferLogHandler(logging.Handler): def emit(self, record): log_entry = self.format(record) with LOG_BUFFER_LOCK: LOG_BUFFER.append(log_entry) root_logger = logging.getLogger() if not any(isinstance(h, BufferLogHandler) for h in root_logger.handlers): handler = BufferLogHandler() formatter = logging.Formatter("%(asctime)s - %(levelname)s - %(message)s") handler.setFormatter(formatter) root_logger.addHandler(handler) def filter_logs(logs): filtered = [] last_was_fetching = False for log in logs: if "HTTP Request:" in log: if not last_was_fetching: filtered.append("Fetching repositories...") last_was_fetching = True else: filtered.append(log) last_was_fetching = False return filtered def parse_result_to_html(raw_result: str, num_results: int) -> (str, list): """ Parses the raw string output from run_repository_ranking to an HTML table. Only the top N results are displayed. Returns (html, repo_names) """ entries = raw_result.strip().split("Final Rank:") entries = entries[1:num_results+1] # Use only the first N entries if not entries: return ("

No repositories found for your query.

", []) html = """ """ repo_names = [] for entry in entries: lines = entry.strip().split("\n") data = {} data["Final Rank"] = lines[0].strip() if lines else "" for line in lines[1:]: if ": " in line: key, val = line.split(": ", 1) data[key.strip()] = val.strip() # Try to extract repo name from the Link (github.com/user/repo) link = data.get('Link', '') repo_name = '' if 'github.com/' in link: repo_name = link.split('github.com/')[-1].strip('/ ') if repo_name: repo_names.append(repo_name) html += f""" """ html += "
Rank Title Link Combined Score
{data.get('Final Rank', '')} {data.get('Title', '')} GitHub {data.get('Combined Score', '')}
" return html, repo_names # --------------------------- # GPU-enabled Wrapper for Repository Ranking # --------------------------- def gpu_run_repo(topic: str, num_results: int): return run_repository_ranking(topic, num_results) def run_lite_workflow(topic, num_results, result_container): result = gpu_run_repo(topic, num_results) result_container["raw_result"] = result def stream_lite_workflow(topic, num_results): logging.info("[UI] User started a new search for topic: %s", topic) with LOG_BUFFER_LOCK: LOG_BUFFER.clear() result_container = {} workflow_thread = threading.Thread(target=run_lite_workflow, args=(topic, num_results, result_container)) workflow_thread.start() last_index = 0 while workflow_thread.is_alive() or (last_index < len(LOG_BUFFER)): with LOG_BUFFER_LOCK: new_logs = LOG_BUFFER[last_index:] last_index = len(LOG_BUFFER) if new_logs: filtered_logs = filter_logs(new_logs) status_msg = filtered_logs[-1] detail_msg = "
".join(filtered_logs) yield status_msg, detail_msg, [] time.sleep(0.5) workflow_thread.join() with LOG_BUFFER_LOCK: final_logs = LOG_BUFFER[:] raw_result = result_container.get("raw_result", "No results returned.") html_result, repo_names = parse_result_to_html(raw_result, num_results) yield "", html_result, repo_names def lite_runner(topic, num_results): logging.info("[UI] Running lite_runner for topic: %s", topic) yield "Workflow started", "

Processing your request. Please wait...

", [] for status, details, repos in stream_lite_workflow(topic, num_results): yield status, details, repos # --------------------------- # App UI Setup Using Gradio Soft Theme with Centered Layout # --------------------------- with gr.Blocks( theme=gr.themes.Soft(text_size=sizes.text_md), title="DeepGit Lite", css=""" /* Center header and footer */ #header { text-align: center; margin-bottom: 20px; } #main-container { max-width: 800px; margin: auto; } #footer { text-align: center; margin-top: 20px; } """ ) as demo: gr.Markdown( """

DeepGit Lite

✨ DeepGit Lite is the lightweight pro version of DeepGit.
It harnesses advanced deep semantic search to explore GitHub repositories and deliver curated results.
Under the hood, it leverages a hybrid ranking approach combining dense retrieval, BM25 scoring, and cross-encoder re-ranking for optimal discovery.
If the agent returns no repositories found, it means no chain was invoked due to GPU unavailability. Please duplicate the space and re-run.

🚀 Check out the full DeepGit version on GitHub and ⭐ Star DeepGit on GitHub!

""", elem_id="header" ) # --- Search UI --- with gr.Column(elem_id="main-container", visible=True) as search_ui: research_input = gr.Textbox( label="Research Query", placeholder="Enter your research topic here, e.g., Looking for a low code/no code tool to augment images and annotations?", lines=3 ) num_results_slider = gr.Slider( minimum=5, maximum=25, value=10, step=1, label="Number of Results to Display", info="Choose how many top repositories to show (sorted by score)" ) run_button = gr.Button("Run DeepGit Lite", variant="primary") status_display = gr.Markdown(label="Status") detail_display = gr.HTML(label="Results") repo_state = gr.State([]) go_to_chat_btn = gr.Button("Go to Chat", visible=False) # --- Chat UI --- with gr.Column(visible=False) as chat_ui: repo_choice = gr.Radio(choices=[], label="Select a repository", interactive=True) chat_history = gr.Chatbot(label="Chat with GitHub Agent") user_input = gr.Textbox(label="Your question", placeholder="Ask about the selected repo...e.g., tell me a bit more and guide me to set this up and running?") send_btn = gr.Button("Send") chat_state = gr.State([]) back_btn = gr.Button("Back to Search") def update_chat_button(status, details, repos): logging.info("[UI] Search complete. Showing Go to Chat button: %s", bool(repos)) return gr.update(visible=bool(repos)), repos def show_chat_ui(repos): logging.info("[UI] Switching to Chat UI. Repositories available: %s", repos) return gr.update(visible=False), gr.update(visible=True), gr.update(choices=repos, value=None), [] def back_to_search(): logging.info("[UI] Switching back to Search UI.") return gr.update(visible=True), gr.update(visible=False), gr.update(value=[]), gr.update(value=None), [] def chat_with_agent(user_msg, repo, history): logging.info("[Chat] User sent message: '%s' for repo: '%s'", user_msg, repo) if not user_msg or not user_msg.strip(): # Block blank messages return history + [["", "Please enter a message before sending."]], history if not repo: return history + [[user_msg, "Please select a repository first."]], history full_query = f"[{repo}] {user_msg}" try: result = agent.agent_executor.invoke({"input": full_query}) answer = result["output"] logging.info("[Chat] Agent response received.") except Exception as e: answer = f"Error: {e}" logging.error("[Chat] Error in agent_executor: %s", e) history = history + [[user_msg, answer]] return history, history # Disable send button if no repo is selected or message is blank, and show a helpful message def can_send(user_msg, repo): if not user_msg or not user_msg.strip(): return gr.update(interactive=False, value="Enter a message to send") if not repo: return gr.update(interactive=False, value="Select a repository") return gr.update(interactive=True, value="Send") user_input.change( fn=can_send, inputs=[user_input, repo_choice], outputs=[send_btn], show_progress=False ) repo_choice.change( fn=can_send, inputs=[user_input, repo_choice], outputs=[send_btn], show_progress=False ) run_button.click( fn=lite_runner, inputs=[research_input, num_results_slider], outputs=[status_display, detail_display, repo_state], api_name="deepgit_lite", show_progress=True ).then( fn=update_chat_button, inputs=[status_display, detail_display, repo_state], outputs=[go_to_chat_btn, repo_state] ) research_input.submit( fn=lite_runner, inputs=[research_input, num_results_slider], outputs=[status_display, detail_display, repo_state], api_name="deepgit_lite_submit", show_progress=True ).then( fn=update_chat_button, inputs=[status_display, detail_display, repo_state], outputs=[go_to_chat_btn, repo_state] ) go_to_chat_btn.click( fn=show_chat_ui, inputs=[repo_state], outputs=[search_ui, chat_ui, repo_choice, chat_state] ) back_btn.click( fn=back_to_search, inputs=[], outputs=[search_ui, chat_ui, chat_history, repo_choice, chat_state] ) send_btn.click( fn=chat_with_agent, inputs=[user_input, repo_choice, chat_state], outputs=[chat_history, chat_state], queue=False ) user_input.submit( fn=chat_with_agent, inputs=[user_input, repo_choice, chat_state], outputs=[chat_history, chat_state], queue=False ) gr.HTML( """ """ ) demo.queue(max_size=10).launch()