import torch
from torch.utils.data import Dataset
import pandas as pd
import numpy as np
import tqdm
import random
from .vocab import Vocab
import pickle
import copy
import os


class TokenizerDataset(Dataset):
    """
    Tokenizes the interaction sequences in the dataset.
    Feature length: 17
    """
    def __init__(self, dataset_path, label_path, vocab, seq_len=30):
        self.dataset_path = dataset_path
        self.label_path = label_path
        self.vocab = vocab  # Vocab object

        # Related to the input dataset file
        self.lines = []
        self.labels = []
        self.feats = []
        if self.label_path:
            self.label_file = open(self.label_path, "r")
            for line in self.label_file:
                if line:
                    line = line.strip()
                    if not line:
                        continue
                    self.labels.append(int(line))
            self.label_file.close()

            # Comment this section out if you are not using the feat attribute
            try:
                j = 0
                dataset_info_file = open(self.label_path.replace("label", "info"), "r")
                for line in dataset_info_file:
                    if line:
                        line = line.strip()
                        if not line:
                            continue
                        # highGRschool_w_prior:
                        # feat_vec = [float(i) for i in line.split(",")[-3].split("\t")]
                        # highGRschool_w_prior_w_diffskill_wo_fa:
                        feat_vec = [float(i) for i in line.split(",")[-3].split("\t")]
                        feat2 = [float(i) for i in line.split(",")[-2].split("\t")]
                        feat_vec.extend(feat2[1:])
                        if j == 0:
                            print(len(feat_vec))
                            j += 1
                        self.feats.append(feat_vec)
                dataset_info_file.close()
            except Exception as e:
                print(e)

        self.file = open(self.dataset_path, "r")
        for line in self.file:
            if line:
                line = line.strip()
                if line:
                    self.lines.append(line)
        self.file.close()

        self.len = len(self.lines)
        self.seq_len = seq_len
        print("Sequence length set at", self.seq_len, len(self.lines),
              len(self.labels) if self.label_path else 0)

    def __len__(self):
        return self.len

    def __getitem__(self, item):
        org_line = self.lines[item].split("\t")
        dup_line = []
        opt = False
        for l in org_line:
            if l in ["OptionalTask_1", "EquationAnswer", "NumeratorFactor",
                     "DenominatorFactor", "OptionalTask_2", "FirstRow1:1", "FirstRow1:2",
                     "FirstRow2:1", "FirstRow2:2", "SecondRow", "ThirdRow"]:
                opt = True
            # Once an optional-task step has been seen, mask the final-answer token.
            if opt and 'FinalAnswer-' in l:
                dup_line.append('[UNK]')
            else:
                dup_line.append(l)
        dup_line = "\t".join(dup_line)
        # print(dup_line)
        s1 = self.vocab.to_seq(dup_line, self.seq_len)  # Acts like a tokenizer and adds [CLS] and [SEP].
        s1_label = self.labels[item] if self.label_path else 0
        segment_label = [1 for _ in range(len(s1))]
        s1_feat = self.feats[item] if len(self.feats) > 0 else 0
        padding = [self.vocab.vocab['[PAD]'] for _ in range(self.seq_len - len(s1))]
        s1.extend(padding)
        segment_label.extend(padding)

        output = {'input': s1,
                  'label': s1_label,
                  'feat': s1_feat,
                  'segment_label': segment_label}
        return {key: torch.tensor(value) for key, value in output.items()}
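
# Usage sketch (added for illustration, not part of the original module): how
# TokenizerDataset is typically paired with a torch DataLoader. The file paths and
# the Vocab constructor signature below are placeholders / assumptions, not values
# taken from this repository.
#
#     vocab = Vocab("path/to/vocab_file.txt")   # hypothetical vocab file and constructor
#     train_set = TokenizerDataset("train.txt", "train_label.txt", vocab, seq_len=30)
#     loader = torch.utils.data.DataLoader(train_set, batch_size=32, shuffle=True)
#     batch = next(iter(loader))
#     # batch['input']          -> tensor of shape (32, 30)
#     # batch['label']          -> tensor of shape (32,)
#     # batch['feat']           -> tensor of shape (32, 17) when an info file is present
#     # batch['segment_label']  -> tensor of shape (32, 30)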


class TokenizerwSkillsDataset(Dataset):
    """
    Feature length: 17
    """
    def __init__(self, dataset_path, label_path, vocab, seq_len=30):
        print(f"dataset_path: {dataset_path}")
        print(f"label_path: {label_path}")
        self.dataset_path = dataset_path
        self.label_path = label_path
        self.vocab = vocab  # Vocab object
        self.seq_len = seq_len

        # Related to the input dataset file
        self.lines = []
        self.labels = []
        self.feats = []
        selected_lines = []
        print("TokenizerwSkillsDataset...")

        if self.label_path:
            # Comment this section out if you are not using the feat attribute
            dataset_info_file = open(self.label_path.replace("label", "info"), "r").readlines()
            print("Number of info lines:", len(dataset_info_file))
            j = 0
            for idex, line in enumerate(dataset_info_file):
                try:
                    if line:
                        line = line.strip()
                        if not line:
                            continue
                        feat_vec = [float(i) for i in line.split(",")[-9].split("\t")]
                        feat2 = [float(i) for i in line.split(",")[-8].split("\t")]
                        feat_vec.extend(feat2[1:])
                        if j == 0:
                            print("First feature vector:", len(feat_vec), feat_vec)
                            j += 1
                        self.feats.append(feat_vec)
                        selected_lines.append(idex)
                except Exception as e:
                    print(e)
                    print("Error at index:", idex)

            self.label_file = open(self.label_path, "r")
            for idex, line in enumerate(self.label_file):
                if line:
                    line = line.strip()
                    if not line:
                        continue
                    if idex in selected_lines:
                        self.labels.append(int(line))
            self.label_file.close()

        self.file = open(self.dataset_path, "r")
        for idex, line in enumerate(self.file):
            if line:
                line = line.strip()
                if line:
                    if idex in selected_lines:
                        self.lines.append(line)
        self.file.close()

        self.len = len(self.lines)
        print("Sequence length set at", self.seq_len, len(self.lines),
              len(self.labels) if self.label_path else 0)

    def __len__(self):
        return self.len

    def __getitem__(self, item):
        org_line = self.lines[item].split("\t")
        dup_line = []
        opt = False
        for l in org_line:
            if l in ["OptionalTask_1", "EquationAnswer", "NumeratorFactor",
                     "DenominatorFactor", "OptionalTask_2", "FirstRow1:1", "FirstRow1:2",
                     "FirstRow2:1", "FirstRow2:2", "SecondRow", "ThirdRow"]:
                opt = True
            if opt and 'FinalAnswer-' in l:
                dup_line.append('[UNK]')
            else:
                dup_line.append(l)
        dup_line = "\t".join(dup_line)
        # print(dup_line)
        s1 = self.vocab.to_seq(dup_line, self.seq_len)  # Acts like a tokenizer and adds [CLS] and [SEP].
        s1_label = self.labels[item] if self.label_path else 0
        segment_label = [1 for _ in range(len(s1))]
        s1_feat = self.feats[item] if len(self.feats) > 0 else 0
        padding = [self.vocab.vocab['[PAD]'] for _ in range(self.seq_len - len(s1))]
        s1.extend(padding)
        segment_label.extend(padding)
        # print(s1_feat)
        output = {'input': s1,
                  'label': s1_label,
                  'feat': s1_feat,
                  'segment_label': segment_label}
        return {key: torch.tensor(value) for key, value in output.items()}


class TokenizerwTimeDataset(Dataset):
    """
    Feature length: 4
    """
    def __init__(self, dataset_path, label_path, vocab, seq_len=30):
        print(f"dataset_path: {dataset_path}")
        print(f"label_path: {label_path}")
        self.dataset_path = dataset_path
        self.label_path = label_path
        self.vocab = vocab  # Vocab object
        self.seq_len = seq_len

        # Related to the input dataset file
        self.lines = []
        self.labels = []
        self.feats = []
        selected_lines = []
        print("TokenizerwTimeDataset...")

        time_df = pickle.load(open("ratio_proportion_change3_2223/sch_largest_100-coded/time_info/full_data_normalized_time.pkl", "rb"))
        print("time_df shape:", time_df.shape)

        if self.label_path:
            # Comment this section out if you are not using the feat attribute
            dataset_info_file = open(self.label_path.replace("label", "info"), "r").readlines()
            print("Number of info lines:", len(dataset_info_file))
            j = 0
            for idex, line in enumerate(dataset_info_file):
                try:
                    if line:
                        line = line.strip()
                        if not line:
                            continue
                        feat_vec = []
                        sch = line.split(",")[0]
                        stu = line.split(",")[2]
                        progress = line.split(",")[3]
                        prob_id = line.split(",")[4]
                        total_time = time_df.loc[(sch, stu, progress, prob_id)]['total_time'].item()
                        faopt_time = time_df.loc[(sch, stu, progress, prob_id)]['faopt_time'].item()
                        opt_time = time_df.loc[(sch, stu, progress, prob_id)]['opt_time'].item()
                        nonopt_time = time_df.loc[(sch, stu, progress, prob_id)]['nonopt_time'].item()
                        feat_vec.append(faopt_time)
                        feat_vec.append(total_time)
                        feat_vec.append(opt_time)
                        feat_vec.append(nonopt_time)
                        if j == 0:
                            print("First feature vector:", len(feat_vec), feat_vec)
                            j += 1
                        self.feats.append(feat_vec)
                        selected_lines.append(idex)
                except Exception as e:
                    print(e)
                    print("Error at index:", idex)

            self.label_file = open(self.label_path, "r")
            for idex, line in enumerate(self.label_file):
                if line:
                    line = line.strip()
                    if not line:
                        continue
                    if idex in selected_lines:
                        self.labels.append(int(line))
            self.label_file.close()

        self.file = open(self.dataset_path, "r")
        for idex, line in enumerate(self.file):
            if line:
                line = line.strip()
                if line:
                    if idex in selected_lines:
                        self.lines.append(line)
        self.file.close()

        self.len = len(self.lines)
        print("Sequence length set at", self.seq_len, len(self.lines),
              len(self.labels) if self.label_path else 0)

    def __len__(self):
        return self.len

    def __getitem__(self, item):
        org_line = self.lines[item].split("\t")
        dup_line = []
        opt = False
        for l in org_line:
            if l in ["OptionalTask_1", "EquationAnswer", "NumeratorFactor",
                     "DenominatorFactor", "OptionalTask_2", "FirstRow1:1", "FirstRow1:2",
                     "FirstRow2:1", "FirstRow2:2", "SecondRow", "ThirdRow"]:
                opt = True
            if opt and 'FinalAnswer-' in l:
                dup_line.append('[UNK]')
            else:
                dup_line.append(l)
        dup_line = "\t".join(dup_line)
        # print(dup_line)
        s1 = self.vocab.to_seq(dup_line, self.seq_len)  # Acts like a tokenizer and adds [CLS] and [SEP].
        s1_label = self.labels[item] if self.label_path else 0
        segment_label = [1 for _ in range(len(s1))]
        s1_feat = self.feats[item] if len(self.feats) > 0 else 0
        padding = [self.vocab.vocab['[PAD]'] for _ in range(self.seq_len - len(s1))]
        s1.extend(padding)
        segment_label.extend(padding)
        # print(s1_feat)
        output = {'input': s1,
                  'label': s1_label,
                  'feat': s1_feat,
                  'segment_label': segment_label}
        return {key: torch.tensor(value) for key, value in output.items()}
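
# Note (added for illustration): the lookups above assume the pickled DataFrame is
# indexed by (school, student, progress, problem_id) and has at least the columns
# 'total_time', 'faopt_time', 'opt_time' and 'nonopt_time'. A minimal sketch of that
# assumed layout, with placeholder index values, for reference only:
#
#     import pandas as pd
#     time_df = pd.DataFrame(
#         {"total_time": [1.0], "faopt_time": [0.2], "opt_time": [0.5], "nonopt_time": [0.3]},
#         index=pd.MultiIndex.from_tuples([("sch_1", "stu_1", "progress_1", "prob_1")]),
#     )
#     time_df.loc[("sch_1", "stu_1", "progress_1", "prob_1")]["total_time"].item()  # -> 1.0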


class TokenizerwSkillsTimeDataset(Dataset):
    """
    Feature length: 17 + 4 = 21
    """
    def __init__(self, dataset_path, label_path, vocab, seq_len=30):
        print(f"dataset_path: {dataset_path}")
        print(f"label_path: {label_path}")
        self.dataset_path = dataset_path
        self.label_path = label_path
        self.vocab = vocab  # Vocab object
        self.seq_len = seq_len

        # Related to the input dataset file
        self.lines = []
        self.labels = []
        self.feats = []
        selected_lines = []
        print("TokenizerwSkillsTimeDataset...")

        time_df = pickle.load(open("ratio_proportion_change3_2223/sch_largest_100-coded/time_info/full_data_normalized_time.pkl", "rb"))
        print("time_df shape:", time_df.shape)

        if self.label_path:
            # Comment this section out if you are not using the feat attribute
            dataset_info_file = open(self.label_path.replace("label", "info"), "r").readlines()
            print("Number of info lines:", len(dataset_info_file))
            j = 0
            for idex, line in enumerate(dataset_info_file):
                try:
                    if line:
                        line = line.strip()
                        if not line:
                            continue
                        # Skill features from the info file...
                        feat_vec = [float(i) for i in line.split(",")[-9].split("\t")]
                        feat2 = [float(i) for i in line.split(",")[-8].split("\t")]
                        feat_vec.extend(feat2[1:])
                        # ...plus the four time features looked up from time_df.
                        sch = line.split(",")[0]
                        stu = line.split(",")[2]
                        progress = line.split(",")[3]
                        prob_id = line.split(",")[4]
                        total_time = time_df.loc[(sch, stu, progress, prob_id)]['total_time'].item()
                        faopt_time = time_df.loc[(sch, stu, progress, prob_id)]['faopt_time'].item()
                        opt_time = time_df.loc[(sch, stu, progress, prob_id)]['opt_time'].item()
                        nonopt_time = time_df.loc[(sch, stu, progress, prob_id)]['nonopt_time'].item()
                        feat_vec.append(faopt_time)
                        feat_vec.append(total_time)
                        feat_vec.append(opt_time)
                        feat_vec.append(nonopt_time)
                        if j == 0:
                            print("First feature vector:", len(feat_vec), feat_vec)
                            j += 1
                        self.feats.append(feat_vec)
                        selected_lines.append(idex)
                except Exception as e:
                    print(e)
                    print("Error at index:", idex)

            self.label_file = open(self.label_path, "r")
            for idex, line in enumerate(self.label_file):
                if line:
                    line = line.strip()
                    if not line:
                        continue
                    if idex in selected_lines:
                        self.labels.append(int(line))
            self.label_file.close()

        self.file = open(self.dataset_path, "r")
        for idex, line in enumerate(self.file):
            if line:
                line = line.strip()
                if line:
                    if idex in selected_lines:
                        self.lines.append(line)
        self.file.close()

        self.len = len(self.lines)
        print("Sequence length set at", self.seq_len, len(self.lines),
              len(self.labels) if self.label_path else 0)

    def __len__(self):
        return self.len

    def __getitem__(self, item):
        org_line = self.lines[item].split("\t")
        dup_line = []
        opt = False
        for l in org_line:
            if l in ["OptionalTask_1", "EquationAnswer", "NumeratorFactor",
                     "DenominatorFactor", "OptionalTask_2", "FirstRow1:1", "FirstRow1:2",
                     "FirstRow2:1", "FirstRow2:2", "SecondRow", "ThirdRow"]:
                opt = True
            if opt and 'FinalAnswer-' in l:
                dup_line.append('[UNK]')
            else:
                dup_line.append(l)
        dup_line = "\t".join(dup_line)
        # print(dup_line)
        s1 = self.vocab.to_seq(dup_line, self.seq_len)  # Acts like a tokenizer and adds [CLS] and [SEP].
        s1_label = self.labels[item] if self.label_path else 0
        segment_label = [1 for _ in range(len(s1))]
        s1_feat = self.feats[item] if len(self.feats) > 0 else 0
        padding = [self.vocab.vocab['[PAD]'] for _ in range(self.seq_len - len(s1))]
        s1.extend(padding)
        segment_label.extend(padding)
        # print(s1_feat)
        output = {'input': s1,
                  'label': s1_label,
                  'feat': s1_feat,
                  'segment_label': segment_label}
        return {key: torch.tensor(value) for key, value in output.items()}
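
# Summary (added for reference, based on the docstrings above): expected length of the
# 'feat' vector per dataset class, so downstream model code can size its feature input.
#   TokenizerDataset            -> 17 (features parsed from the info file)
#   TokenizerwSkillsDataset     -> 17 (skill features from the info file)
#   TokenizerwTimeDataset       ->  4 (faopt_time, total_time, opt_time, nonopt_time)
#   TokenizerwSkillsTimeDataset -> 21 (17 skill features + 4 time features)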