|
import streamlit as st |
|
from ibm_watsonx_ai.foundation_models import ModelInference |
|
from ibm_watsonx_ai import Credentials, APIClient |
|
from ibm_watsonx_ai.metanames import GenTextParamsMetaNames as GenParams |
|
from knowledge_bases import KNOWLEDGE_BASE_OPTIONS, SYSTEM_PROMPTS |
|
import genparam |
|
import time |
|
|
|
def check_password(): |
|
"""Password protection check for the app.""" |
|
def password_entered(): |
|
if st.session_state["password"] == st.secrets["app_password"]: |
|
st.session_state["password_correct"] = True |
|
del st.session_state["password"] |
|
else: |
|
st.session_state["password_correct"] = False |
|
|
|
if "password_correct" not in st.session_state: |
|
st.markdown("\n\n") |
|
st.text_input("Enter the password", type="password", on_change=password_entered, key="password") |
|
st.divider() |
|
st.info("Designed and developed by Milan Mrdenovic © IBM Norway 2024") |
|
return False |
|
elif not st.session_state["password_correct"]: |
|
st.markdown("\n\n") |
|
st.text_input("Enter the password", type="password", on_change=password_entered, key="password") |
|
st.divider() |
|
st.error("😕 Incorrect password") |
|
st.info("Designed and developed by Milan Mrdenovic © IBM Norway 2024") |
|
return False |
|
else: |
|
return True |
|
|
|
def initialize_session_state(): |
|
"""Initialize all session state variables.""" |
|
if 'chat_history_1' not in st.session_state: |
|
st.session_state.chat_history_1 = [] |
|
if 'chat_history_2' not in st.session_state: |
|
st.session_state.chat_history_2 = [] |
|
if 'chat_history_3' not in st.session_state: |
|
st.session_state.chat_history_3 = [] |
|
if 'first_question' not in st.session_state: |
|
st.session_state.first_question = False |
|
if "counter" not in st.session_state: |
|
st.session_state["counter"] = 0 |
|
if 'token_statistics' not in st.session_state: |
|
st.session_state.token_statistics = [] |
|
if 'selected_kb' not in st.session_state: |
|
st.session_state.selected_kb = KNOWLEDGE_BASE_OPTIONS[0] |
|
if 'current_system_prompts' not in st.session_state: |
|
st.session_state.current_system_prompts = SYSTEM_PROMPTS[st.session_state.selected_kb] |
|
|
|
def setup_client(project_id=None): |
|
"""Setup WatsonX client with credentials.""" |
|
credentials = Credentials( |
|
url=st.secrets["url"], |
|
api_key=st.secrets["api_key"] |
|
) |
|
project_id = project_id or st.secrets["project_id"] |
|
client = APIClient(credentials, project_id=project_id) |
|
return credentials, client |
|
|
|
def get_active_model(): |
|
"""Get the currently active model based on configuration.""" |
|
return genparam.SELECTED_MODEL_1 if genparam.ACTIVE_MODEL == 0 else genparam.SELECTED_MODEL_2 |
|
|
|
def get_active_prompt_template(): |
|
"""Get the currently active prompt template.""" |
|
return genparam.PROMPT_TEMPLATE_1 if genparam.ACTIVE_MODEL == 0 else genparam.PROMPT_TEMPLATE_2 |
|
|
|
def prepare_prompt(prompt, chat_history): |
|
"""Prepare the prompt with chat history if available.""" |
|
if genparam.TYPE == "chat" and chat_history: |
|
chats = "\n".join([f"{message['role']}: \"{message['content']}\"" for message in chat_history]) |
|
return f"Conversation History:\n{chats}\n\nNew User Input: {prompt}" |
|
return f"User Input: {prompt}" |
|
|
|
def apply_prompt_syntax(prompt, system_prompt, prompt_template, bake_in_prompt_syntax): |
|
"""Apply appropriate syntax to the prompt based on model requirements.""" |
|
model_family_syntax = { |
|
"llama3-instruct (llama-3, 3.1 & 3.2) - system": """<|begin_of_text|><|start_header_id|>system<|end_header_id|>\n\n{system_prompt}<|eot_id|><|start_header_id|>user<|end_header_id|>\n\n{prompt}<|eot_id|><|start_header_id|>assistant<|end_header_id|>\n\n""", |
|
"llama3-instruct (llama-3, 3.1 & 3.2) - user": """<|begin_of_text|><|start_header_id|>user<|end_header_id|>\n\n{prompt}<|eot_id|><|start_header_id|>assistant<|end_header_id|>\n\n""", |
|
"granite-13b-chat & instruct - system": """<|system|>\n{system_prompt}\n<|user|>\n{prompt}\n<|assistant|>\n\n""", |
|
"granite-13b-chat & instruct - user": """<|user|>\n{prompt}\n<|assistant|>\n\n""", |
|
"mistral & mixtral v2 tokenizer - system": """<s>[INST] System Prompt: {system_prompt} [/INST][INST] {prompt} [/INST]\n\n""", |
|
"mistral & mixtral v2 tokenizer - user": """<s>[INST] {prompt} [/INST]\n\n""", |
|
"no syntax - system": """{system_prompt}\n\n{prompt}""", |
|
"no syntax - user": """{prompt}""" |
|
} |
|
|
|
if bake_in_prompt_syntax: |
|
template = model_family_syntax[prompt_template] |
|
if system_prompt: |
|
return template.format(system_prompt=system_prompt, prompt=prompt) |
|
return prompt |
|
|
|
def generate_response(watsonx_llm, prompt_data, params): |
|
"""Generate streaming response from the model.""" |
|
generated_response = watsonx_llm.generate_text_stream(prompt=prompt_data, params=params) |
|
for chunk in generated_response: |
|
yield chunk |
|
|
|
def capture_tokens(prompt_data, response, client, bot_name): |
|
"""Capture token usage statistics.""" |
|
if not genparam.TOKEN_CAPTURE_ENABLED: |
|
return |
|
|
|
watsonx_llm = ModelInference( |
|
api_client=client, |
|
model_id=genparam.SELECTED_MODEL, |
|
verify=genparam.VERIFY |
|
) |
|
|
|
input_tokens = watsonx_llm.tokenize(prompt=prompt_data)["result"]["token_count"] |
|
output_tokens = watsonx_llm.tokenize(prompt=response)["result"]["token_count"] |
|
total_tokens = input_tokens + output_tokens |
|
|
|
return { |
|
"bot_name": bot_name, |
|
"input_tokens": input_tokens, |
|
"output_tokens": output_tokens, |
|
"total_tokens": total_tokens, |
|
"timestamp": time.strftime("%H:%M:%S") |
|
} |
|
|
|
def fetch_response(user_input, client, system_prompt, chat_history): |
|
"""Fetch response from the model for the given input.""" |
|
prompt = prepare_prompt(user_input, chat_history) |
|
prompt_data = apply_prompt_syntax( |
|
prompt, |
|
system_prompt, |
|
get_active_prompt_template(), |
|
genparam.BAKE_IN_PROMPT_SYNTAX |
|
) |
|
|
|
watsonx_llm = ModelInference( |
|
api_client=client, |
|
model_id=get_active_model(), |
|
verify=genparam.VERIFY |
|
) |
|
|
|
params = { |
|
GenParams.DECODING_METHOD: genparam.DECODING_METHOD, |
|
GenParams.MAX_NEW_TOKENS: genparam.MAX_NEW_TOKENS, |
|
GenParams.MIN_NEW_TOKENS: genparam.MIN_NEW_TOKENS, |
|
GenParams.REPETITION_PENALTY: genparam.REPETITION_PENALTY, |
|
GenParams.STOP_SEQUENCES: genparam.STOP_SEQUENCES |
|
} |
|
|
|
stream = generate_response(watsonx_llm, prompt_data, params) |
|
return stream, prompt_data |