reab5555's picture
Update processing.py
5c84887 verified
raw
history blame
7.68 kB
import os
from huggingface_hub import login
import torch
import time
from transformers import AutoTokenizer, pipeline, AutoModelForCausalLM
from langdetect import detect
from langchain.chains import RetrievalQA
from langchain_community.llms import HuggingFacePipeline
from langchain.prompts import PromptTemplate
from langchain_community.document_loaders import TextLoader, PyPDFLoader
from langchain.text_splitter import CharacterTextSplitter
from langchain_community.vectorstores import FAISS
from langchain_community.embeddings import HuggingFaceEmbeddings
import spaces
from transcription_diarization import process_video
# Get the Hugging Face access token from the Space secret, which is read from
# the environment variable `hf_secret`.
hf_token = os.environ.get('hf_secret')
if not hf_token:
    # Name the variable that is actually read so the error is actionable.
    raise ValueError("'hf_secret' not found in environment variables. Please set it in the Space secrets.")
# Log in to Hugging Face at import time; the same token is also passed to the
# model loaders and to process_video further down in this file.
login(token=hf_token)
# Analysis Pipeline Classes
class LazyPipeline:
    """Create the text-generation pipeline on first use and cache it."""

    def __init__(self):
        # Filled in by get_pipeline() on its first call.
        self.pipeline = None

    # NOTE(review): presumably spaces.GPU requests a GPU slot for up to
    # `duration` seconds on the hosting Space - confirm against `spaces` docs.
    @spaces.GPU(duration=250)
    def get_pipeline(self):
        """Return the cached text-generation pipeline, building it if needed.

        On the first call the tokenizer and the causal LM are loaded with the
        module-level ``hf_token`` and wrapped in a transformers pipeline
        configured for sampling. Later calls return the same object.
        """
        if self.pipeline is None:
            model_name = "meta-llama/Meta-Llama-3.1-8B-Instruct"
            # `token` is the current keyword; `use_auth_token` is deprecated.
            tokenizer = AutoTokenizer.from_pretrained(model_name, token=hf_token)
            model = AutoModelForCausalLM.from_pretrained(
                model_name,
                torch_dtype=torch.float16,
                device_map="auto",
                token=hf_token,
            )
            self.pipeline = pipeline(
                "text-generation",
                model=model,
                tokenizer=tokenizer,
                max_new_tokens=800,
                temperature=0.3,
                top_p=0.95,
                top_k=5,
                repetition_penalty=1.2,
                do_sample=True,
            )
        return self.pipeline
class LazyLLM:
    """Wrap a lazily built pipeline as a LangChain LLM, created once on demand."""

    def __init__(self, lazy_pipeline):
        # Cache slot for the HuggingFacePipeline wrapper; empty until first use.
        self.llm = None
        self.lazy_pipeline = lazy_pipeline

    @spaces.GPU(duration=150)
    def get_llm(self):
        """Return the cached HuggingFacePipeline, constructing it on first call."""
        if self.llm is not None:
            return self.llm
        # First call: obtain the underlying pipeline and wrap it.
        self.llm = HuggingFacePipeline(pipeline=self.lazy_pipeline.get_pipeline())
        return self.llm
class LazyChains:
    """Build the three RetrievalQA chains on first use and cache them."""

    def __init__(self, lazy_llm):
        self.lazy_llm = lazy_llm
        # All three slots are filled together by get_chains().
        self.attachments_chain = None
        self.bigfive_chain = None
        self.personalities_chain = None

    def create_prompt(self, task):
        """Return a PromptTemplate that prefixes ``task`` to the QA layout.

        The template ends with a dashed separator followed by "Answer: ";
        analyze_content in this file splits the model output on that marker.
        """
        return PromptTemplate(
            template=task + "\n\nContext: {context}\n\nTask: {question}\n\n-----------\n\nAnswer: ",
            input_variables=["context", "question"],
        )

    def _build_chain(self, llm, db, task):
        """Create one "stuff" RetrievalQA chain over ``db`` prompted with ``task``."""
        return RetrievalQA.from_chain_type(
            llm=llm,
            chain_type="stuff",
            retriever=db.as_retriever(),
            chain_type_kwargs={"prompt": self.create_prompt(task)},
        )

    @spaces.GPU(duration=200)
    def get_chains(self):
        """Return ``(attachments_chain, bigfive_chain, personalities_chain)``.

        The chains are built on the first call from the module-level vector
        stores and task prompts, then reused.
        """
        cached = (self.attachments_chain, self.bigfive_chain, self.personalities_chain)
        if any(chain is None for chain in cached):
            llm = self.lazy_llm.get_llm()
            # Build into locals first and publish only once all three exist,
            # so a failure part-way through cannot leave a partially filled
            # cache that later calls would return as-is.
            attachments = self._build_chain(llm, attachments_db, attachments_task)
            bigfive = self._build_chain(llm, bigfive_db, bigfive_task)
            personalities = self._build_chain(llm, personalities_db, personalities_task)
            self.attachments_chain = attachments
            self.bigfive_chain = bigfive
            self.personalities_chain = personalities
        return self.attachments_chain, self.bigfive_chain, self.personalities_chain
# Module-level lazy wrappers, wired pipeline -> LLM -> chains. Constructing
# them only stores references; each builds its resource on the first get_*().
lazy_pipe = LazyPipeline()
lazy_llm = LazyLLM(lazy_pipe)
lazy_chains = LazyChains(lazy_llm)
# Load instruction files
def load_instructions(file_path):
    """Return the text of ``file_path`` with surrounding whitespace stripped.

    The file is decoded as UTF-8 explicitly, matching the other text reads in
    this module, so the result does not depend on the platform's default
    encoding.
    """
    with open(file_path, 'r', encoding='utf-8') as file:
        return file.read().strip()
# Task prompts, read once at import time. LazyChains.get_chains() prepends
# each of these to the prompt template of the corresponding chain.
attachments_task = load_instructions("tasks/Attachments_task.txt")
bigfive_task = load_instructions("tasks/BigFive_task.txt")
personalities_task = load_instructions("tasks/Personalities_task.txt")
# Load knowledge files and create vector stores
def load_knowledge(file_path):
    """Load a text file and return it split into document chunks.

    The file is read as UTF-8 explicitly (rather than the platform default)
    and split with CharacterTextSplitter using a chunk size of 1000 and no
    overlap.
    """
    loader = TextLoader(file_path, encoding="utf-8")
    documents = loader.load()
    text_splitter = CharacterTextSplitter(chunk_size=1000, chunk_overlap=0)
    texts = text_splitter.split_documents(documents)
    return texts
# One default-configured embedding model shared by all three indexes.
embeddings = HuggingFaceEmbeddings()
# FAISS indexes over the knowledge files, built at import time. They are read
# as module globals by LazyChains.get_chains() when the chains are created.
attachments_db = FAISS.from_documents(load_knowledge("knowledge/bartholomew_attachments_definitions - no int.txt"), embeddings)
bigfive_db = FAISS.from_documents(load_knowledge("knowledge/bigfive_definitions.txt"), embeddings)
personalities_db = FAISS.from_documents(load_knowledge("knowledge/personalities_definitions.txt"), embeddings)
def detect_language(text):
    """Return the language code detected for ``text``, or "en" on failure.

    Detection is best-effort: any ordinary error raised by ``detect`` falls
    back to English. ``Exception`` is caught instead of using a bare
    ``except`` so that KeyboardInterrupt and SystemExit still propagate.
    """
    try:
        return detect(text)
    except Exception:
        return "en"  # default to English if detection fails
# Analysis functions
def analyze_content(content, safe_progress):
    """Run the three analysis chains over ``content`` and return their answers.

    ``safe_progress(value, desc=...)`` is called before each chain runs. For
    every chain the text after the last "Answer:" marker of the prompt
    template is kept and stripped.

    Returns a tuple ``(attachments_answer, bigfive_answer, personalities_answer)``.
    """
    answer_marker = "-----------\n\nAnswer:"
    attachments_chain, bigfive_chain, personalities_chain = lazy_chains.get_chains()

    # (progress fraction, progress message, chain) in execution order.
    stages = (
        (0.6, "Analyzing attachments...", attachments_chain),
        (0.7, "Analyzing Big Five traits...", bigfive_chain),
        (0.8, "Analyzing personalities...", personalities_chain),
    )

    answers = []
    for fraction, message, chain in stages:
        safe_progress(fraction, desc=message)
        output = chain({"query": content})
        # Keep only what follows the final answer marker in the result text.
        answers.append(output['result'].split(answer_marker)[-1].strip())

    attachments_answer, bigfive_answer, personalities_answer = answers
    return attachments_answer, bigfive_answer, personalities_answer
# Main processing function
def process_input(input_file, progress=None):
    """Extract text from an uploaded file and run the three analyses on it.

    Args:
        input_file: An upload object exposing the file path as ``.name``, or
            a plain path (``str`` / ``os.PathLike``).
        progress: Optional callable invoked as ``progress(value, desc=...)``;
            failures inside it are printed and otherwise ignored.

    Returns:
        A 6-tuple ``(status, execution_info, detected_language,
        attachments_answer, bigfive_answer, personalities_answer)``. For an
        unsupported extension the status is an error message and the other
        five items are ``None``.
    """
    start_time = time.time()

    def safe_progress(value, desc=""):
        # Progress reporting is best-effort and must never abort processing.
        if progress is not None:
            try:
                progress(value, desc=desc)
            except Exception as e:
                print(f"Progress update failed: {e}")

    safe_progress(0, desc="Processing file...")

    # Plain paths are used directly; any other object is expected to carry
    # its path in `.name`, as the original code assumed.
    if isinstance(input_file, (str, os.PathLike)):
        file_path = os.fspath(input_file)
    else:
        file_path = input_file.name

    file_extension = os.path.splitext(file_path)[1].lower()
    if file_extension == '.txt':
        with open(file_path, 'r', encoding='utf-8') as file:
            content = file.read()
    elif file_extension == '.pdf':
        loader = PyPDFLoader(file_path)
        pages = loader.load_and_split()
        content = '\n'.join([page.page_content for page in pages])
    elif file_extension in ['.mp4', '.avi', '.mov']:
        safe_progress(0.2, desc="Processing video...")
        srt_path = process_video(file_path, hf_token, "en")
        try:
            with open(srt_path, 'r', encoding='utf-8') as file:
                content = file.read()
        finally:
            # Remove the generated subtitle file even if reading it fails.
            os.remove(srt_path)
    else:
        return "Unsupported file format. Please upload a TXT, PDF, or video file.", None, None, None, None, None

    detected_language = detect_language(content)
    safe_progress(0.4, desc="Analyzing content...")
    attachments_answer, bigfive_answer, personalities_answer = analyze_content(content, safe_progress)

    end_time = time.time()
    execution_time = end_time - start_time
    execution_info = f"{execution_time:.2f} seconds"
    safe_progress(1.0, desc="Analysis complete!")

    print("Attachments output:", attachments_answer)
    print("Big Five output:", bigfive_answer)
    print("Personalities output:", personalities_answer)
    return ("Analysis complete!", execution_info, detected_language,
            attachments_answer, bigfive_answer, personalities_answer)