Olubakka / src/chat_logic.py
Sachi Wagaarachchi · c285061: bugfixes, update response streaming, thought
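"""Chat processing for Qwen models: streams generated tokens, separates
``<think>`` reasoning from the visible answer, and offers a direct
Hugging Face pipeline path."""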
import logging
import threading

from transformers import TextIteratorStreamer

from src.utils import (
    preprocess_chat_input,
    format_prompt,
    prepare_generation_inputs,
    postprocess_response
)


class ChatProcessor:
    """Processes chat interactions using Qwen models."""

    def __init__(self, model_manager, vector_db):
        self.model_manager = model_manager
        self.vector_db = vector_db
        self.logger = logging.getLogger(__name__)
    def process_chat(self, message, history, model_name, temperature=0.7,
                     max_new_tokens=512, top_p=0.9, top_k=50, repetition_penalty=1.2,
                     system_prompt=""):
        """
        Process chat input and generate a streaming response.

        This method handles the complete chat processing pipeline:
        1. Pre-processing: format the input with history and system prompt
        2. Model inference: generate a response using the specified model
        3. Post-processing: stream the response, splitting <think> reasoning
           from the visible answer

        Args:
            message (str): The current user message
            history (list): Chat history as messages-format dicts
                ({'role': ..., 'content': ...}); updated in place while streaming
            model_name (str): Name of the model to use
            temperature (float): Sampling temperature
            max_new_tokens (int): Maximum number of tokens to generate
            top_p (float): Nucleus sampling parameter
            top_k (int): Top-k sampling parameter
            repetition_penalty (float): Penalty for token repetition
            system_prompt (str): Optional system prompt to guide the model's behavior

        Yields:
            tuple: (history, debug) pairs as tokens are generated, where
            history contains the partially streamed assistant entries
        """
        # Cancellation flag; never set here, but checked in the streaming
        # loop so a caller could stop generation early.
        cancel_event = threading.Event()
        debug = ''

        try:
            # 1. PRE-PROCESSING
            # Get model pipeline
            pipe = self.model_manager.get_pipeline(model_name)

            # Format prompt with history and tokenizer
            prompt = format_prompt(message, history, pipe.tokenizer, system_prompt)

            # Set up streamer for token-by-token generation
            streamer = TextIteratorStreamer(
                pipe.tokenizer,
                skip_prompt=True,
                skip_special_tokens=True
            )

            # Prepare tokenized inputs on the model's device
            inputs_on_device = prepare_generation_inputs(
                prompt,
                pipe.tokenizer,
                pipe.model.device
            )
            # 2. MODEL INFERENCE
            # Prepare generation parameters
            generate_kwargs = {
                "input_ids": inputs_on_device["input_ids"],
                "attention_mask": inputs_on_device["attention_mask"],
                "max_new_tokens": max_new_tokens,
                "temperature": temperature,
                "top_p": top_p,
                "top_k": top_k,
                "repetition_penalty": repetition_penalty,
                # Sampling parameters only take effect when sampling is enabled
                "do_sample": True,
                "streamer": streamer
            }
            self.logger.debug(f"Running generate with kwargs: {generate_kwargs}")

            # Run generation in a background thread so tokens can be
            # consumed from the streamer as they are produced
            thread = threading.Thread(target=pipe.model.generate, kwargs=generate_kwargs)
            thread.start()

            # Buffers and state for separating the model's "thought" from its answer
            thought_buf = ''
            answer_buf = ''
            in_thought = False
            answer_started = False  # True once the visible answer entry exists in history

            # Stream tokens
            for chunk in streamer:
                if cancel_event.is_set():
                    break
                text = chunk

                # Detect start of thinking
                if not in_thought and '<think>' in text:
                    in_thought = True
                    # Insert thought placeholder
                    history.append({
                        'role': 'assistant',
                        'content': '',
                        'metadata': {'title': '💭 Thought'}
                    })
                    # Capture everything after the opening tag
                    # (any text before the tag is discarded)
                    thought_buf += text.split('<think>', 1)[1]
                    # The closing tag may arrive in the same chunk
                    if '</think>' in thought_buf:
                        before, after2 = thought_buf.split('</think>', 1)
                        history[-1]['content'] = before.strip()
                        in_thought = False
                        # Start the answer buffer with whatever followed the tag
                        answer_buf = after2
                        history.append({'role': 'assistant', 'content': answer_buf})
                        answer_started = True
                    else:
                        history[-1]['content'] = thought_buf
                    yield history, debug
                    continue
                # Continue streaming the thought
                if in_thought:
                    thought_buf += text
                    if '</think>' in thought_buf:
                        before, after2 = thought_buf.split('</think>', 1)
                        history[-1]['content'] = before.strip()
                        in_thought = False
                        # Start the answer buffer with whatever followed the tag
                        answer_buf = after2
                        history.append({'role': 'assistant', 'content': answer_buf})
                        answer_started = True
                    else:
                        history[-1]['content'] = thought_buf
                    yield history, debug
                    continue

                # Stream the answer; add the visible answer entry only once
                if not answer_started:
                    history.append({'role': 'assistant', 'content': ''})
                    answer_started = True
                answer_buf += text
                history[-1]['content'] = answer_buf
                yield history, debug

            thread.join()
            yield history, debug
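
            # Illustrative walk-through (not from the original source) of the
            # splitting above, assuming the streamer yields these chunks:
            #
            #   "<think>Let me check"  ->  Thought entry: "Let me check"
            #   " the units</think>"   ->  Thought entry: "Let me check the units"
            #   "Sure, 3 km is"        ->  Answer entry:  "Sure, 3 km is"
            #   " 3000 m."             ->  Answer entry:  "Sure, 3 km is 3000 m."
            #
            # Each arrow marks one (history, debug) yield back to the UI.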
            # 3. POST-PROCESSING
            # The earlier non-streaming path (accumulate all tokens, run
            # postprocess_response on the full text, yield once) has been
            # superseded by the incremental streaming above.
        except Exception as e:
            self.logger.error(f"Chat processing error: {str(e)}")
            # Keep the (history, debug) shape so the UI can still render the error
            history.append({'role': 'assistant', 'content': f"Error: {str(e)}"})
            yield history, debug

    def generate_with_pipeline(self, message, history, model_name, generation_config=None, system_prompt=""):
        """
        Alternative method that uses the Hugging Face pipeline directly.

        This method demonstrates a more direct use of the pipeline API: the
        formatted messages are handed to the pipeline in a single call and
        the full response is returned without streaming.

        Args:
            message (str): The current user message
            history (list): Conversation history, forwarded to preprocess_chat_input
            model_name (str): Name of the model to use
            generation_config (dict): Configuration for text generation
            system_prompt (str): Optional system prompt to guide the model's behavior

        Returns:
            str: The generated response
        """
        try:
            # Get model pipeline
            pipe = self.model_manager.get_pipeline(model_name)

            # Pre-process: format messages for the pipeline
            messages = preprocess_chat_input(message, history, system_prompt)

            # Set default generation config if not provided
            if generation_config is None:
                generation_config = {
                    "max_new_tokens": 512,
                    "temperature": 0.7,
                    "top_p": 0.9,
                    "top_k": 50,
                    "repetition_penalty": 1.2,
                    "do_sample": True
                }

            # Direct pipeline inference
            self.logger.debug(f"Running pipeline with messages: {messages}")
            self.logger.debug(f"Generation config: {generation_config}")
            response = pipe(
                messages,
                **generation_config
            )
            # Post-process the response
            if isinstance(response, list):
                return postprocess_response(response[0]["generated_text"])
            else:
                return postprocess_response(response["generated_text"])

        except Exception as e:
            self.logger.error(f"Pipeline generation error: {str(e)}")
            return f"Error: {str(e)}"