Spaces:
Sleeping
Sleeping
| import gradio as gr | |
| import numpy as np | |
| import os | |
| from itertools import chain | |
| import time | |
| from resources.data import fixed_messages, topic_lists, interview_types | |
| from utils.ui import add_candidate_message, add_interviewer_message | |
| from typing import List, Dict, Generator, Optional, Tuple | |
| from functools import partial | |
| from api.llm import LLMManager | |
| from api.audio import TTSManager, STTManager | |
| from docs.instruction import instruction | |
| def send_request( | |
| code: str, | |
| previous_code: str, | |
| chat_history: List[Dict[str, str]], | |
| chat_display: List[List[Optional[str]]], | |
| llm: LLMManager, | |
| tts: Optional[TTSManager], | |
| silent: Optional[bool] = False, | |
| ) -> Generator[Tuple[List[Dict[str, str]], List[List[Optional[str]]], str, bytes], None, None]: | |
| """ | |
| Send a request to the LLM and update the chat display and translate it to speech. | |
| """ | |
| # TODO: Find the way to simplify it and remove duplication in logic | |
| if silent is None: | |
| silent = os.getenv("SILENT", False) | |
| if chat_display[-1][0] is None and code == previous_code: | |
| yield chat_history, chat_display, code, b"" | |
| return | |
| chat_history = llm.update_chat_history(code, previous_code, chat_history, chat_display) | |
| original_len = len(chat_display) | |
| chat_display.append([None, ""]) | |
| text_chunks = [] | |
| reply = llm.get_text(chat_history) | |
| chat_history.append({"role": "assistant", "content": ""}) | |
| audio_generator = iter(()) | |
| has_text_item = True | |
| has_audio_item = not silent | |
| audio_created = 0 | |
| is_notes = False | |
| while has_text_item or has_audio_item: | |
| try: | |
| text_chunk = next(reply) | |
| text_chunks.append(text_chunk) | |
| has_text_item = True | |
| except StopIteration: | |
| has_text_item = False | |
| chat_history[-1]["content"] = "".join(text_chunks) | |
| if silent: | |
| audio_chunk = b"" | |
| else: | |
| try: | |
| audio_chunk = next(audio_generator) | |
| has_audio_item = True | |
| except StopIteration: | |
| audio_chunk = b"" | |
| has_audio_item = False | |
| if has_text_item and not is_notes: | |
| last_message = chat_display[-1][1] | |
| last_message += text_chunk | |
| split_notes = last_message.split("#NOTES#") | |
| if len(split_notes) > 1: | |
| is_notes = True | |
| last_message = split_notes[0] | |
| split_messages = last_message.split("\n\n") | |
| chat_display[-1][1] = split_messages[0] | |
| for m in split_messages[1:]: | |
| chat_display.append([None, m]) | |
| if not silent: | |
| if len(chat_display) - original_len > audio_created + has_text_item: | |
| audio_generator = chain(audio_generator, tts.read_text(chat_display[original_len + audio_created][1])) | |
| audio_created += 1 | |
| has_audio_item = True | |
| yield chat_history, chat_display, code, audio_chunk | |
| if chat_display and len(chat_display) > 1 and chat_display[-1][1] == "" and chat_display[-2][1]: | |
| chat_display.pop() | |
| yield chat_history, chat_display, code, b"" | |
| def change_code_area(interview_type): | |
| if interview_type == "coding": | |
| return gr.update( | |
| label="Please write your code here. You can use any language, but only Python syntax highlighting is available.", | |
| language="python", | |
| ) | |
| elif interview_type == "sql": | |
| return gr.update( | |
| label="Please write your query here.", | |
| language="sql", | |
| ) | |
| else: | |
| return gr.update( | |
| label="Please write any notes for your solution here.", | |
| language=None, | |
| ) | |
| def get_problem_solving_ui(llm: LLMManager, tts: TTSManager, stt: STTManager, default_audio_params: Dict, audio_output): | |
| send_request_partial = partial(send_request, llm=llm, tts=tts) | |
| with gr.Tab("Interview", render=False, elem_id=f"tab") as problem_tab: | |
| if os.getenv("IS_DEMO"): | |
| gr.Markdown(instruction["demo"]) | |
| chat_history = gr.State([]) | |
| previous_code = gr.State("") | |
| hi_markdown = gr.Markdown( | |
| "<h2 style='text-align: center;'> Hi! I'm here to guide you through a practice session for your technical interview. Choose the interview settings to begin.</h2>\n" | |
| ) | |
| with gr.Row() as init_acc: | |
| with gr.Column(scale=3): | |
| interview_type_select = gr.Dropdown( | |
| show_label=False, | |
| info="Type of the interview.", | |
| choices=interview_types, | |
| value="coding", | |
| container=True, | |
| allow_custom_value=False, | |
| elem_id=f"interview_type_select", | |
| scale=2, | |
| ) | |
| difficulty_select = gr.Dropdown( | |
| show_label=False, | |
| info="Difficulty of the problem.", | |
| choices=["Easy", "Medium", "Hard"], | |
| value="Medium", | |
| container=True, | |
| allow_custom_value=True, | |
| elem_id=f"difficulty_select", | |
| scale=2, | |
| ) | |
| topic_select = gr.Dropdown( | |
| show_label=False, | |
| info="Topic (you can type any value).", | |
| choices=topic_lists[interview_type_select.value], | |
| value=np.random.choice(topic_lists[interview_type_select.value]), | |
| container=True, | |
| allow_custom_value=True, | |
| elem_id=f"topic_select", | |
| scale=2, | |
| ) | |
| with gr.Column(scale=4): | |
| requirements = gr.Textbox( | |
| label="Requirements", | |
| show_label=False, | |
| placeholder="Specify additional requirements if any.", | |
| container=False, | |
| lines=5, | |
| elem_id=f"requirements", | |
| ) | |
| with gr.Row(): | |
| terms_checkbox = gr.Checkbox( | |
| label="", | |
| container=False, | |
| value=not os.getenv("IS_DEMO", False), | |
| interactive=True, | |
| elem_id=f"terms_checkbox", | |
| min_width=20, | |
| ) | |
| with gr.Column(scale=100): | |
| gr.Markdown( | |
| "#### I agree to the [terms and conditions](https://github.com/IliaLarchenko/Interviewer?tab=readme-ov-file#important-legal-and-compliance-information)" | |
| ) | |
| start_btn = gr.Button("Generate a problem", elem_id=f"start_btn", interactive=not os.getenv("IS_DEMO", False)) | |
| with gr.Accordion("Problem statement", open=True, visible=False) as problem_acc: | |
| description = gr.Markdown(elem_id=f"problem_description", line_breaks=True) | |
| with gr.Accordion("Solution", open=True, visible=False) as solution_acc: | |
| with gr.Row() as content: | |
| with gr.Column(scale=2): | |
| code = gr.Code( | |
| label="Please write your code here.", | |
| language="python", | |
| lines=46, | |
| elem_id=f"code", | |
| ) | |
| with gr.Column(scale=1): | |
| end_btn = gr.Button("Finish the interview", interactive=False, variant="stop", elem_id=f"end_btn") | |
| chat = gr.Chatbot(label="Chat", show_label=False, show_share_button=False, elem_id=f"chat") | |
| # I need this message box only because chat component is flickering when I am updating it | |
| # To be improved in the future | |
| message = gr.Textbox( | |
| label="Message", | |
| show_label=False, | |
| lines=5, | |
| max_lines=5, | |
| interactive=False, | |
| container=False, | |
| elem_id=f"message", | |
| ) | |
| audio_input = gr.Audio(interactive=False, **default_audio_params, elem_id=f"audio_input") | |
| audio_buffer = gr.State(np.array([], dtype=np.int16)) | |
| audio_to_transcribe = gr.State(np.array([], dtype=np.int16)) | |
| with gr.Accordion("Feedback", open=True, visible=False) as feedback_acc: | |
| feedback = gr.Markdown(elem_id=f"feedback", line_breaks=True) | |
| # Start button click action chain | |
| start_btn.click(fn=add_interviewer_message(fixed_messages["start"]), inputs=[chat], outputs=[chat]).success( | |
| fn=tts.read_last_message, inputs=[chat], outputs=[audio_output] | |
| ).success( | |
| fn=lambda: ( | |
| gr.update(visible=False), | |
| gr.update(interactive=False), | |
| gr.update(interactive=False), | |
| gr.update(interactive=False), | |
| gr.update(visible=False), | |
| ), | |
| outputs=[init_acc, start_btn, terms_checkbox, interview_type_select, hi_markdown], | |
| ).success( | |
| fn=lambda: (gr.update(visible=True)), | |
| outputs=[problem_acc], | |
| ).success( | |
| fn=llm.get_problem, | |
| inputs=[requirements, difficulty_select, topic_select, interview_type_select], | |
| outputs=[description], | |
| scroll_to_output=True, | |
| ).success( | |
| fn=llm.init_bot, inputs=[description, interview_type_select], outputs=[chat_history] | |
| ).success( | |
| fn=lambda: (gr.update(visible=True), gr.update(interactive=True), gr.update(interactive=True)), | |
| outputs=[solution_acc, end_btn, audio_input], | |
| ) | |
| end_btn.click(fn=lambda x: add_candidate_message("Let's stop here.", x), inputs=[chat], outputs=[chat]).success( | |
| fn=add_interviewer_message(fixed_messages["end"]), | |
| inputs=[chat], | |
| outputs=[chat], | |
| ).success(fn=tts.read_last_message, inputs=[chat], outputs=[audio_output]).success( | |
| fn=lambda: ( | |
| gr.update(open=False), | |
| gr.update(interactive=False), | |
| gr.update(open=False), | |
| gr.update(interactive=False), | |
| ), | |
| outputs=[solution_acc, end_btn, problem_acc, audio_input], | |
| ).success( | |
| fn=lambda: (gr.update(visible=True)), | |
| outputs=[feedback_acc], | |
| ).success( | |
| fn=llm.end_interview, inputs=[description, chat_history, interview_type_select], outputs=[feedback] | |
| ) | |
| audio_input.stream( | |
| stt.process_audio_chunk, | |
| inputs=[audio_input, audio_buffer], | |
| outputs=[audio_buffer, audio_to_transcribe], | |
| show_progress="hidden", | |
| ).success(fn=stt.transcribe_audio, inputs=[audio_to_transcribe, message], outputs=[message], show_progress="hidden") | |
| # TODO: find a way to remove delay | |
| audio_input.stop_recording(fn=lambda: time.sleep(2)).success( | |
| fn=add_candidate_message, inputs=[message, chat], outputs=[chat] | |
| ).success( | |
| fn=send_request_partial, | |
| inputs=[code, previous_code, chat_history, chat], | |
| outputs=[chat_history, chat, previous_code, audio_output], | |
| ).success( | |
| fn=lambda: np.array([], dtype=np.int16), outputs=[audio_buffer] | |
| ).success( | |
| lambda: "", outputs=[message] | |
| ) | |
| interview_type_select.change( | |
| fn=lambda x: gr.update(choices=topic_lists[x], value=np.random.choice(topic_lists[x])), | |
| inputs=[interview_type_select], | |
| outputs=[topic_select], | |
| ).success(fn=change_code_area, inputs=[interview_type_select], outputs=[code]) | |
| terms_checkbox.change(fn=lambda x: gr.update(interactive=x), inputs=[terms_checkbox], outputs=[start_btn]) | |
| return problem_tab | |