Spaces:
Build error
Build error
| try: | |
| from ..gui.signal import * | |
| from ..utils.db import * | |
| from ..utils.telemetry import my_tracer, os_name | |
| except ImportError: | |
| from gui.signal import * | |
| from utils.db import * | |
| from utils.telemetry import my_tracer, os_name | |
| import numpy as np | |
| import sounddevice as sd | |
| import soundfile as sf | |
| import scipy.io.wavfile as wavfile | |
| import soundcard as sc | |
| import threading | |
| import time | |
| from scipy.io.wavfile import write | |
| samplerate = 48000 # Updated samplerate for better quality | |
| channels = 1 | |
| recording = False | |
| audio_data = None | |
| user_id = load_user_id() | |
| os_name_ = os_name() | |
| the_input_box_pre = "" | |
| import queue | |
| # Initialize a queue to keep the last N audio levels (rolling window) | |
| audio_levels = queue.Queue(maxsize=10) # Adjust size as needed | |
| def calculate_dynamic_threshold(): | |
| """Calculate a dynamic threshold based on recent audio levels.""" | |
| if audio_levels.qsize() == 0: | |
| return 0.01 # Default threshold if no data is available | |
| else: | |
| # Calculate the average of the last N audio levels | |
| return np.mean(list(audio_levels.queue)) * 2 # Adjust multiplier as needed | |
| silence_start_time = None | |
| auto_stop_recording = True | |
| def start_recording(take_system_audio, buttonhandler): | |
| """Start recording audio from microphone and/or system sound.""" | |
| with my_tracer.start_span("start_recording") as span: | |
| span.set_attribute("user_id", user_id) | |
| span.set_attribute("os_name", os_name_) | |
| global the_input_box_pre | |
| from ..gpt_computer_agent import the_input_box, the_main_window | |
| the_input_box_pre = the_input_box.toPlainText() | |
| the_main_window.update_from_thread("Click again when recording is done") | |
| global recording, audio_data, silence_start_time, auto_stop_recording | |
| recording = True | |
| audio_data = np.array([], dtype="float32") | |
| print("Recording started...") | |
| threshold = 0.01 # Define the threshold for stopping the recording | |
| silence_duration = ( | |
| 2 # Duration in seconds to consider as silence before stopping | |
| ) | |
| silence_start_time = None | |
| recording_start_time = time.time() # Record the start time of the recording | |
| auto_stop_recording = is_auto_stop_recording_setting_active() | |
| def callback(indata, frames, time_info, status): | |
| global audio_data, recording, silence_start_time, auto_stop_recording | |
| current_level = np.max(np.abs(indata)) | |
| # Add the current level to the queue | |
| if audio_levels.full(): | |
| audio_levels.get() # Remove the oldest level if the queue is full | |
| audio_levels.put(current_level) | |
| # Calculate dynamic threshold based on recent audio levels | |
| dynamic_threshold = calculate_dynamic_threshold() | |
| if recording: | |
| audio_data = np.append(audio_data, indata) | |
| # Check if the audio is below the dynamic threshold | |
| if current_level < dynamic_threshold and auto_stop_recording: | |
| if silence_start_time is None: | |
| silence_start_time = time.time() # Mark the start of silence | |
| # Ensure recording has been ongoing for at least 3 seconds before considering auto-stop | |
| elif (time.time() - silence_start_time) > silence_duration and ( | |
| time.time() - recording_start_time | |
| ) > 3: | |
| recording = False | |
| buttonhandler.recording = False | |
| else: | |
| silence_start_time = None | |
| def record_audio(): | |
| with my_tracer.start_span("record_audio") as span: | |
| span.set_attribute("user_id", user_id) | |
| span.set_attribute("os_name", os_name_) | |
| global recording | |
| mics = sc.all_microphones(include_loopback=True) | |
| default_mic = mics[0] | |
| data = [] | |
| with default_mic.recorder(samplerate=148000) as mic: | |
| print("Recording...") | |
| while recording: | |
| frame = mic.record(numframes=4096) | |
| data.append(frame) | |
| data = np.concatenate(data, axis=0) | |
| data_int16 = (data * 32767).astype("int16") | |
| wavfile.write(system_sound_location, 148000, data_int16) | |
| if take_system_audio: | |
| recording_thread = threading.Thread(target=record_audio) | |
| recording_thread.start() | |
| with sd.InputStream(callback=callback, channels=channels, samplerate=samplerate): | |
| while recording: | |
| sd.sleep(100) | |
| if not recording: | |
| sf.write(mic_record_location, audio_data, samplerate) | |
| print("Audio saved as voice_input.wav") | |
| signal_handler.recording_stopped.emit() | |
| def stop_recording(): | |
| """Stop recording audio.""" | |
| global recording | |
| recording = False | |
| print("Recording stopped") | |
| def quick_speech_to_text(time_total: int = 5) -> str: | |
| global samplerate, channels, samplerate | |
| quic_location = "temp.wav" | |
| myrecording = sd.rec( | |
| int(time_total * samplerate), samplerate=samplerate, channels=channels | |
| ) | |
| sd.wait() # Wait until recording is finished | |
| write(quic_location, samplerate, myrecording) # Save as WAV file | |
| try: | |
| from .stt import speech_to_text | |
| except ImportError: | |
| from stt import speech_to_text | |
| return speech_to_text(quic_location) | |