""" Core.py: Orchestrates dataset generation jobs, plan enforcement, and background processing. """ import threading import uuid import os import json from .Config import PLAN_LIMITS, tmp_dir from .Progress import progress_tracker from .Payment import payment_manager # Import your tokenizer module here (example) from Tokenization.generate_dataset import generate_dataset from Tokenization.Main_2 import ScientificCorpusBuilder, CorpusConfig from Tokenization.Build_tokenizer import QLoRAPreprocessor import nltk class JobManager: def __init__(self): self.jobs = {} self.lock = threading.Lock() def start_job(self, user_input): plan = user_input.get("plan") token_budget = user_input.get("token_budget") job_type = user_input.get("job_type", "tokenize") # "tokenize", "corpus", or "label" # For label jobs, token_budget is determined after upload if job_type != "label" and not payment_manager.check_plan_limit(plan, token_budget): return None, "Plan limit exceeded" job_id = str(uuid.uuid4()) with self.lock: self.jobs[job_id] = { "status": "pending", "plan": plan, "token_budget": token_budget, "job_type": job_type, "user_input": user_input } if job_type == "corpus": thread = threading.Thread(target=self._run_corpus_pipeline, args=(job_id,)) elif job_type == "label": thread = threading.Thread(target=self._run_label_pipeline, args=(job_id,)) else: thread = threading.Thread(target=self._run_job, args=(job_id, user_input)) thread.start() return job_id, None def _run_job(self, job_id, user_input): try: progress_tracker.start_job(job_id, total_steps=6) # Step 1: Data retrieval progress_tracker.update(job_id, 1, "Retrieving data from sources...") domain = user_input.get("domain") token_budget = user_input.get("token_budget") plan = user_input.get("plan") custom_seed = user_input.get("custom_seed", None) # Step 2: Preprocessing progress_tracker.update(job_id, 2, "Preprocessing and cleaning data...") # Step 3: Tokenization & Labeling progress_tracker.update(job_id, 3, "Tokenizing and labeling samples...") # Step 4: Validation & Stats progress_tracker.update(job_id, 4, "Validating and computing statistics...") # Step 5: Formatting output progress_tracker.update(job_id, 5, "Formatting dataset as JSONL...") # Call tokenizer pipeline (implement in tokenization/tokenizer.py) result = generate_dataset( domain=domain, token_budget=token_budget, plan=plan, custom_seed=custom_seed, progress_callback=lambda step, msg: progress_tracker.update(job_id, step, msg) ) # Step 6: Save output os.makedirs(tmp_dir, exist_ok=True) output_path = os.path.join(tmp_dir, f"{domain}_{token_budget}_tokens_{job_id}.jsonl") with open(output_path, "w", encoding="utf-8") as f: for line in result["jsonl_lines"]: f.write(line + "\n") progress_tracker.update(job_id, 6, "Dataset ready for download.") progress_tracker.complete(job_id) with self.lock: self.jobs[job_id]["status"] = "complete" self.jobs[job_id]["result_path"] = output_path self.jobs[job_id]["stats"] = result.get("stats", {}) except Exception as e: progress_tracker.update(job_id, 0, f"Job failed: {str(e)}") with self.lock: self.jobs[job_id]["status"] = "failed" self.jobs[job_id]["error"] = str(e) def _run_corpus_pipeline(self, job_id): try: with self.lock: user_input = self.jobs[job_id]["user_input"] plan = user_input.get("plan") token_budget = user_input.get("token_budget") progress_tracker.start_job(job_id, total_steps=5) progress_tracker.update(job_id, 1, "Building scientific corpus...") config = CorpusConfig() builder = ScientificCorpusBuilder(config) corpus, stats = 
            corpus, stats = builder.build_corpus_scoped(plan, token_budget)

            progress_tracker.update(job_id, 2, "Formatting dataset as JSONL...")
            jsonl_lines = [json.dumps(paper, ensure_ascii=False) for paper in corpus]

            progress_tracker.update(job_id, 3, "Finalizing output...")
            progress_tracker.update(job_id, 4, "Corpus ready for download.")
            progress_tracker.complete(job_id)
            with self.lock:
                self.jobs[job_id]["status"] = "complete"
                self.jobs[job_id]["jsonl_lines"] = jsonl_lines
                self.jobs[job_id]["stats"] = stats
                self.jobs[job_id]["actual_tokens"] = stats.get("total_tokens", 0)
        except Exception as e:
            progress_tracker.update(job_id, 0, f"Job failed: {str(e)}")
            with self.lock:
                self.jobs[job_id]["status"] = "failed"
                self.jobs[job_id]["error"] = str(e)

    def _run_label_pipeline(self, job_id):
        """Label pipeline: count tokens in the uploaded text, enforce the plan
        limit, then tokenize and label the dataset."""
        try:
            with self.lock:
                user_input = self.jobs[job_id]["user_input"]
                plan = self.jobs[job_id]["plan"]

            progress_tracker.start_job(job_id, total_steps=4)
            progress_tracker.update(job_id, 1, "Loading and preprocessing dataset...")
            dataset_text = user_input.get("dataset_text", "")
            if not dataset_text:
                raise ValueError("No dataset text provided.")

            tokens = nltk.word_tokenize(dataset_text)
            num_tokens = len(tokens)
            with self.lock:
                self.jobs[job_id]["actual_tokens"] = num_tokens
            if not payment_manager.check_plan_limit(plan, num_tokens):
                raise ValueError("Plan limit exceeded.")

            progress_tracker.update(job_id, 2, "Tokenizing and labeling dataset...")
            preprocessor = QLoRAPreprocessor()
            labeled_data = preprocessor.preprocess_function(dataset_text)
            jsonl_lines = [json.dumps({"text": item}, ensure_ascii=False) for item in labeled_data]
            stats = {"token_count": num_tokens, "sample_count": len(labeled_data)}

            progress_tracker.update(job_id, 3, "Dataset ready for download.")
            progress_tracker.complete(job_id)
            with self.lock:
                self.jobs[job_id]["status"] = "complete"
                self.jobs[job_id]["jsonl_lines"] = jsonl_lines
                self.jobs[job_id]["stats"] = stats
        except Exception as e:
            progress_tracker.update(job_id, 0, f"Job failed: {str(e)}")
            with self.lock:
                self.jobs[job_id]["status"] = "failed"
                self.jobs[job_id]["error"] = str(e)

    def get_job_status(self, job_id):
        with self.lock:
            return self.jobs.get(job_id, None)


job_manager = JobManager()
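

# --- Illustrative usage sketch (assumptions, not part of the production flow) ---
# Shows how a caller might drive JobManager: submit a request, then poll
# get_job_status until the background thread finishes. The "plan", "domain",
# and token_budget values below are placeholders for demonstration only; real
# values come from the web layer and PLAN_LIMITS in Config.py.
if __name__ == "__main__":
    import time

    # Hypothetical request payload; field names mirror what start_job reads above.
    demo_request = {
        "plan": "free",          # assumed plan name
        "token_budget": 10000,   # assumed budget
        "job_type": "tokenize",
        "domain": "physics",     # assumed domain
        "custom_seed": None,
    }

    job_id, error = job_manager.start_job(demo_request)
    if error:
        print(f"Job rejected: {error}")
    else:
        # Poll until the background thread marks the job complete or failed.
        while True:
            status = job_manager.get_job_status(job_id)
            if status["status"] in ("complete", "failed"):
                break
            time.sleep(1)
        print(f"Job {job_id} finished with status: {status['status']}")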