Spaces:
Sleeping
Sleeping
import streamlit as st | |
from transformers import PreTrainedModel, PretrainedConfig, AutoTokenizer | |
from huggingface_hub import login | |
import PyPDF2 | |
import pandas as pd | |
import torch | |
import numpy as np | |
from copy import deepcopy | |
import math | |
import time | |
# Device setup | |
DEVICE = "cuda" if torch.cuda.is_available() else "cpu" | |
# Set page configuration | |
st.set_page_config( | |
page_title="Translator Agent", | |
page_icon="🚀", | |
layout="centered" | |
) | |
# Model name | |
MODEL_NAME = "amiguel/en2fr-transformer" | |
# Translation prompt template | |
TRANSLATION_PROMPT = """ | |
You are a professional translator specializing in English-to-French translation. Translate the following text accurately and naturally into French, preserving the original meaning and tone: | |
**Text to translate:** | |
{input_text} | |
**French translation:** | |
""" | |
# Title with rocket emojis | |
st.title("🚀 English to French Translator 🚀") | |
# Configure Avatars | |
USER_AVATAR = "https://raw.githubusercontent.com/achilela/vila_fofoka_analysis/9904d9a0d445ab0488cf7395cb863cce7621d897/USER_AVATAR.png" | |
BOT_AVATAR = "https://raw.githubusercontent.com/achilela/vila_fofoka_analysis/991f4c6e4e1dc7a8e24876ca5aae5228bcdb4dba/Ataliba_Avatar.jpg" | |
# Sidebar configuration | |
with st.sidebar: | |
st.header("Authentication 🔒") | |
hf_token = st.text_input("Hugging Face Token", type="password", | |
help="Get your token from https://huggingface.co/settings/tokens") | |
st.header("Upload Documents 📂") | |
uploaded_file = st.file_uploader( | |
"Choose a PDF or XLSX file to translate", | |
type=["pdf", "xlsx"], | |
label_visibility="collapsed" | |
) | |
# Initialize chat history | |
if "messages" not in st.session_state: | |
st.session_state.messages = [] | |
# File processing function | |
def process_file(uploaded_file): | |
if uploaded_file is None: | |
return "" | |
try: | |
if uploaded_file.type == "application/pdf": | |
pdf_reader = PyPDF2.PdfReader(uploaded_file) | |
return "\n".join([page.extract_text() for page in pdf_reader.pages]) | |
elif uploaded_file.type == "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet": | |
df = pd.read_excel(uploaded_file) | |
return df.to_markdown() | |
except Exception as e: | |
st.error(f"📄 Error processing file: {str(e)}") | |
return "" | |
# Custom model definition (copied from previous steps) | |
# Masking functions | |
def subsequent_mask(size): | |
attn_shape = (1, size, size) | |
subsequent_mask = np.triu(np.ones(attn_shape), k=1).astype('uint8') | |
return torch.from_numpy(subsequent_mask) == 0 | |
def make_std_mask(tgt, pad): | |
tgt_mask = (tgt != pad).unsqueeze(-2) | |
return tgt_mask & subsequent_mask(tgt.size(-1)).type_as(tgt_mask.data) | |
# Batch class | |
class Batch: | |
def __init__(self, src, trg=None, pad=0): | |
src = torch.from_numpy(src).to(DEVICE).long() | |
self.src = src | |
self.src_mask = (src != pad).unsqueeze(-2) | |
if trg is not None: | |
trg = torch.from_numpy(trg).to(DEVICE).long() | |
self.trg = trg[:, :-1] | |
self.trg_y = trg[:, 1:] | |
self.trg_mask = make_std_mask(self.trg, pad) | |
self.ntokens = (self.trg_y != pad).data.sum() | |
# Hugging Face config | |
class En2FrConfig(PretrainedConfig): | |
model_type = "en2fr_transformer" | |
def __init__(self, src_vocab=32000, tgt_vocab=32000, N=6, d_model=512, | |
d_ff=2048, h=8, dropout=0.1, **kwargs): | |
self.src_vocab = src_vocab | |
self.tgt_vocab = tgt_vocab | |
self.N = N | |
self.d_model = d_model | |
self.d_ff = d_ff | |
self.h = h | |
self.dropout = dropout | |
super().__init__(**kwargs) | |
# Transformer components | |
class Transformer(nn.Module): | |
def __init__(self, encoder, decoder, src_embed, tgt_embed, generator): | |
super().__init__() | |
self.encoder = encoder | |
self.decoder = decoder | |
self.src_embed = src_embed | |
self.tgt_embed = tgt_embed | |
self.generator = generator | |
def forward(self, src, tgt, src_mask, tgt_mask): | |
memory = self.encoder(self.src_embed(src), src_mask) | |
output = self.decoder(self.tgt_embed(tgt), memory, src_mask, tgt_mask) | |
return output | |
class Encoder(nn.Module): | |
def __init__(self, layer, N): | |
super().__init__() | |
self.layers = nn.ModuleList([deepcopy(layer) for _ in range(N)]) | |
self.norm = LayerNorm(layer.size) | |
def forward(self, x, mask): | |
for layer in self.layers: | |
x = layer(x, mask) | |
return self.norm(x) | |
class EncoderLayer(nn.Module): | |
def __init__(self, size, self_attn, feed_forward, dropout): | |
super().__init__() | |
self.self_attn = self_attn | |
self.feed_forward = feed_forward | |
self.sublayer = nn.ModuleList([deepcopy(SublayerConnection(size, dropout)) for _ in range(2)]) | |
self.size = size | |
def forward(self, x, mask): | |
x = self.sublayer[0](x, lambda x: self.self_attn(x, x, x, mask)) | |
return self.sublayer[1](x, self.feed_forward) | |
class Decoder(nn.Module): | |
def __init__(self, layer, N): | |
super().__init__() | |
self.layers = nn.ModuleList([deepcopy(layer) for _ in range(N)]) | |
self.norm = LayerNorm(layer.size) | |
def forward(self, x, memory, src_mask, tgt_mask): | |
for layer in self.layers: | |
x = layer(x, memory, src_mask, tgt_mask) | |
return self.norm(x) | |
class DecoderLayer(nn.Module): | |
def __init__(self, size, self_attn, src_attn, feed_forward, dropout): | |
super().__init__() | |
self.size = size | |
self.self_attn = self_attn | |
self.src_attn = src_attn | |
self.feed_forward = feed_forward | |
self.sublayer = nn.ModuleList([deepcopy(SublayerConnection(size, dropout)) for _ in range(3)]) | |
def forward(self, x, memory, src_mask, tgt_mask): | |
x = self.sublayer[0](x, lambda x: self.self_attn(x, x, x, tgt_mask)) | |
x = self.sublayer[1](x, lambda x: self.src_attn(x, memory, memory, src_mask)) | |
return self.sublayer[2](x, self.feed_forward) | |
class SublayerConnection(nn.Module): | |
def __init__(self, size, dropout): | |
super().__init__() | |
self.norm = LayerNorm(size) | |
self.dropout = nn.Dropout(dropout) | |
def forward(self, x, sublayer): | |
return x + self.dropout(sublayer(self.norm(x))) | |
class LayerNorm(nn.Module): | |
def __init__(self, features, eps=1e-6): | |
super().__init__() | |
self.a_2 = nn.Parameter(torch.ones(features)) | |
self.b_2 = nn.Parameter(torch.zeros(features)) | |
self.eps = eps | |
def forward(self, x): | |
mean = x.mean(-1, keepdim=True) | |
std = x.std(-1, keepdim=True) | |
return self.a_2 * (x - mean) / (std + self.eps) + self.b_2 | |
class MultiHeadedAttention(nn.Module): | |
def __init__(self, h, d_model, dropout=0.1): | |
super().__init__() | |
assert d_model % h == 0 | |
self.d_k = d_model // h | |
self.h = h | |
self.linears = nn.ModuleList([deepcopy(nn.Linear(d_model, d_model)) for _ in range(4)]) | |
self.attn = None | |
self.dropout = nn.Dropout(p=dropout) | |
def forward(self, query, key, value, mask=None): | |
if mask is not None: | |
mask = mask.unsqueeze(1) | |
nbatches = query.size(0) | |
query, key, value = [l(x).view(nbatches, -1, self.h, self.d_k).transpose(1, 2) | |
for l, x in zip(self.linears, (query, key, value))] | |
x, self.attn = attention(query, key, value, mask=mask, dropout=self.dropout) | |
x = x.transpose(1, 2).contiguous().view(nbatches, -1, self.h * self.d_k) | |
return self.linears[-1](x) | |
def attention(query, key, value, mask=None, dropout=None): | |
d_k = query.size(-1) | |
scores = torch.matmul(query, key.transpose(-2, -1)) / math.sqrt(d_k) | |
if mask is not None: | |
scores = scores.masked_fill(mask == 0, -1e9) | |
p_attn = nn.functional.softmax(scores, dim=-1) | |
if dropout is not None: | |
p_attn = dropout(p_attn) | |
return torch.matmul(p_attn, value), p_attn | |
class PositionwiseFeedForward(nn.Module): | |
def __init__(self, d_model, d_ff, dropout=0.1): | |
super().__init__() | |
self.w_1 = nn.Linear(d_model, d_ff) | |
self.w_2 = nn.Linear(d_ff, d_model) | |
self.dropout = nn.Dropout(dropout) | |
def forward(self, x): | |
return self.w_2(self.dropout(self.w_1(x))) | |
class Embeddings(nn.Module): | |
def __init__(self, d_model, vocab): | |
super().__init__() | |
self.lut = nn.Embedding(vocab, d_model) | |
self.d_model = d_model | |
def forward(self, x): | |
return self.lut(x) * math.sqrt(self.d_model) | |
class PositionalEncoding(nn.Module): | |
def __init__(self, d_model, dropout, max_len=5000): | |
super().__init__() | |
self.dropout = nn.Dropout(p=dropout) | |
pe = torch.zeros(max_len, d_model, device=DEVICE) | |
position = torch.arange(0., max_len, device=DEVICE).unsqueeze(1) | |
div_term = torch.exp(torch.arange(0., d_model, 2, device=DEVICE) * -(math.log(10000.0) / d_model)) | |
pe[:, 0::2] = torch.sin(position * div_term) | |
pe[:, 1::2] = torch.cos(position * div_term) | |
pe = pe.unsqueeze(0) | |
self.register_buffer('pe', pe) | |
def forward(self, x): | |
x = x + self.pe[:, :x.size(1)].requires_grad_(False) | |
return self.dropout(x) | |
class Generator(nn.Module): | |
def __init__(self, d_model, vocab): | |
super().__init__() | |
self.proj = nn.Linear(d_model, vocab) | |
def forward(self, x): | |
return nn.functional.log_softmax(self.proj(x), dim=-1) | |
def create_model(src_vocab, tgt_vocab, N, d_model, d_ff, h, dropout=0.1): | |
attn = MultiHeadedAttention(h, d_model).to(DEVICE) | |
ff = PositionwiseFeedForward(d_model, d_ff, dropout).to(DEVICE) | |
pos = PositionalEncoding(d_model, dropout).to(DEVICE) | |
model = Transformer( | |
Encoder(EncoderLayer(d_model, deepcopy(attn), deepcopy(ff), dropout).to(DEVICE), N).to(DEVICE), | |
Decoder(DecoderLayer(d_model, deepcopy(attn), deepcopy(attn), deepcopy(ff), dropout).to(DEVICE), N).to(DEVICE), | |
nn.Sequential(Embeddings(d_model, src_vocab).to(DEVICE), deepcopy(pos)), | |
nn.Sequential(Embeddings(d_model, tgt_vocab).to(DEVICE), deepcopy(pos)), | |
Generator(d_model, tgt_vocab)).to(DEVICE) | |
for p in model.parameters(): | |
if p.dim() > 1: | |
nn.init.xavier_uniform_(p) | |
return model | |
class En2FrTransformer(PreTrainedModel): | |
config_class = En2FrConfig | |
def __init__(self, config): | |
super().__init__(config) | |
self.model = create_model( | |
src_vocab=config.src_vocab, | |
tgt_vocab=config.tgt_vocab, | |
N=config.N, | |
d_model=config.d_model, | |
d_ff=config.d_ff, | |
h=config.h, | |
dropout=config.dropout | |
) | |
def forward(self, src, tgt, src_mask, tgt_mask): | |
return self.model(src, tgt, src_mask, tgt_mask) | |
# Model loading function | |
def load_model(hf_token): | |
try: | |
if not hf_token: | |
st.error("🔐 Authentication required! Please provide a Hugging Face token.") | |
return None | |
login(token=hf_token) | |
# Load tokenizer (assuming a tokenizer was saved with the model) | |
tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME, token=hf_token) | |
# Load the custom model | |
model = En2FrTransformer.from_pretrained( | |
MODEL_NAME, | |
token=hf_token | |
) | |
model.to(DEVICE) # Ensure model is on the correct device | |
return model, tokenizer | |
except Exception as e: | |
st.error(f"🤖 Model loading failed: {str(e)}") | |
return None | |
# Simple tokenization function (placeholder, since we don't have the actual vocab) | |
def tokenize_text(text, tokenizer, max_length=10): | |
# This is a placeholder; in a real scenario, you'd use the tokenizer's vocabulary | |
# For now, we'll create dummy token IDs (0 for padding, 1 for start, 2 for end, 3+ for words) | |
words = text.split() | |
token_ids = [1] + [i + 3 for i in range(min(len(words), max_length - 2))] + [2] | |
if len(token_ids) < max_length: | |
token_ids += [0] * (max_length - len(token_ids)) | |
return torch.tensor([token_ids], dtype=torch.long, device=DEVICE) | |
# Generation function for translation (custom inference loop) | |
def generate_translation(input_text, model, tokenizer): | |
model.eval() | |
with torch.no_grad(): | |
# Tokenize input (source) and target (start with a dummy start token) | |
src = tokenize_text(input_text, tokenizer) | |
tgt = torch.tensor([[1]], dtype=torch.long, device=DEVICE) # Start token | |
src_mask = (src != 0).unsqueeze(-2) | |
max_length = 10 # Adjust as needed | |
# Generate translation token by token | |
for _ in range(max_length - 1): | |
tgt_mask = make_std_mask(tgt, pad=0) | |
output = model(src, tgt, src_mask, tgt_mask) | |
output = model.model.generator(output[:, -1, :]) # Get logits for the last token | |
next_token = torch.argmax(output, dim=-1).unsqueeze(0) | |
tgt = torch.cat((tgt, next_token), dim=1) | |
if next_token.item() == 2: # End token | |
break | |
# Convert token IDs back to text (placeholder) | |
# In a real scenario, you'd use tokenizer.decode() | |
translation = " ".join([f"word{i-3}" if i >= 3 else "<start>" if i == 1 else "<end>" for i in tgt[0].tolist()]) | |
return translation | |
# Display chat messages | |
for message in st.session_state.messages: | |
try: | |
avatar = USER_AVATAR if message["role"] == "user" else BOT_AVATAR | |
with st.chat_message(message["role"], avatar=avatar): | |
st.markdown(message["content"]) | |
except: | |
with st.chat_message(message["role"]): | |
st.markdown(message["content"]) | |
# Chat input handling | |
if prompt := st.chat_input("Enter text to translate into French..."): | |
if not hf_token: | |
st.error("🔑 Authentication required!") | |
st.stop() | |
# Load model if not already loaded | |
if "model" not in st.session_state: | |
model_data = load_model(hf_token) | |
if model_data is None: | |
st.error("Failed to load model. Please check your token and try again.") | |
st.stop() | |
st.session_state.model, st.session_state.tokenizer = model_data | |
model = st.session_state.model | |
tokenizer = st.session_state.tokenizer | |
# Add user message | |
with st.chat_message("user", avatar=USER_AVATAR): | |
st.markdown(prompt) | |
st.session_state.messages.append({"role": "user", "content": prompt}) | |
# Process file or use prompt directly | |
file_context = process_file(uploaded_file) | |
input_text = file_context if file_context else prompt | |
# Generate translation | |
if model and tokenizer: | |
try: | |
with st.chat_message("assistant", avatar=BOT_AVATAR): | |
start_time = time.time() | |
translation = generate_translation(input_text, model, tokenizer) | |
# Display the translation | |
st.markdown(translation) | |
st.session_state.messages.append({"role": "assistant", "content": translation}) | |
# Calculate performance metrics (simplified, since we don't have real token counts) | |
end_time = time.time() | |
input_tokens = len(input_text.split()) # Approximate | |
output_tokens = len(translation.split()) # Approximate | |
speed = output_tokens / (end_time - start_time) | |
# Calculate costs (hypothetical pricing model) | |
input_cost = (input_tokens / 1000000) * 5 # $5 per million input tokens | |
output_cost = (output_tokens / 1000000) * 15 # $15 per million output tokens | |
total_cost_usd = input_cost + output_cost | |
total_cost_aoa = total_cost_usd * 1160 # Convert to AOA (Angolan Kwanza) | |
# Display metrics | |
st.caption( | |
f"🔑 Input Tokens: {input_tokens} | Output Tokens: {output_tokens} | " | |
f"🕒 Speed: {speed:.1f}t/s | 💰 Cost (USD): ${total_cost_usd:.4f} | " | |
f"💵 Cost (AOA): {total_cost_aoa:.4f}" | |
) | |
except Exception as e: | |
st.error(f"⚡ Translation error: {str(e)}") | |
else: | |
st.error("🤖 Model not loaded!") |