import pandas as pd
import streamlit as st
import numpy as np
import torch
from transformers import AutoTokenizer, AutoModelForMaskedLM  # AutoModelWithLMHead is deprecated
from huggingface_hub import HfApi
from torch.utils.data import Dataset, DataLoader

st.set_page_config(
    page_title="Koya Recommendation System",
    initial_sidebar_state="auto",
)

st.markdown(
    """
# Koya Recommender System

### 👋 Welcome to the Koya recommendation system. This system recommends an LLM for you based on the parameters you set. You can try it below."""
)


@st.cache
def get_model_infos(multilingual="multilingual"):
    api = HfApi()
    # When no language filter is given, only filter on the fill-mask task.
    filters = ["fill-mask"] if multilingual is None else ["fill-mask", multilingual]
    model_infos = api.list_models(filter=filters, cardData=True)
    data = [["id", "task", "lang", "sha"]]
    for model in model_infos:
        try:
            data.append([model.modelId, model.pipeline_tag, model.cardData["language"], model.sha])
        except (KeyError, TypeError, AttributeError):
            data.append([model.modelId, model.pipeline_tag, None, model.sha])
    df = pd.DataFrame.from_records(data[1:], columns=data[0])
    return df


class MLMDataset(Dataset):
    """Holds `num_samples` copies of a sentence, each with one randomly chosen
    (non-special) token replaced by the mask token."""

    def __init__(self, sentence, tokenizer, MLM_MASK_TOKEN, MLM_UNK_TOKEN):
        self.sentence = sentence
        self.tokenizer = tokenizer

        self.tensor_input = self.tokenizer(sentence, return_tensors="pt")["input_ids"]
        self.num_samples = self.tensor_input.size()[-1] - 2

        self.batch_input = self.tensor_input.repeat(self.num_samples, 1)
        # Sample mask positions without replacement, excluding the BOS and EOS
        # tokens since they are not part of the sentence itself.
        self.random_ids = np.random.choice(
            [i for i in range(1, self.tensor_input.size(1) - 1)], self.num_samples, replace=False
        )
        self.random_ids = torch.Tensor(self.random_ids).long().unsqueeze(0).T

        # Added by Chris Emezue on 29.01.2023
        # unk_mask zeroes out the model's score for the UNK token, so that
        # p(w|...) is 0 when w is UNK and unchanged otherwise.
        unk_mask = torch.ones(self.batch_input.size()[0], self.batch_input.size()[1], self.tokenizer.vocab_size)
        batch_input_for_unk = self.batch_input.unsqueeze(-1).expand(unk_mask.size())
        self.unk_mask = unk_mask.masked_fill(batch_input_for_unk == MLM_UNK_TOKEN, 0)

        self.mask = torch.zeros(self.batch_input.size())
        src = torch.ones(self.batch_input.size(0)).unsqueeze(0).T
        self.mask.scatter_(1, self.random_ids, src)

        self.masked_input = self.batch_input.masked_fill(self.mask == 1, MLM_MASK_TOKEN)
        # Unmasked positions get the label -100 so the loss ignores them. This
        # only affects the loss; the logits are unchanged either way.
        self.labels = self.batch_input.masked_fill(self.masked_input != MLM_MASK_TOKEN, -100)

        assert self.masked_input.shape[0] == self.labels.shape[0] == self.mask.shape[0] == self.unk_mask.shape[0]

    def __len__(self):
        return self.masked_input.shape[0]

    def __getitem__(self, idx):
        return self.masked_input[idx], self.mask[idx], self.labels[idx], self.unk_mask[idx]
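
# A minimal sketch (not used by the app) of how MLMDataset behaves: each row is
# the same sentence with exactly one token masked. The model name below is an
# illustrative assumption, not something the app prescribes.
def _mlm_dataset_example():
    tokenizer = AutoTokenizer.from_pretrained("bert-base-multilingual-cased")  # assumed model
    dataset = MLMDataset("The quick brown fox.", tokenizer, tokenizer.mask_token_id, tokenizer.unk_token_id)
    masked_input, mask, labels, unk_mask = dataset[0]
    # Exactly one position is masked per row, and only that position keeps a
    # label other than -100.
    assert (masked_input == tokenizer.mask_token_id).sum() == 1
    assert (labels != -100).sum() == 1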

def get_sense_score_batched(sentence, tokenizer, model, MLM_MASK_TOKEN, MLM_UNK_TOKEN, BATCH_SIZE):
    mlm_dataset = MLMDataset(sentence, tokenizer, MLM_MASK_TOKEN, MLM_UNK_TOKEN)
    dataloader = DataLoader(mlm_dataset, batch_size=BATCH_SIZE)

    score = 1
    for batch in dataloader:
        masked_input, mask, labels, unk_mask = batch
        with torch.no_grad():  # inference only, so skip autograd bookkeeping
            output = model(masked_input, labels=labels)
        logits = output["logits"] * unk_mask  # penalize UNK tokens by zeroing their scores

        indices = torch.nonzero(mask)
        logits_of_interest = logits[indices[:, 0], indices[:, 1], :]
        labels_of_interest = labels[indices[:, 0], indices[:, 1]]
        log_probs = logits_of_interest.gather(1, labels_of_interest.view(-1, 1))

        # Since exp(x+y) = exp(x)*exp(y), multiplying the per-batch exponentials
        # equals exponentiating the sum over all batches.
        batch_score = (log_probs.sum() / (-1 * mlm_dataset.num_samples)).exp().item()
        score *= batch_score
    return score


def get_sense_score(sentence, tokenizer, model, MLM_MASK_TOKEN, MLM_UNK_TOKEN, num_samples):
    """
    IDEA
    -----------------
    PP = perplexity(P), where perplexity(P) computes
        (p_1 * p_2 * p_3 * ... * p_N) ** (-1/N)   for p_i in P.
    In practice the computation is done in log space to avoid underflow:
        e^(-(log(p_1) + log(p_2) + ... + log(p_N)) / N)

    Note: every time you run this function the result changes slightly (although
    the ordering should stay roughly the same), because the tokens to mask are
    chosen randomly.
    """
    tensor_input = tokenizer(sentence, return_tensors="pt")["input_ids"]
    batch_input = tensor_input.repeat(num_samples, 1)

    # Sample mask positions without replacement, excluding the BOS and EOS
    # tokens since they are not part of the sentence itself.
    random_ids = np.random.choice([i for i in range(1, tensor_input.size(1) - 1)], num_samples, replace=False)
    random_ids = torch.Tensor(random_ids).long().unsqueeze(0).T

    # Added by Chris Emezue on 29.01.2023
    # unk_mask zeroes out the model's score for the UNK token, so that
    # p(w|...) is 0 when w is UNK and unchanged otherwise.
    unk_mask = torch.ones(batch_input.size()[0], batch_input.size()[1], tokenizer.vocab_size)
    batch_input_for_unk = batch_input.unsqueeze(-1).expand(unk_mask.size())
    unk_mask = unk_mask.masked_fill(batch_input_for_unk == MLM_UNK_TOKEN, 0)

    mask = torch.zeros(batch_input.size())
    src = torch.ones(batch_input.size(0)).unsqueeze(0).T
    mask.scatter_(1, random_ids, src)

    masked_input = batch_input.masked_fill(mask == 1, MLM_MASK_TOKEN)
    # Unmasked positions get the label -100 so the loss ignores them. This
    # only affects the loss; the logits are unchanged either way.
    labels = batch_input.masked_fill(masked_input != MLM_MASK_TOKEN, -100)

    with torch.no_grad():
        output = model(masked_input, labels=labels)
    logits = output["logits"] * unk_mask  # penalize UNK tokens by zeroing their scores

    indices = torch.nonzero(mask)
    # Note: raw logits (not normalized log-probabilities) are gathered here, so
    # the score is perplexity-like rather than a true pseudo-perplexity.
    logits_of_interest = logits[indices[:, 0], indices[:, 1], :]
    labels_of_interest = labels[indices[:, 0], indices[:, 1]]
    log_probs = logits_of_interest.gather(1, labels_of_interest.view(-1, 1))

    score = (log_probs.sum() / (-1 * num_samples)).exp().item()
    return score
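
# For reference, a sketch of the textbook pseudo-perplexity under the same
# random-masking scheme, normalizing with log_softmax before gathering the
# label scores. It is not called by the app; swapping it in for get_sense_score
# would change the absolute scores, though usually not the relative ranking
# the app relies on.
def pseudo_perplexity_sketch(sentence, tokenizer, model, num_samples):
    tensor_input = tokenizer(sentence, return_tensors="pt")["input_ids"]
    batch_input = tensor_input.repeat(num_samples, 1)
    positions = np.random.choice(range(1, tensor_input.size(1) - 1), num_samples, replace=False)

    rows = torch.arange(num_samples)
    cols = torch.tensor(positions, dtype=torch.long)
    labels = batch_input[rows, cols].clone()
    masked_input = batch_input.clone()
    masked_input[rows, cols] = tokenizer.mask_token_id

    with torch.no_grad():
        logits = model(masked_input)["logits"]
    # Normalized log p(w | context) at the masked positions.
    log_probs = torch.log_softmax(logits[rows, cols, :], dim=-1)
    nll = -log_probs.gather(1, labels.view(-1, 1)).sum() / num_samples
    return nll.exp().item()  # lower is better, as in get_sense_score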

def sort_dictionary(d):
    keys = list(d.keys())
    values = list(d.values())
    sorted_value_index = np.argsort(values)
    sorted_dict = {keys[i]: values[i] for i in sorted_value_index}
    return sorted_dict


def set_seed():
    np.random.seed(2023)
    torch.manual_seed(2023)


sentence = st.text_input("Please input a sample sentence in the target language")

models = get_model_infos(multilingual=None)
selected_models = st.multiselect("Select the models you would like to compare", models["id"])

run = st.button("Get Scores")

if run:
    progress_text = "Computing recommendation scores"
    st.write(progress_text)
    my_bar = st.progress(0)

    set_seed()  # fix the random mask positions so reruns are comparable
    scores = {}
    for index, model_id in enumerate(selected_models):
        tokenizer = AutoTokenizer.from_pretrained(model_id)
        model = AutoModelForMaskedLM.from_pretrained(model_id)
        if model_id.startswith("castorini"):
            tokenizer.model_max_length = 512
        MLM_MASK_TOKEN = tokenizer.mask_token_id  # e.g. (103, '[MASK]')
        MLM_UNK_TOKEN = tokenizer.unk_token_id

        BATCH_SIZE = 1
        score = get_sense_score_batched(sentence, tokenizer, model, MLM_MASK_TOKEN, MLM_UNK_TOKEN, BATCH_SIZE)
        scores[model_id] = score
        my_bar.progress((index + 1) / len(selected_models))  # fraction completed, not a raw count

    scores = sort_dictionary(scores)  # ascending: the lowest (best) score comes first
    st.write("Our recommendation is:", scores)
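
# To try the app locally (assuming this file is saved as app.py; the filename
# is an assumption, not fixed by the code):
#
#   pip install streamlit torch transformers huggingface_hub pandas
#   streamlit run app.py
#
# Enter a sentence in the target language, pick a few fill-mask models, and the
# lowest-scoring (best) model appears first in the output.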