Spaces:
Runtime error
Runtime error
| import pandas as pd | |
| import streamlit as st | |
| import numpy as np | |
| import threading | |
| import torch | |
| import numpy as np | |
| from styling import footer | |
| from transformers import AutoTokenizer, AutoModelWithLMHead | |
| from huggingface_hub import HfApi, hf_hub_download | |
| from torch.utils.data import Dataset, DataLoader | |
# Page-level configuration, issued before any other Streamlit element is emitted.
st.set_page_config(
    page_title="Koya Recommendation System",
    initial_sidebar_state="auto",
)

# Landing text rendered as Markdown at the top of the page.
# NOTE(review): the extracted source showed a mis-encoded glyph ("π") in front of
# "Welcome"; it is assumed to have been a waving-hand emoji -- confirm.
st.markdown(
    """
# Koya Recommender System
### 👋 Welcome to the Koya recommendation system. This system recommends an LLM for you when you provide a sample sentence in your target language and select a list of models.
You can try it below \n\n\n"""
)
def get_model_infos(multilingual="multilingual"):
    """Return a DataFrame describing fill-mask models listed on the Hugging Face Hub.

    Args:
        multilingual: Extra tag combined with ``"fill-mask"`` in the Hub query.
            Pass ``None`` to query by ``"fill-mask"`` alone.

    Returns:
        A ``pandas.DataFrame`` with columns ``id``, ``task``, ``lang`` and ``sha``,
        one row per model returned by ``HfApi.list_models``. ``lang`` is ``None``
        when the model card carries no ``"language"`` entry.
    """
    api = HfApi()

    # Leave out a missing tag explicitly instead of forwarding a literal ``None``
    # inside the filter list and relying on the client library to tolerate it.
    tags = [tag for tag in ("fill-mask", multilingual) if tag is not None]
    model_infos = api.list_models(filter=tags, cardData=True)

    records = []
    for model in model_infos:
        try:
            language = model.cardData["language"]
        except (AttributeError, KeyError, TypeError):
            # No card data at all (None / missing attribute) or no "language" key.
            language = None
        records.append([model.modelId, model.pipeline_tag, language, model.sha])

    return pd.DataFrame.from_records(records, columns=["id", "task", "lang", "sha"])
class MLMDataset(Dataset):
    """Masked-language-model samples built from a single sentence.

    The tokenized sentence is repeated once per inner token (first and last
    positions excluded); each copy has exactly one of those inner positions
    replaced by the mask token, and every inner position is masked in exactly
    one copy (the order of the copies is random).

    Args:
        sentence: Text passed to ``tokenizer``.
        tokenizer: Callable returning a mapping with ``"input_ids"`` when called
            as ``tokenizer(sentence, return_tensors="pt")``; must also expose
            ``vocab_size``.
        MLM_MASK_TOKEN: Id written at the masked position.
        MLM_UNK_TOKEN: Id treated as "unknown". Positions holding it get an
            all-zero row in ``unk_mask``. ``None`` means no position is zeroed.

    Each item is the tuple ``(masked_input, mask, labels, unk_mask)`` for one copy.
    """

    def __init__(self, sentence, tokenizer, MLM_MASK_TOKEN, MLM_UNK_TOKEN):
        self.sentence = sentence
        self.tokenizer = tokenizer
        self.tensor_input = self.tokenizer(sentence, return_tensors="pt")["input_ids"]
        # One sample per token, not counting the first and last positions.
        self.num_samples = self.tensor_input.size()[-1] - 2
        self.batch_input = self.tensor_input.repeat(self.num_samples, 1)

        # Ensuring that the masking is not done on the BOS and EOS tokens since
        # they are not connected to the sentence itself.
        self.random_ids = np.random.choice(
            [i for i in range(1, self.tensor_input.size(1) - 1)],
            self.num_samples,
            replace=False,
        )
        self.random_ids = torch.Tensor(self.random_ids).long().unsqueeze(0).T

        # Added by Chris Emezue on 29.01.2023
        # unk_mask is 0 wherever the *input* token is unk and 1 otherwise, with
        # shape (num_samples, seq_len, vocab_size). It is built as an expanded
        # view of a (num_samples, seq_len, 1) tensor, so the vocabulary axis
        # costs no extra memory; the values are the same as a dense mask.
        if MLM_UNK_TOKEN is None:
            keep = torch.ones(self.batch_input.size())
        else:
            keep = (self.batch_input != MLM_UNK_TOKEN).to(torch.get_default_dtype())
        self.unk_mask = keep.unsqueeze(-1).expand(-1, -1, self.tokenizer.vocab_size)

        # mask[r, c] == 1 exactly at the position masked in row r.
        self.mask = torch.zeros(self.batch_input.size())
        src = torch.ones(self.batch_input.size(0)).unsqueeze(0).T
        self.mask.scatter_(1, self.random_ids, src)

        self.masked_input = self.batch_input.masked_fill(self.mask == 1, MLM_MASK_TOKEN)
        # Labels keep the original id wherever masked_input holds the mask token
        # and are -100 everywhere else.
        self.labels = self.batch_input.masked_fill(
            self.masked_input != MLM_MASK_TOKEN, -100
        )
        # If logits change when labels is not set to -100:
        # If we are using the logits, this does not change it then. But if we
        # are using the loss, then this has an effect.

        assert (
            self.masked_input.shape[0]
            == self.labels.shape[0]
            == self.mask.shape[0]
            == self.unk_mask.shape[0]
        )

    def __len__(self):
        return self.masked_input.shape[0]

    def __getitem__(self, idx):
        return (
            self.masked_input[idx],
            self.mask[idx],
            self.labels[idx],
            self.unk_mask[idx],
        )
def get_sense_score_batched(
    sentence, tokenizer, model, MLM_MASK_TOKEN, MLM_UNK_TOKEN, BATCH_SIZE
):
    """Score ``sentence`` under ``model`` by masking each inner token once.

    An ``MLMDataset`` is built from the sentence and fed to ``model`` in batches
    of ``BATCH_SIZE``. For every batch the model output at each masked position
    is read at the index of the original token, summed, divided by
    ``-num_samples`` and exponentiated; the per-batch results are multiplied
    together (exp(x + y) = exp(x) * exp(y)).

    Args:
        sentence: Text to score.
        tokenizer: Tokenizer handed to ``MLMDataset``.
        model: Callable invoked as ``model(masked_input, labels=labels)`` whose
            result exposes ``["logits"]``.
        MLM_MASK_TOKEN: Mask token id.
        MLM_UNK_TOKEN: Unknown token id; output rows at positions holding it are
            multiplied by zero before being read.
        BATCH_SIZE: Batch size for the ``DataLoader``.

    Returns:
        The product of the per-batch scores; ``1`` if the dataset is empty.

    NOTE(review): the values gathered below come straight from
    ``output["logits"]``; no softmax / log-softmax is applied in this function.
    """
    mlm_dataset = MLMDataset(sentence, tokenizer, MLM_MASK_TOKEN, MLM_UNK_TOKEN)
    dataloader = DataLoader(mlm_dataset, batch_size=BATCH_SIZE)

    score = 1
    # Inference only: skip autograd bookkeeping for the forward passes.
    with torch.no_grad():
        for masked_input, mask, labels, unk_mask in dataloader:
            output = model(masked_input, labels=labels)
            # Zero the output rows at positions whose input token is unk.
            logits = output["logits"] * unk_mask

            # (row, column) coordinates of the masked position of each sample.
            indices = torch.nonzero(mask)
            logits_of_interest = logits[indices[:, 0], indices[:, 1], :]
            labels_of_interest = labels[indices[:, 0], indices[:, 1]]
            target_values = logits_of_interest.gather(
                1, labels_of_interest.view(-1, 1)
            )

            batch_score = (
                (target_values.sum() / (-1 * mlm_dataset.num_samples)).exp().item()
            )
            score *= batch_score
    return score
def get_sense_score(
    sentence, tokenizer, model, MLM_MASK_TOKEN, MLM_UNK_TOKEN, num_samples
):
    """Score ``sentence`` under ``model`` from ``num_samples`` single-token maskings.

    IDEA
    -----------------
    PP = perplexity(P) where perplexity(P) function just computes:
        (p_1*p_2*p_3*...*p_N)^(-1/N) for p_i in P
    In practice you need to do the computation in log space to avoid underflow:
        e^-((log(p_1) + log(p_2) + ... + log(p_N)) / N)

    Note: every time you run this function, the results change slightly (but the
    ordering should be relatively the same), because the tokens to mask are
    chosen randomly.

    Args:
        sentence: Text to score.
        tokenizer: Callable returning a mapping with ``"input_ids"`` when called
            as ``tokenizer(sentence, return_tensors="pt")``.
        model: Callable invoked as ``model(masked_input, labels=labels)`` whose
            result exposes ``["logits"]``.
        MLM_MASK_TOKEN: Id written at the masked position.
        MLM_UNK_TOKEN: Id treated as "unknown"; output rows at positions holding
            it are multiplied by zero. ``None`` disables this.
        num_samples: Number of distinct inner positions to mask (one per copy).

    Returns:
        ``exp(-sum / num_samples)`` as a Python float, where ``sum`` adds up the
        model output read at each masked position for the original token id.

    Raises:
        ValueError: From ``np.random.choice`` when ``num_samples`` exceeds the
            number of inner token positions.

    NOTE(review): the values gathered below come straight from
    ``output["logits"]``; no softmax / log-softmax is applied in this function.
    """
    tensor_input = tokenizer(sentence, return_tensors="pt")["input_ids"]
    batch_input = tensor_input.repeat(num_samples, 1)

    # Ensuring that the masking is not done on the BOS and EOS tokens since they
    # are not connected to the sentence itself.
    random_ids = np.random.choice(
        np.arange(1, tensor_input.size(1) - 1), num_samples, replace=False
    )
    random_ids = torch.as_tensor(random_ids, dtype=torch.long).unsqueeze(1)

    # mask[r, c] == 1 exactly at the position masked in row r.
    mask = torch.zeros(batch_input.size())
    mask.scatter_(1, random_ids, torch.ones(random_ids.size()))

    masked_input = batch_input.masked_fill(mask == 1, MLM_MASK_TOKEN)
    labels = batch_input.masked_fill(masked_input != MLM_MASK_TOKEN, -100)
    # If logits change when labels is not set to -100:
    # If we are using the logits, this does not change it then. But if we are
    # using the loss, then this has an effect.

    # Added by Chris Emezue on 29.01.2023
    # 0 where the input token is unk, 1 otherwise. Kept as (N, L, 1) and broadcast
    # over the vocabulary axis, so it fits whatever width the model emits and
    # never allocates an (N, L, vocab) tensor.
    if MLM_UNK_TOKEN is None:
        unk_keep = torch.ones(batch_input.size())
    else:
        unk_keep = (batch_input != MLM_UNK_TOKEN).to(mask.dtype)
    unk_keep = unk_keep.unsqueeze(-1)

    # Inference only: skip autograd bookkeeping for the forward pass.
    with torch.no_grad():
        output = model(masked_input, labels=labels)
    logits = output["logits"] * unk_keep

    # (row, column) coordinates of the masked position of each sample.
    indices = torch.nonzero(mask)
    rows, cols = indices[:, 0], indices[:, 1]
    target_values = logits[rows, cols, :].gather(1, labels[rows, cols].view(-1, 1))

    score = (target_values.sum() / (-1 * num_samples)).exp().item()
    return score
def sort_dictionary(dict):
    """Return a new dictionary with the entries of ``dict`` ordered by ascending value.

    The ordering is the one produced by ``np.argsort`` over the values.
    """
    names = list(dict)
    scores = list(dict.values())

    ordered = {}
    for position in np.argsort(scores):
        ordered[names[position]] = scores[position]
    return ordered
def set_seed(seed=2023):
    """Seed NumPy's global RNG and PyTorch's RNG.

    Args:
        seed: Value passed to both ``np.random.seed`` and ``torch.manual_seed``.
            Defaults to 2023.
    """
    np.random.seed(seed)
    torch.manual_seed(seed)
# Sidebar contents.
# NOTE(review): indentation was lost in the extracted source; footer() is assumed
# to be rendered inside the sidebar block -- confirm against the original file.
sidebar = st.sidebar
with sidebar:
    st.selectbox("df", [1,2,3,4])
    footer()
# --- User inputs -----------------------------------------------------------
sentence = st.text_input("Please input a sample sentence in the target language")

# Candidate models: every Hub model returned for the "fill-mask" query.
models = get_model_infos(multilingual=None)
selected_models = st.multiselect(
    "Select of number of models you would like to compare", models["id"]
)

run = st.button("Get Scores")

# --- Scoring ---------------------------------------------------------------
if run:
    if not sentence.strip() or not selected_models:
        # Nothing meaningful to score without both a sentence and a model.
        st.warning("Please provide a sample sentence and select at least one model.")
    else:
        progress_text = "Computing recommendation Scores"
        st.write(progress_text)
        my_bar = st.progress(0)

        scores = {}
        for index, model_id in enumerate(selected_models):
            tokenizer = AutoTokenizer.from_pretrained(model_id)
            # NOTE(review): AutoModelWithLMHead is deprecated in transformers in
            # favour of task-specific auto classes -- consider migrating.
            model = AutoModelWithLMHead.from_pretrained(model_id)
            if model_id.startswith("castorini"):
                tokenizer.model_max_length = 512

            MLM_MASK_TOKEN = tokenizer.mask_token_id  # [(103, '[MASK]')]
            MLM_UNK_TOKEN = tokenizer.unk_token_id
            BATCH_SIZE = 1

            score = get_sense_score_batched(
                sentence, tokenizer, model, MLM_MASK_TOKEN, MLM_UNK_TOKEN, BATCH_SIZE
            )
            scores[model_id] = score

            # Fraction of models processed so far. The parentheses matter:
            # `index + 1 / n` exceeds 1.0 from the second model onwards.
            my_bar.progress((index + 1) / len(selected_models))

        scores = sort_dictionary(scores)
        st.write("Our recommendation is:", scores)