|
|
|
import pyaudio |
|
import numpy as np |
|
import wave |
|
import os |
|
from datetime import datetime |
|
import config |
|
|
|
class AudioRecorder: |
|
def __init__(self): |
|
self.audio = pyaudio.PyAudio() |
|
self.stream = None |
|
self.is_recording = False |
|
self.sample_rate = config.SAMPLE_RATE |
|
self.filename = f"meeting_{datetime.now().strftime('%Y%m%d_%H%M%S')}.wav" |
|
self.audio_buffer = [] |
|
|
|
def start_recording(self): |
|
self.is_recording = True |
|
self.audio_buffer = [] |
|
self.stream = self.audio.open( |
|
format=config.AUDIO_FORMAT, |
|
channels=config.CHANNELS, |
|
rate=self.sample_rate, |
|
input=True, |
|
frames_per_buffer=config.CHUNK_SIZE, |
|
input_device_index=config.INPUT_DEVICE_INDEX |
|
) |
|
return True |
|
|
|
def get_audio_chunk(self): |
|
if self.is_recording: |
|
try: |
|
data = self.stream.read(config.CHUNK_SIZE, exception_on_overflow=False) |
|
|
|
return np.frombuffer(data, dtype=np.int16) |
|
except OSError as e: |
|
print(f"Audio error: {str(e)}") |
|
return None |
|
return None |
|
|
|
def stop_recording(self): |
|
if self.is_recording: |
|
self.is_recording = False |
|
self.stream.stop_stream() |
|
self.stream.close() |
|
self._save_recording() |
|
return os.path.join(config.RECORDINGS_DIR, self.filename) |
|
return None |
|
|
|
def _save_recording(self): |
|
os.makedirs(config.RECORDINGS_DIR, exist_ok=True) |
|
with wave.open(os.path.join(config.RECORDINGS_DIR, self.filename), 'wb') as wf: |
|
wf.setnchannels(config.CHANNELS) |
|
wf.setsampwidth(self.audio.get_sample_size(config.AUDIO_FORMAT)) |
|
wf.setframerate(self.sample_rate) |
|
wf.writeframes(b''.join(self.audio_buffer)) |