|
import argparse
import asyncio
import io
import logging
import logging.config
import os
from time import time

import ffmpeg
import numpy as np
import requests
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import HTMLResponse
from transformers import pipeline

from src.whisper_streaming.whisper_online import backend_factory, online_factory, add_shared_args
|
|
|
|
# Hugging Face model identifier used by the 'marianmt' translation backend.
MODEL_NAME = 'Helsinki-NLP/opus-tatoeba-en-ja'
# The pipeline is built eagerly at import time and placed on the CUDA device.
TRANSLATOR = pipeline('translation', model=MODEL_NAME, device='cuda')
# One throw-away call at import; presumably so the first real request does not
# pay one-time initialisation cost.
TRANSLATOR('Warming up!')

# DeepL credential. Prefer supplying it through the DEEPL_API_KEY environment
# variable; the literal fallback is kept only for backward compatibility.
# NOTE(review): the fallback is a credential committed to source control; it
# should be rotated and the default removed.
API_KEY = os.environ.get('DEEPL_API_KEY', '3c2b8b0f-4fa9-4eb7-b67d-7cae25546051:fx')

# Language codes sent to DeepL as `source_lang` / `target_lang`.
SOURCE_LANG = 'EN'
TARGET_LANG = 'JA'
|
|
|
|
def translator_wrapper(source_text, mode='deepl'):
    """Translate ``source_text`` with the selected backend and return the result.

    Args:
        source_text: The text to translate.
        mode: Backend selector.
            'deepl'    - POST to the DeepL free REST endpoint using the
                         module-level API_KEY, SOURCE_LANG and TARGET_LANG.
            'marianmt' - run the module-level TRANSLATOR pipeline.
            'google'   - POST to the Google Cloud Translation v2 REST endpoint
                         with target 'ja-jp' and an empty source language.

    Returns:
        The translated text, or the placeholder "(timed out)" when an HTTP
        backend does not answer within 5 seconds.

    Raises:
        ValueError: If ``mode`` is not one of the supported backends.
        Exception: Non-timeout request errors and unexpected response payloads
            (e.g. KeyError on a missing field) propagate to the caller.
    """
    # NOTE: `requests` must only be imported at module level. A function-local
    # `import requests` makes the name local to the whole function, so any
    # branch executed before that import fails with UnboundLocalError.
    log = logging.getLogger(__name__)

    if mode == 'deepl':
        params = {
            'auth_key': API_KEY,
            'text': source_text,
            'source_lang': SOURCE_LANG,
            'target_lang': TARGET_LANG,
        }
        try:
            reply = requests.post("https://api-free.deepl.com/v2/translate", data=params, timeout=5)
            return reply.json()['translations'][0]['text']
        except requests.exceptions.Timeout:
            return "(timed out)"

    elif mode == 'marianmt':
        return TRANSLATOR(source_text)[0]['translation_text']

    elif mode == 'google':
        url = "https://translation.googleapis.com/language/translate/v2"
        data = {
            # NOTE(review): credential embedded in source; move it to
            # configuration and rotate it.
            'key': "AIzaSyCX0-Wdxl_rgvcZzklNjnqJ1W9YiKjcHUs",
            'source': "",
            'target': 'ja-jp',
            'q': source_text,
            'format': "text",
        }
        try:
            # Bounded wait, matching the DeepL branch, so an unresponsive
            # service cannot block the caller indefinitely.
            response = requests.post(url, data=data, timeout=5)
        except requests.exceptions.Timeout:
            return "(timed out)"

        log.debug("Google Translate response: %s", response)
        result = response.json()["data"]["translations"][0]["translatedText"]
        log.debug("Google Translate result: %s", result)
        return result

    raise ValueError(f"Unknown translation mode: {mode!r}")
|
|
|
|
|
|
def setup_logging():
    """Apply the application's logging configuration through ``dictConfig``.

    A single 'console' StreamHandler (level INFO, 'standard' format) is
    attached to the root logger (level DEBUG). The 'uvicorn' logger and the
    two ``src.whisper_streaming`` loggers get the console handler directly
    and do not propagate; 'uvicorn.error' and 'uvicorn.access' only have
    their level set. Loggers that already exist are left enabled.
    """
    def dedicated(level):
        # Logger that owns the console handler and does not bubble up to root.
        return {'handlers': ['console'], 'level': level, 'propagate': False}

    formatters = {
        'standard': {
            'format': '%(asctime)s %(levelname)s [%(name)s]: %(message)s',
        },
    }
    handlers = {
        'console': {
            'level': 'INFO',
            'class': 'logging.StreamHandler',
            'formatter': 'standard',
        },
    }
    loggers = {
        'uvicorn': dedicated('INFO'),
        'uvicorn.error': {'level': 'INFO'},
        'uvicorn.access': {'level': 'INFO'},
        'src.whisper_streaming.online_asr': dedicated('DEBUG'),
        'src.whisper_streaming.whisper_streaming': dedicated('DEBUG'),
    }

    logging.config.dictConfig({
        'version': 1,
        'disable_existing_loggers': False,
        'formatters': formatters,
        'handlers': handlers,
        'root': {'handlers': ['console'], 'level': 'DEBUG'},
        'loggers': loggers,
    })
|
|
|
|
# Configure logging before anything else in the module emits log records.
setup_logging()
logger = logging.getLogger(__name__)

# ASGI application object; routes are registered on it further down.
app = FastAPI()
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    # Credentialed cross-origin requests cannot be combined with wildcard
    # origins: browsers reject the pairing, and reflecting arbitrary origins
    # with credentials would let any site act with a user's cookies. Nothing
    # in this module relies on cookies, so credentials stay disabled.
    allow_credentials=False,
    allow_methods=["*"],
    allow_headers=["*"],
)
|
|
|
|
|
|
def _parse_bool_flag(value):
    """Convert a command-line token such as "true" or "0" into a bool.

    ``type=bool`` is not usable with argparse because ``bool("False")`` is
    True: every non-empty string would enable the option.
    """
    if isinstance(value, bool):
        return value
    normalized = value.strip().lower()
    if normalized in ("1", "true", "t", "yes", "y", "on"):
        return True
    # The empty string is kept as False, matching what bool("") produced.
    if normalized in ("", "0", "false", "f", "no", "n", "off"):
        return False
    raise argparse.ArgumentTypeError(f"expected a boolean value, got {value!r}")


# Command-line interface. Parsing happens at import time because module-level
# code below reads `args`.
parser = argparse.ArgumentParser(description="Whisper FastAPI Online Server")
parser.add_argument(
    "--host",
    type=str,
    default="localhost",
    help="The host address to bind the server to.",
)
parser.add_argument(
    "--port", type=int, default=8000, help="The port number to bind the server to."
)
parser.add_argument(
    "--warmup-file",
    type=str,
    dest="warmup_file",
    help="The path to a speech audio wav file to warm up Whisper so that the very first chunk processing is fast. It can be e.g. https://github.com/ggerganov/whisper.cpp/raw/master/samples/jfk.wav .",
)
parser.add_argument(
    "--diarization",
    type=_parse_bool_flag,
    # Accept both the bare flag (`--diarization`) and an explicit value
    # (`--diarization true` / `--diarization false`).
    nargs="?",
    const=True,
    default=False,
    help="Whether to enable speaker diarization.",
)

# Options shared with the whisper_streaming command-line tools.
add_shared_args(parser)
args = parser.parse_args()
|
|
|
|
|
|
# Build the ASR backend and its tokenizer once; both are handed to
# online_factory for every WebSocket connection.
asr, tokenizer = backend_factory(args)

# The diarization module is only imported when the feature is requested.
if args.diarization:
    from src.diarization.diarization_online import DiartDiarization

# Read the front-end page once at import time; the path is relative to the
# current working directory.
with open("src/web/live_transcription.html", encoding="utf-8") as f:
    html = f.read()
|
|
|
|
|
|
@app.get("/")
async def get():
    """Serve the live-transcription page held in the module-level ``html`` string."""
    page = HTMLResponse(content=html)
    return page
|
|
|
|
|
|
# Format of the PCM stream requested from FFmpeg.
SAMPLE_RATE = 16000
CHANNELS = 1
BYTES_PER_SAMPLE = 2

# Despite the names, these are the number of samples / bytes that make up one
# processing chunk of `args.min_chunk_size` seconds. The product is truncated
# rather than the chunk size itself, so a fractional value such as 0.5 gives
# 8000 samples instead of collapsing to 0.
SAMPLES_PER_SEC = int(SAMPLE_RATE * float(args.min_chunk_size))
BYTES_PER_SEC = SAMPLES_PER_SEC * BYTES_PER_SAMPLE
|
|
|
|
|
|
async def start_ffmpeg_decoder():
    """Start an FFmpeg process that decodes WebM from stdin to raw PCM on stdout.

    The output is signed 16-bit little-endian PCM with CHANNELS channels at
    SAMPLE_RATE Hz. stdin, stdout and stderr of the child are all pipes.

    Returns:
        The process object returned by ffmpeg-python's ``run_async``.
    """
    webm_input = ffmpeg.input("pipe:0", format="webm")
    pcm_output = webm_input.output(
        "pipe:1",
        format="s16le",
        acodec="pcm_s16le",
        ac=CHANNELS,
        ar=str(SAMPLE_RATE),
    )
    # stderr is piped but nothing in this module reads it. Restrict FFmpeg to
    # error-level messages and disable the periodic progress line so a
    # long-running session cannot fill the pipe and stall the decoder.
    pcm_output = pcm_output.global_args("-hide_banner", "-loglevel", "error", "-nostats")
    return pcm_output.run_async(pipe_stdin=True, pipe_stdout=True, pipe_stderr=True)
|
|
|
|
|
|
|
|
@app.websocket("/asr")
async def websocket_endpoint(websocket: WebSocket):
    """Transcribe and translate audio streamed by one WebSocket client.

    Binary messages received from the client are written to the stdin of a
    per-connection FFmpeg decoder. A background task reads the decoded PCM
    from FFmpeg's stdout, feeds it to the online ASR processor, cuts the
    not-yet-committed text into segments, translates each segment once and
    sends ``{"lines": [...], "buffer": ''}`` back to the client as JSON.

    Args:
        websocket: The client connection handed in by FastAPI.
    """
    await websocket.accept()
    logger.info("WebSocket connection opened.")

    ffmpeg_process = await start_ffmpeg_decoder()
    pcm_buffer = bytearray()
    logger.info("Loading online.")
    online = online_factory(args, asr, tokenizer)
    logger.info("Online loaded.")

    if args.diarization:
        diarization = DiartDiarization(SAMPLE_RATE)

    async def ffmpeg_stdout_reader():
        """Read decoded PCM from FFmpeg and push results to the client until EOF or error."""
        nonlocal pcm_buffer
        loop = asyncio.get_running_loop()
        full_transcription = ""
        beg = time()

        # One dict per committed ASR chunk: {beg, end, text, speaker}.
        chunk_history = []

        # Segments sent to the client. 'translation' stays None until the
        # segment has been translated; afterwards 'text' holds
        # "<source>|<translation>".
        buffers = [{'speaker': '0', 'text': '', 'translation': None}]
        # Pending text that has not yet been cut into a segment.
        buffer_line = ''
        # Substrings at which pending text is cut. These are plain substring
        # matches, so e.g. 'or' also matches inside longer words.
        punctuations = (',', '.', '?', '!', 'and', 'or', 'but', 'however')

        while True:
            try:
                # Request roughly as many bytes as were produced since the last
                # read: 32000 bytes per whole elapsed second.
                elapsed_time = int(time() - beg)
                beg = time()
                chunk = await loop.run_in_executor(None, ffmpeg_process.stdout.read, 32000 * elapsed_time)

                if not chunk:
                    # Less than one second elapsed (zero-byte request) or no
                    # data yet: fall back to a small fixed-size read.
                    chunk = await loop.run_in_executor(None, ffmpeg_process.stdout.read, 4096)
                    if not chunk:
                        logger.info("FFmpeg stdout closed.")
                        break

                pcm_buffer.extend(chunk)
                if len(pcm_buffer) < BYTES_PER_SEC:
                    continue

                # Only whole samples can be converted; an odd trailing byte is
                # carried over to the next round instead of making
                # np.frombuffer raise.
                usable = len(pcm_buffer) - len(pcm_buffer) % BYTES_PER_SAMPLE
                if usable == 0:
                    continue

                # int16 -> float32 scaled by 1/32768.
                pcm_array = np.frombuffer(pcm_buffer[:usable], dtype=np.int16).astype(np.float32) / 32768.0
                pcm_buffer = pcm_buffer[usable:]

                online.insert_audio_chunk(pcm_array)
                beg_trans, end_trans, trans = online.process_iter()

                if trans:
                    chunk_history.append({
                        "beg": beg_trans,
                        "end": end_trans,
                        "text": trans,
                        "speaker": "0",
                    })
                    full_transcription += trans

                # With VAC enabled the transcript buffer lives on the wrapped
                # `online.online` object.
                if args.vac:
                    pending_text = online.online.concatenate_tsw(online.online.transcript_buffer.buffer)[2]
                else:
                    pending_text = online.concatenate_tsw(online.transcript_buffer.buffer)[2]

                # Text that is already part of the committed transcription is
                # not queued again.
                if pending_text in full_transcription:
                    pending_text = ""
                buffer_line += pending_text

                if any(mark in buffer_line for mark in punctuations):
                    # Cut just past the right-most marker; the +1 also takes the
                    # character following it.
                    cut = max(
                        buffer_line.rfind(mark) + len(mark) + 1
                        for mark in punctuations
                        if mark in buffer_line
                    )
                    buffers.append({'speaker': '0', 'text': buffer_line[:cut], 'translation': None})
                    buffer_line = buffer_line[cut:]

                for segment in buffers:
                    if segment['translation'] is not None or segment['text'] == '':
                        continue
                    # The translator performs a blocking HTTP request; run it
                    # in the executor so the event loop keeps servicing the
                    # WebSocket in the meantime.
                    segment['translation'] = await loop.run_in_executor(
                        None, translator_wrapper, segment['text'], 'google'
                    )
                    segment['text'] += '|' + segment['translation']

                if args.diarization:
                    await diarization.diarize(pcm_array)
                    chunk_history = diarization.assign_speakers_to_chunks(chunk_history)

                response = {"lines": buffers, "buffer": ''}
                await websocket.send_json(response)

            except Exception as e:
                logger.exception("Exception in ffmpeg_stdout_reader: %s", e)
                break

        logger.info("Exiting ffmpeg_stdout_reader...")

    stdout_reader_task = asyncio.create_task(ffmpeg_stdout_reader())

    try:
        while True:
            # Forward each WebM chunk from the client to FFmpeg's stdin.
            message = await websocket.receive_bytes()
            ffmpeg_process.stdin.write(message)
            ffmpeg_process.stdin.flush()

    except WebSocketDisconnect:
        logger.info("WebSocket connection closed.")
    except Exception as e:
        logger.exception("Error in websocket loop: %s", e)
    finally:
        # Best-effort teardown: a failure to close one pipe must not prevent
        # the remaining cleanup steps from running.
        try:
            ffmpeg_process.stdin.close()
        except Exception:
            logger.debug("Closing FFmpeg stdin failed.", exc_info=True)
        stdout_reader_task.cancel()

        try:
            ffmpeg_process.stdout.close()
        except Exception:
            logger.debug("Closing FFmpeg stdout failed.", exc_info=True)

        ffmpeg_process.wait()
        del online

        if args.diarization:
            diarization.close()
|
|
|
|
|
|
|
|
|
|
if __name__ == "__main__":
    import uvicorn

    # The app is given as an import string rather than an object, as uvicorn
    # requires when reload is enabled.
    server_options = {
        "host": args.host,
        "port": args.port,
        "reload": True,
        "log_level": "info",
    }
    uvicorn.run("whisper_fastapi_online_server:app", **server_options)
|
|
|