import json
import os
from pathlib import Path
from concurrent.futures import ThreadPoolExecutor, as_completed

from datasets import Dataset

# Tag dictionaries
DOMAIN_TAGS = {
    "physics": "[PHYS]",
    "biology": "[BIO]",
    "materials": "[MAT]",
    "education": "[GEN]",
}
TASK_TAGS = {
    "hypothesis": "[HYP]",
    "method": "[MTH]",
    "experiment": "[EXP]",
}
SECTION_TAGS = {
    "abstract": "[ABSTRACT]",
    "introduction": "[INTRO]",
    "results": "[RESULTS]",
    "discussion": "[DISCUSSION]",
    "conclusion": "[CONCLUSION]",
    "method": "[MTH]",
    "experiment": "[EXP]",
}

SRC_PATH = Path(r"C:\Users\kunya\PycharmProjects\DataVolt\Tokenization\scientific_corpus_325M.jsonl")
CLEANED_JSONL_PATH = Path("scientific_corpus_325M.cleaned.jsonl")
CLEANED_ARROW_PATH = Path("scientific_corpus_325M.cleaned.arrow")
CHUNK_SIZE = 10000
MAX_WORKERS = os.cpu_count() or 4


def tag_record(record):
    """Prepend domain/task/section tags to the record's text field.

    Adjust the keys below if your schema uses different field names.
    """
    domain = record.get("domain", "").lower()
    task = record.get("task", "").lower()
    section = record.get("section", "").lower()
    text = record.get("full_text", "")

    tags = []
    if domain in DOMAIN_TAGS:
        tags.append(DOMAIN_TAGS[domain])
    if task in TASK_TAGS:
        tags.append(TASK_TAGS[task])
    if section in SECTION_TAGS:
        tags.append(SECTION_TAGS[section])

    # Prepend tags to the text if any were found
    record["tagged_text"] = (" ".join(tags) + " " + text) if tags else text
    return record


def process_chunk(lines):
    """Parse and tag a chunk of JSONL lines, skipping malformed records."""
    cleaned = []
    for line in lines:
        try:
            record = json.loads(line)
            cleaned.append(tag_record(record))
        except Exception:
            continue  # skip malformed or unexpected records
    return cleaned


def chunked_file_reader(path, chunk_size):
    """Yield lists of up to chunk_size raw lines from a text file."""
    with open(path, "r", encoding="utf-8") as f:
        chunk = []
        for line in f:
            chunk.append(line)
            if len(chunk) == chunk_size:
                yield chunk
                chunk = []
        if chunk:
            yield chunk


def main():
    print("Starting cleaning process...")

    # Write cleaned records to a new JSONL file in chunks.
    # Note: as_completed yields futures as they finish, so output order
    # may differ from input order.
    with open(CLEANED_JSONL_PATH, "w", encoding="utf-8") as out_f:
        with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
            futures = [
                executor.submit(process_chunk, chunk)
                for chunk in chunked_file_reader(SRC_PATH, CHUNK_SIZE)
            ]
            for fut in as_completed(futures):
                for record in fut.result():
                    out_f.write(json.dumps(record, ensure_ascii=False) + "\n")
    print(f"Cleaned JSONL written to {CLEANED_JSONL_PATH}")

    # Convert cleaned JSONL to Arrow using datasets (handles chunking internally)
    print("Saving cleaned dataset to Arrow format...")
    ds = Dataset.from_json(str(CLEANED_JSONL_PATH))
    ds.save_to_disk(str(CLEANED_ARROW_PATH))
    print(f"Saved cleaned Arrow dataset at: {CLEANED_ARROW_PATH}")

    # Optionally, hand off to hf_upload.py for the HuggingFace upload.
    # os.system blocks until the child process finishes.
    print("Uploading to HuggingFace using hf_upload.py ...")
    os.system("python hf_upload.py")


if __name__ == "__main__":
    main()