# astra/src/dataset.py
import torch
from torch.utils.data import Dataset
import pandas as pd
import numpy as np
import tqdm
import random
from .vocab import Vocab
import pickle
import copy
import os
class TokenizerDataset(Dataset):
    """
    Tokenizes each line of the dataset and returns the token ids, label,
    feature vector, and segment labels for one interaction sequence.
    Feature length: 17
    """
    def __init__(self, dataset_path, label_path, vocab, seq_len=30):
        self.dataset_path = dataset_path
        self.label_path = label_path
        self.vocab = vocab  # Vocab object

        # Related to the input dataset file
        self.lines = []
        self.labels = []
        self.feats = []
        if self.label_path:
            self.label_file = open(self.label_path, "r")
            for line in self.label_file:
                if line:
                    line = line.strip()
                    if not line:
                        continue
                    self.labels.append(int(line))
            self.label_file.close()

            # Comment this section out if you are not using the feat attribute
            try:
                j = 0
                dataset_info_file = open(self.label_path.replace("label", "info"), "r")
                for line in dataset_info_file:
                    if line:
                        line = line.strip()
                        if not line:
                            continue
                        # # highGRschool_w_prior
                        # feat_vec = [float(i) for i in line.split(",")[-3].split("\t")]
                        # highGRschool_w_prior_w_diffskill_wo_fa
                        feat_vec = [float(i) for i in line.split(",")[-3].split("\t")]
                        feat2 = [float(i) for i in line.split(",")[-2].split("\t")]
                        feat_vec.extend(feat2[1:])
                        if j == 0:
                            print(len(feat_vec))
                        j += 1
                        self.feats.append(feat_vec)
                dataset_info_file.close()
            except Exception as e:
                print(e)

        self.file = open(self.dataset_path, "r")
        for line in self.file:
            if line:
                line = line.strip()
                if line:
                    self.lines.append(line)
        self.file.close()

        self.len = len(self.lines)
        self.seq_len = seq_len
        print("Sequence length set at ", self.seq_len, len(self.lines), len(self.labels) if self.label_path else 0)

    def __len__(self):
        return self.len

    def __getitem__(self, item):
        org_line = self.lines[item].split("\t")
        dup_line = []
        opt = False
        for l in org_line:
            if l in ["OptionalTask_1", "EquationAnswer", "NumeratorFactor", "DenominatorFactor", "OptionalTask_2", "FirstRow1:1", "FirstRow1:2", "FirstRow2:1", "FirstRow2:2", "SecondRow", "ThirdRow"]:
                opt = True
            if opt and 'FinalAnswer-' in l:
                # Mask the final answer once an optional-task step has been seen
                dup_line.append('[UNK]')
            else:
                dup_line.append(l)
        dup_line = "\t".join(dup_line)
        # print(dup_line)
        s1 = self.vocab.to_seq(dup_line, self.seq_len)  # This is like a tokenizer and adds [CLS] and [SEP].
        s1_label = self.labels[item] if self.label_path else 0
        segment_label = [1 for _ in range(len(s1))]
        s1_feat = self.feats[item] if len(self.feats) > 0 else 0
        # Pad both the token ids and the segment labels up to seq_len with the [PAD] id
        padding = [self.vocab.vocab['[PAD]'] for _ in range(self.seq_len - len(s1))]
        s1.extend(padding)
        segment_label.extend(padding)
        output = {'input': s1,
                  'label': s1_label,
                  'feat': s1_feat,
                  'segment_label': segment_label}
        return {key: torch.tensor(value) for key, value in output.items()}
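
# Example usage (a minimal sketch, not part of the training pipeline): the file
# paths and the Vocab constructor argument below are hypothetical assumptions;
# only to_seq() and the '[PAD]' vocab entry are taken from the code above.
#
#   from torch.utils.data import DataLoader
#
#   vocab = Vocab("pretraining/vocab.txt")               # hypothetical vocab file path
#   dataset = TokenizerDataset("data/train.txt",         # one tab-separated sequence per line
#                              "data/train_label.txt",   # one integer label per line
#                              vocab, seq_len=30)
#   loader = DataLoader(dataset, batch_size=32, shuffle=True)
#   batch = next(iter(loader))
#   # batch['input'] and batch['segment_label'] have shape (32, 30); batch['label'] is (32,),
#   # and batch['feat'] is (32, 17) when a matching "info" file was found.
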
class TokenizerwSkillsDataset(Dataset):
    """
    Feature length: 17
    """
    def __init__(self, dataset_path, label_path, vocab, seq_len=30):
        print(f"dataset_path: {dataset_path}")
        print(f"label_path: {label_path}")
        self.dataset_path = dataset_path
        self.label_path = label_path
        self.vocab = vocab  # Vocab object
        self.seq_len = seq_len

        # Related to input dataset file
        self.lines = []
        self.labels = []
        self.feats = []
        selected_lines = []
        print("TokenizerwSkillsDataset...............................")
        if self.label_path:
            # Comment this section if you are not using feat attribute
            dataset_info_file = open(self.label_path.replace("label", "info"), "r").readlines()
            print(">>>>>>>>>>>>>>>>>", len(dataset_info_file))
            j = 0
            for idex, line in enumerate(dataset_info_file):
                try:
                    if line:
                        line = line.strip()
                        if not line:
                            continue
                        feat_vec = [float(i) for i in line.split(",")[-9].split("\t")]
                        feat2 = [float(i) for i in line.split(",")[-8].split("\t")]
                        feat_vec.extend(feat2[1:])
                        if j == 0:
                            print(";;;;", len(feat_vec), feat_vec)
                        j += 1
                        self.feats.append(feat_vec)
                        selected_lines.append(idex)
                except Exception as e:
                    print("................>")
                    print(e)
                    print("Error at index: ", idex)

            self.label_file = open(self.label_path, "r")
            for idex, line in enumerate(self.label_file):
                if line:
                    line = line.strip()
                    if not line:
                        continue
                    if idex in selected_lines:
                        self.labels.append(int(line))
                        # self.labels.append(int(line))
            self.label_file.close()

        self.file = open(self.dataset_path, "r")
        for idex, line in enumerate(self.file):
            if line:
                line = line.strip()
                if line:
                    if idex in selected_lines:
                        self.lines.append(line)
                        # self.lines.append(line)
        self.file.close()

        self.len = len(self.lines)
        print("Sequence length set at ", self.seq_len, len(self.lines), len(self.labels) if self.label_path else 0)

    def __len__(self):
        return self.len

    def __getitem__(self, item):
        org_line = self.lines[item].split("\t")
        dup_line = []
        opt = False
        for l in org_line:
            if l in ["OptionalTask_1", "EquationAnswer", "NumeratorFactor", "DenominatorFactor", "OptionalTask_2", "FirstRow1:1", "FirstRow1:2", "FirstRow2:1", "FirstRow2:2", "SecondRow", "ThirdRow"]:
                opt = True
            if opt and 'FinalAnswer-' in l:
                dup_line.append('[UNK]')
            else:
                dup_line.append(l)
        dup_line = "\t".join(dup_line)
        # print(dup_line)
        s1 = self.vocab.to_seq(dup_line, self.seq_len)  # This is like tokenizer and adds [CLS] and [SEP].
        s1_label = self.labels[item] if self.label_path else 0
        segment_label = [1 for _ in range(len(s1))]
        s1_feat = self.feats[item] if len(self.feats) > 0 else 0
        padding = [self.vocab.vocab['[PAD]'] for _ in range(self.seq_len - len(s1))]
        s1.extend(padding)
        segment_label.extend(padding)
        # print(s1_feat)
        output = {'input': s1,
                  'label': s1_label,
                  'feat': s1_feat,
                  'segment_label': segment_label}
        return {key: torch.tensor(value) for key, value in output.items()}
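
# A toy illustration (not from the original data) of the "info" line parsing used by
# TokenizerwSkillsDataset above: each info line is comma-separated, the 9th- and
# 8th-from-last fields are tab-separated float vectors, and the first entry of the
# second vector is dropped. The field values below are invented purely to show the
# slicing; real info lines hold 17 feature values in total (see the docstring).
#
#   info_line = "sch1,grp,stu1,prog,prob1,0,0,0,0,0.1\t0.2\t0.3,9\t0.4\t0.5,0,0,0,0,0,0,0"
#   feat_vec = [float(i) for i in info_line.split(",")[-9].split("\t")]   # [0.1, 0.2, 0.3]
#   feat2 = [float(i) for i in info_line.split(",")[-8].split("\t")]      # [9.0, 0.4, 0.5]
#   feat_vec.extend(feat2[1:])                                            # [0.1, 0.2, 0.3, 0.4, 0.5]
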
class TokenizerwTimeDataset(Dataset):
    """
    Feature length: 4
    """
    def __init__(self, dataset_path, label_path, vocab, seq_len=30):
        print(f"dataset_path: {dataset_path}")
        print(f"label_path: {label_path}")
        self.dataset_path = dataset_path
        self.label_path = label_path
        self.vocab = vocab  # Vocab object
        self.seq_len = seq_len

        # Related to input dataset file
        self.lines = []
        self.labels = []
        self.feats = []
        selected_lines = []
        print("TokenizerwTimeDataset...............................")
        time_df = pickle.load(open("ratio_proportion_change3_2223/sch_largest_100-coded/time_info/full_data_normalized_time.pkl", "rb"))
        print("time: ?? ", time_df.shape)
        if self.label_path:
            # Comment this section if you are not using feat attribute
            dataset_info_file = open(self.label_path.replace("label", "info"), "r").readlines()
            print(">>>>>>>>>>>>>>>>>", len(dataset_info_file))
            j = 0
            for idex, line in enumerate(dataset_info_file):
                try:
                    if line:
                        line = line.strip()
                        if not line:
                            continue
                        feat_vec = []
                        sch = line.split(",")[0]
                        stu = line.split(",")[2]
                        progress = line.split(",")[3]
                        prob_id = line.split(",")[4]
                        total_time = time_df.loc[(sch, stu, progress, prob_id)]['total_time'].item()
                        faopt_time = time_df.loc[(sch, stu, progress, prob_id)]['faopt_time'].item()
                        opt_time = time_df.loc[(sch, stu, progress, prob_id)]['opt_time'].item()
                        nonopt_time = time_df.loc[(sch, stu, progress, prob_id)]['nonopt_time'].item()
                        feat_vec.append(faopt_time)
                        feat_vec.append(total_time)
                        feat_vec.append(opt_time)
                        feat_vec.append(nonopt_time)
                        if j == 0:
                            print(";;;;", len(feat_vec), feat_vec)
                        j += 1
                        self.feats.append(feat_vec)
                        selected_lines.append(idex)
                except Exception as e:
                    print("................>")
                    print(e)
                    print("Error at index: ", idex)

            self.label_file = open(self.label_path, "r")
            for idex, line in enumerate(self.label_file):
                if line:
                    line = line.strip()
                    if not line:
                        continue
                    if idex in selected_lines:
                        self.labels.append(int(line))
                        # self.labels.append(int(line))
            self.label_file.close()

        self.file = open(self.dataset_path, "r")
        for idex, line in enumerate(self.file):
            if line:
                line = line.strip()
                if line:
                    if idex in selected_lines:
                        self.lines.append(line)
                        # self.lines.append(line)
        self.file.close()

        self.len = len(self.lines)
        print("Sequence length set at ", self.seq_len, len(self.lines), len(self.labels) if self.label_path else 0)

    def __len__(self):
        return self.len

    def __getitem__(self, item):
        org_line = self.lines[item].split("\t")
        dup_line = []
        opt = False
        for l in org_line:
            if l in ["OptionalTask_1", "EquationAnswer", "NumeratorFactor", "DenominatorFactor", "OptionalTask_2", "FirstRow1:1", "FirstRow1:2", "FirstRow2:1", "FirstRow2:2", "SecondRow", "ThirdRow"]:
                opt = True
            if opt and 'FinalAnswer-' in l:
                dup_line.append('[UNK]')
            else:
                dup_line.append(l)
        dup_line = "\t".join(dup_line)
        # print(dup_line)
        s1 = self.vocab.to_seq(dup_line, self.seq_len)  # This is like tokenizer and adds [CLS] and [SEP].
        s1_label = self.labels[item] if self.label_path else 0
        segment_label = [1 for _ in range(len(s1))]
        s1_feat = self.feats[item] if len(self.feats) > 0 else 0
        padding = [self.vocab.vocab['[PAD]'] for _ in range(self.seq_len - len(s1))]
        s1.extend(padding)
        segment_label.extend(padding)
        # print(s1_feat)
        output = {'input': s1,
                  'label': s1_label,
                  'feat': s1_feat,
                  'segment_label': segment_label}
        return {key: torch.tensor(value) for key, value in output.items()}
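
# The four time features above are read from a pickled pandas DataFrame. From the
# lookups in __init__, that frame is assumed to be indexed by (school, student,
# progress, problem id) with one row per attempt; the column names are taken from
# the code, and everything else in this sketch is made up for illustration.
#
#   import pandas as pd
#   toy_time_df = pd.DataFrame(
#       {"total_time": [1.0], "faopt_time": [0.2], "opt_time": [0.5], "nonopt_time": [0.3]},
#       index=pd.MultiIndex.from_tuples([("sch1", "stu1", "prog1", "prob1")]),
#   )
#   row = toy_time_df.loc[("sch1", "stu1", "prog1", "prob1")]
#   feat_vec = [row["faopt_time"].item(), row["total_time"].item(),
#               row["opt_time"].item(), row["nonopt_time"].item()]   # length-4 feature vector
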
class TokenizerwSkillsTimeDataset(Dataset):
    """
    Feature length: 17 + 4 = 21
    """
    def __init__(self, dataset_path, label_path, vocab, seq_len=30):
        print(f"dataset_path: {dataset_path}")
        print(f"label_path: {label_path}")
        self.dataset_path = dataset_path
        self.label_path = label_path
        self.vocab = vocab  # Vocab object
        self.seq_len = seq_len

        # Related to input dataset file
        self.lines = []
        self.labels = []
        self.feats = []
        selected_lines = []
        print("TokenizerwSkillsTimeDataset...............................")
        time_df = pickle.load(open("ratio_proportion_change3_2223/sch_largest_100-coded/time_info/full_data_normalized_time.pkl", "rb"))
        print("time: ", time_df.shape)
        if self.label_path:
            # Comment this section if you are not using feat attribute
            dataset_info_file = open(self.label_path.replace("label", "info"), "r").readlines()
            print(">>>>>>>>>>>>>>>>>", len(dataset_info_file))
            j = 0
            for idex, line in enumerate(dataset_info_file):
                try:
                    if line:
                        line = line.strip()
                        if not line:
                            continue
                        feat_vec = [float(i) for i in line.split(",")[-9].split("\t")]
                        feat2 = [float(i) for i in line.split(",")[-8].split("\t")]
                        feat_vec.extend(feat2[1:])
                        sch = line.split(",")[0]
                        stu = line.split(",")[2]
                        progress = line.split(",")[3]
                        prob_id = line.split(",")[4]
                        total_time = time_df.loc[(sch, stu, progress, prob_id)]['total_time'].item()
                        faopt_time = time_df.loc[(sch, stu, progress, prob_id)]['faopt_time'].item()
                        opt_time = time_df.loc[(sch, stu, progress, prob_id)]['opt_time'].item()
                        nonopt_time = time_df.loc[(sch, stu, progress, prob_id)]['nonopt_time'].item()
                        feat_vec.append(faopt_time)
                        feat_vec.append(total_time)
                        feat_vec.append(opt_time)
                        feat_vec.append(nonopt_time)
                        if j == 0:
                            print(";;;;", len(feat_vec), feat_vec)
                        j += 1
                        self.feats.append(feat_vec)
                        selected_lines.append(idex)
                except Exception as e:
                    print("................>")
                    print(e)
                    print("Error at index: ", idex)

            self.label_file = open(self.label_path, "r")
            for idex, line in enumerate(self.label_file):
                if line:
                    line = line.strip()
                    if not line:
                        continue
                    if idex in selected_lines:
                        self.labels.append(int(line))
                        # self.labels.append(int(line))
            self.label_file.close()

        self.file = open(self.dataset_path, "r")
        for idex, line in enumerate(self.file):
            if line:
                line = line.strip()
                if line:
                    if idex in selected_lines:
                        self.lines.append(line)
                        # self.lines.append(line)
        self.file.close()

        self.len = len(self.lines)
        print("Sequence length set at ", self.seq_len, len(self.lines), len(self.labels) if self.label_path else 0)

    def __len__(self):
        return self.len

    def __getitem__(self, item):
        org_line = self.lines[item].split("\t")
        dup_line = []
        opt = False
        for l in org_line:
            if l in ["OptionalTask_1", "EquationAnswer", "NumeratorFactor", "DenominatorFactor", "OptionalTask_2", "FirstRow1:1", "FirstRow1:2", "FirstRow2:1", "FirstRow2:2", "SecondRow", "ThirdRow"]:
                opt = True
            if opt and 'FinalAnswer-' in l:
                dup_line.append('[UNK]')
            else:
                dup_line.append(l)
        dup_line = "\t".join(dup_line)
        # print(dup_line)
        s1 = self.vocab.to_seq(dup_line, self.seq_len)  # This is like tokenizer and adds [CLS] and [SEP].
        s1_label = self.labels[item] if self.label_path else 0
        segment_label = [1 for _ in range(len(s1))]
        s1_feat = self.feats[item] if len(self.feats) > 0 else 0
        padding = [self.vocab.vocab['[PAD]'] for _ in range(self.seq_len - len(s1))]
        s1.extend(padding)
        segment_label.extend(padding)
        # print(s1_feat)
        output = {'input': s1,
                  'label': s1_label,
                  'feat': s1_feat,
                  'segment_label': segment_label}
        return {key: torch.tensor(value) for key, value in output.items()}