translateEn2FR / app.py
amiguel's picture
Update app.py
55d3e9f verified
raw
history blame
16.5 kB
import streamlit as st
from transformers import PreTrainedModel, PretrainedConfig, AutoTokenizer
from huggingface_hub import login
import PyPDF2
import pandas as pd
import torch
import numpy as np
from copy import deepcopy
import math
import time
# Device setup
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
# Set page configuration
st.set_page_config(
page_title="Translator Agent",
page_icon="🚀",
layout="centered"
)
# Model name
MODEL_NAME = "amiguel/en2fr-transformer"
# Translation prompt template
TRANSLATION_PROMPT = """
You are a professional translator specializing in English-to-French translation. Translate the following text accurately and naturally into French, preserving the original meaning and tone:
**Text to translate:**
{input_text}
**French translation:**
"""
# Title with rocket emojis
st.title("🚀 English to French Translator 🚀")
# Configure Avatars
USER_AVATAR = "https://raw.githubusercontent.com/achilela/vila_fofoka_analysis/9904d9a0d445ab0488cf7395cb863cce7621d897/USER_AVATAR.png"
BOT_AVATAR = "https://raw.githubusercontent.com/achilela/vila_fofoka_analysis/991f4c6e4e1dc7a8e24876ca5aae5228bcdb4dba/Ataliba_Avatar.jpg"
# Sidebar configuration
with st.sidebar:
st.header("Authentication 🔒")
hf_token = st.text_input("Hugging Face Token", type="password",
help="Get your token from https://huggingface.co/settings/tokens")
st.header("Upload Documents 📂")
uploaded_file = st.file_uploader(
"Choose a PDF or XLSX file to translate",
type=["pdf", "xlsx"],
label_visibility="collapsed"
)
# Initialize chat history
if "messages" not in st.session_state:
st.session_state.messages = []
# File processing function
@st.cache_data
def process_file(uploaded_file):
if uploaded_file is None:
return ""
try:
if uploaded_file.type == "application/pdf":
pdf_reader = PyPDF2.PdfReader(uploaded_file)
return "\n".join([page.extract_text() for page in pdf_reader.pages])
elif uploaded_file.type == "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet":
df = pd.read_excel(uploaded_file)
return df.to_markdown()
except Exception as e:
st.error(f"📄 Error processing file: {str(e)}")
return ""
# Custom model definition (copied from previous steps)
# Masking functions
def subsequent_mask(size):
attn_shape = (1, size, size)
subsequent_mask = np.triu(np.ones(attn_shape), k=1).astype('uint8')
return torch.from_numpy(subsequent_mask) == 0
def make_std_mask(tgt, pad):
tgt_mask = (tgt != pad).unsqueeze(-2)
return tgt_mask & subsequent_mask(tgt.size(-1)).type_as(tgt_mask.data)
# Batch class
class Batch:
def __init__(self, src, trg=None, pad=0):
src = torch.from_numpy(src).to(DEVICE).long()
self.src = src
self.src_mask = (src != pad).unsqueeze(-2)
if trg is not None:
trg = torch.from_numpy(trg).to(DEVICE).long()
self.trg = trg[:, :-1]
self.trg_y = trg[:, 1:]
self.trg_mask = make_std_mask(self.trg, pad)
self.ntokens = (self.trg_y != pad).data.sum()
# Hugging Face config
class En2FrConfig(PretrainedConfig):
model_type = "en2fr_transformer"
def __init__(self, src_vocab=32000, tgt_vocab=32000, N=6, d_model=512,
d_ff=2048, h=8, dropout=0.1, **kwargs):
self.src_vocab = src_vocab
self.tgt_vocab = tgt_vocab
self.N = N
self.d_model = d_model
self.d_ff = d_ff
self.h = h
self.dropout = dropout
super().__init__(**kwargs)
# Transformer components
class Transformer(nn.Module):
def __init__(self, encoder, decoder, src_embed, tgt_embed, generator):
super().__init__()
self.encoder = encoder
self.decoder = decoder
self.src_embed = src_embed
self.tgt_embed = tgt_embed
self.generator = generator
def forward(self, src, tgt, src_mask, tgt_mask):
memory = self.encoder(self.src_embed(src), src_mask)
output = self.decoder(self.tgt_embed(tgt), memory, src_mask, tgt_mask)
return output
class Encoder(nn.Module):
def __init__(self, layer, N):
super().__init__()
self.layers = nn.ModuleList([deepcopy(layer) for _ in range(N)])
self.norm = LayerNorm(layer.size)
def forward(self, x, mask):
for layer in self.layers:
x = layer(x, mask)
return self.norm(x)
class EncoderLayer(nn.Module):
def __init__(self, size, self_attn, feed_forward, dropout):
super().__init__()
self.self_attn = self_attn
self.feed_forward = feed_forward
self.sublayer = nn.ModuleList([deepcopy(SublayerConnection(size, dropout)) for _ in range(2)])
self.size = size
def forward(self, x, mask):
x = self.sublayer[0](x, lambda x: self.self_attn(x, x, x, mask))
return self.sublayer[1](x, self.feed_forward)
class Decoder(nn.Module):
def __init__(self, layer, N):
super().__init__()
self.layers = nn.ModuleList([deepcopy(layer) for _ in range(N)])
self.norm = LayerNorm(layer.size)
def forward(self, x, memory, src_mask, tgt_mask):
for layer in self.layers:
x = layer(x, memory, src_mask, tgt_mask)
return self.norm(x)
class DecoderLayer(nn.Module):
def __init__(self, size, self_attn, src_attn, feed_forward, dropout):
super().__init__()
self.size = size
self.self_attn = self_attn
self.src_attn = src_attn
self.feed_forward = feed_forward
self.sublayer = nn.ModuleList([deepcopy(SublayerConnection(size, dropout)) for _ in range(3)])
def forward(self, x, memory, src_mask, tgt_mask):
x = self.sublayer[0](x, lambda x: self.self_attn(x, x, x, tgt_mask))
x = self.sublayer[1](x, lambda x: self.src_attn(x, memory, memory, src_mask))
return self.sublayer[2](x, self.feed_forward)
class SublayerConnection(nn.Module):
def __init__(self, size, dropout):
super().__init__()
self.norm = LayerNorm(size)
self.dropout = nn.Dropout(dropout)
def forward(self, x, sublayer):
return x + self.dropout(sublayer(self.norm(x)))
class LayerNorm(nn.Module):
def __init__(self, features, eps=1e-6):
super().__init__()
self.a_2 = nn.Parameter(torch.ones(features))
self.b_2 = nn.Parameter(torch.zeros(features))
self.eps = eps
def forward(self, x):
mean = x.mean(-1, keepdim=True)
std = x.std(-1, keepdim=True)
return self.a_2 * (x - mean) / (std + self.eps) + self.b_2
class MultiHeadedAttention(nn.Module):
def __init__(self, h, d_model, dropout=0.1):
super().__init__()
assert d_model % h == 0
self.d_k = d_model // h
self.h = h
self.linears = nn.ModuleList([deepcopy(nn.Linear(d_model, d_model)) for _ in range(4)])
self.attn = None
self.dropout = nn.Dropout(p=dropout)
def forward(self, query, key, value, mask=None):
if mask is not None:
mask = mask.unsqueeze(1)
nbatches = query.size(0)
query, key, value = [l(x).view(nbatches, -1, self.h, self.d_k).transpose(1, 2)
for l, x in zip(self.linears, (query, key, value))]
x, self.attn = attention(query, key, value, mask=mask, dropout=self.dropout)
x = x.transpose(1, 2).contiguous().view(nbatches, -1, self.h * self.d_k)
return self.linears[-1](x)
def attention(query, key, value, mask=None, dropout=None):
d_k = query.size(-1)
scores = torch.matmul(query, key.transpose(-2, -1)) / math.sqrt(d_k)
if mask is not None:
scores = scores.masked_fill(mask == 0, -1e9)
p_attn = nn.functional.softmax(scores, dim=-1)
if dropout is not None:
p_attn = dropout(p_attn)
return torch.matmul(p_attn, value), p_attn
class PositionwiseFeedForward(nn.Module):
def __init__(self, d_model, d_ff, dropout=0.1):
super().__init__()
self.w_1 = nn.Linear(d_model, d_ff)
self.w_2 = nn.Linear(d_ff, d_model)
self.dropout = nn.Dropout(dropout)
def forward(self, x):
return self.w_2(self.dropout(self.w_1(x)))
class Embeddings(nn.Module):
def __init__(self, d_model, vocab):
super().__init__()
self.lut = nn.Embedding(vocab, d_model)
self.d_model = d_model
def forward(self, x):
return self.lut(x) * math.sqrt(self.d_model)
class PositionalEncoding(nn.Module):
def __init__(self, d_model, dropout, max_len=5000):
super().__init__()
self.dropout = nn.Dropout(p=dropout)
pe = torch.zeros(max_len, d_model, device=DEVICE)
position = torch.arange(0., max_len, device=DEVICE).unsqueeze(1)
div_term = torch.exp(torch.arange(0., d_model, 2, device=DEVICE) * -(math.log(10000.0) / d_model))
pe[:, 0::2] = torch.sin(position * div_term)
pe[:, 1::2] = torch.cos(position * div_term)
pe = pe.unsqueeze(0)
self.register_buffer('pe', pe)
def forward(self, x):
x = x + self.pe[:, :x.size(1)].requires_grad_(False)
return self.dropout(x)
class Generator(nn.Module):
def __init__(self, d_model, vocab):
super().__init__()
self.proj = nn.Linear(d_model, vocab)
def forward(self, x):
return nn.functional.log_softmax(self.proj(x), dim=-1)
def create_model(src_vocab, tgt_vocab, N, d_model, d_ff, h, dropout=0.1):
attn = MultiHeadedAttention(h, d_model).to(DEVICE)
ff = PositionwiseFeedForward(d_model, d_ff, dropout).to(DEVICE)
pos = PositionalEncoding(d_model, dropout).to(DEVICE)
model = Transformer(
Encoder(EncoderLayer(d_model, deepcopy(attn), deepcopy(ff), dropout).to(DEVICE), N).to(DEVICE),
Decoder(DecoderLayer(d_model, deepcopy(attn), deepcopy(attn), deepcopy(ff), dropout).to(DEVICE), N).to(DEVICE),
nn.Sequential(Embeddings(d_model, src_vocab).to(DEVICE), deepcopy(pos)),
nn.Sequential(Embeddings(d_model, tgt_vocab).to(DEVICE), deepcopy(pos)),
Generator(d_model, tgt_vocab)).to(DEVICE)
for p in model.parameters():
if p.dim() > 1:
nn.init.xavier_uniform_(p)
return model
class En2FrTransformer(PreTrainedModel):
config_class = En2FrConfig
def __init__(self, config):
super().__init__(config)
self.model = create_model(
src_vocab=config.src_vocab,
tgt_vocab=config.tgt_vocab,
N=config.N,
d_model=config.d_model,
d_ff=config.d_ff,
h=config.h,
dropout=config.dropout
)
def forward(self, src, tgt, src_mask, tgt_mask):
return self.model(src, tgt, src_mask, tgt_mask)
# Model loading function
@st.cache_resource
def load_model(hf_token):
try:
if not hf_token:
st.error("🔐 Authentication required! Please provide a Hugging Face token.")
return None
login(token=hf_token)
# Load tokenizer (assuming a tokenizer was saved with the model)
tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME, token=hf_token)
# Load the custom model
model = En2FrTransformer.from_pretrained(
MODEL_NAME,
token=hf_token
)
model.to(DEVICE) # Ensure model is on the correct device
return model, tokenizer
except Exception as e:
st.error(f"🤖 Model loading failed: {str(e)}")
return None
# Simple tokenization function (placeholder, since we don't have the actual vocab)
def tokenize_text(text, tokenizer, max_length=10):
# This is a placeholder; in a real scenario, you'd use the tokenizer's vocabulary
# For now, we'll create dummy token IDs (0 for padding, 1 for start, 2 for end, 3+ for words)
words = text.split()
token_ids = [1] + [i + 3 for i in range(min(len(words), max_length - 2))] + [2]
if len(token_ids) < max_length:
token_ids += [0] * (max_length - len(token_ids))
return torch.tensor([token_ids], dtype=torch.long, device=DEVICE)
# Generation function for translation (custom inference loop)
def generate_translation(input_text, model, tokenizer):
model.eval()
with torch.no_grad():
# Tokenize input (source) and target (start with a dummy start token)
src = tokenize_text(input_text, tokenizer)
tgt = torch.tensor([[1]], dtype=torch.long, device=DEVICE) # Start token
src_mask = (src != 0).unsqueeze(-2)
max_length = 10 # Adjust as needed
# Generate translation token by token
for _ in range(max_length - 1):
tgt_mask = make_std_mask(tgt, pad=0)
output = model(src, tgt, src_mask, tgt_mask)
output = model.model.generator(output[:, -1, :]) # Get logits for the last token
next_token = torch.argmax(output, dim=-1).unsqueeze(0)
tgt = torch.cat((tgt, next_token), dim=1)
if next_token.item() == 2: # End token
break
# Convert token IDs back to text (placeholder)
# In a real scenario, you'd use tokenizer.decode()
translation = " ".join([f"word{i-3}" if i >= 3 else "<start>" if i == 1 else "<end>" for i in tgt[0].tolist()])
return translation
# Display chat messages
for message in st.session_state.messages:
try:
avatar = USER_AVATAR if message["role"] == "user" else BOT_AVATAR
with st.chat_message(message["role"], avatar=avatar):
st.markdown(message["content"])
except:
with st.chat_message(message["role"]):
st.markdown(message["content"])
# Chat input handling
if prompt := st.chat_input("Enter text to translate into French..."):
if not hf_token:
st.error("🔑 Authentication required!")
st.stop()
# Load model if not already loaded
if "model" not in st.session_state:
model_data = load_model(hf_token)
if model_data is None:
st.error("Failed to load model. Please check your token and try again.")
st.stop()
st.session_state.model, st.session_state.tokenizer = model_data
model = st.session_state.model
tokenizer = st.session_state.tokenizer
# Add user message
with st.chat_message("user", avatar=USER_AVATAR):
st.markdown(prompt)
st.session_state.messages.append({"role": "user", "content": prompt})
# Process file or use prompt directly
file_context = process_file(uploaded_file)
input_text = file_context if file_context else prompt
# Generate translation
if model and tokenizer:
try:
with st.chat_message("assistant", avatar=BOT_AVATAR):
start_time = time.time()
translation = generate_translation(input_text, model, tokenizer)
# Display the translation
st.markdown(translation)
st.session_state.messages.append({"role": "assistant", "content": translation})
# Calculate performance metrics (simplified, since we don't have real token counts)
end_time = time.time()
input_tokens = len(input_text.split()) # Approximate
output_tokens = len(translation.split()) # Approximate
speed = output_tokens / (end_time - start_time)
# Calculate costs (hypothetical pricing model)
input_cost = (input_tokens / 1000000) * 5 # $5 per million input tokens
output_cost = (output_tokens / 1000000) * 15 # $15 per million output tokens
total_cost_usd = input_cost + output_cost
total_cost_aoa = total_cost_usd * 1160 # Convert to AOA (Angolan Kwanza)
# Display metrics
st.caption(
f"🔑 Input Tokens: {input_tokens} | Output Tokens: {output_tokens} | "
f"🕒 Speed: {speed:.1f}t/s | 💰 Cost (USD): ${total_cost_usd:.4f} | "
f"💵 Cost (AOA): {total_cost_aoa:.4f}"
)
except Exception as e:
st.error(f"⚡ Translation error: {str(e)}")
else:
st.error("🤖 Model not loaded!")