import gradio as gr
import torch
from transformers import AutoModelForCausalLM, AutoTokenizer, pipeline
import logging
from typing import List, Dict
import gc
import os

# Set up logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

# Set environment variables for memory optimization
os.environ['TRANSFORMERS_CACHE'] = '/home/user/.cache/huggingface/hub'
os.environ['TOKENIZERS_PARALLELISM'] = 'false'

class HealthAssistant:
    def __init__(self):
        self.model_id = "microsoft/phi-2"  # smaller Phi-2 model; fits a CPU-only Space
        self.model = None
        self.tokenizer = None
        self.pipe = None
        self.metrics = []
        self.medications = []
        self.device = "cpu"
        self.is_model_loaded = False
        self.max_history_length = 2
    def initialize_model(self):
        try:
            if self.is_model_loaded:
                return True
            logger.info(f"Loading model: {self.model_id}")
            self.tokenizer = AutoTokenizer.from_pretrained(
                self.model_id,
                trust_remote_code=True,
                model_max_length=256,
                padding_side="left"
            )
            logger.info("Tokenizer loaded")
            self.model = AutoModelForCausalLM.from_pretrained(
                self.model_id,
                torch_dtype=torch.float32,
                trust_remote_code=True,
                device_map=None,
                low_cpu_mem_usage=True
            ).to(self.device)
            gc.collect()
            # The model is already instantiated, so no extra model_kwargs are needed here
            self.pipe = pipeline(
                "text-generation",
                model=self.model,
                tokenizer=self.tokenizer,
                device=self.device
            )
            self.is_model_loaded = True
            logger.info("Model initialized successfully")
            return True
        except Exception as e:
            logger.error(f"Error in model initialization: {str(e)}")
            raise
    def unload_model(self):
        if hasattr(self, 'model') and self.model is not None:
            del self.model
            self.model = None
        if hasattr(self, 'pipe') and self.pipe is not None:
            del self.pipe
            self.pipe = None
        if hasattr(self, 'tokenizer') and self.tokenizer is not None:
            del self.tokenizer
            self.tokenizer = None
        self.is_model_loaded = False
        gc.collect()
        logger.info("Model unloaded successfully")
    def generate_response(self, message: str, history: List = None) -> str:
        try:
            if not self.is_model_loaded:
                self.initialize_model()
            message = message[:200]  # truncate long messages to keep the prompt small
            prompt = self._prepare_prompt(
                message,
                history[-self.max_history_length:] if history else None
            )
            generation_args = {
                "max_new_tokens": 200,
                "return_full_text": False,
                "temperature": 0.7,
                "do_sample": True,
                "top_k": 50,
                "top_p": 0.9,
                "repetition_penalty": 1.1,
                "num_return_sequences": 1,
                "batch_size": 1
            }
            output = self.pipe(prompt, **generation_args)
            response = output[0]['generated_text']
            gc.collect()
            return response.strip()
        except Exception as e:
            logger.error(f"Error generating response: {str(e)}")
            return "I apologize, but I encountered an error. Please try again."
    def _prepare_prompt(self, message: str, history: List = None) -> str:
        prompt_parts = [
            "Medical AI assistant. Be professional, include disclaimers.",
            self._get_health_context()
        ]
        if history:
            for h in history:
                if isinstance(h, dict):  # new message format
                    if h['role'] == 'user':
                        prompt_parts.append(f"Human: {h['content'][:100]}")
                    else:
                        prompt_parts.append(f"Assistant: {h['content'][:100]}")
                else:  # old format (tuple)
                    prompt_parts.extend([
                        f"Human: {h[0][:100]}",
                        f"Assistant: {h[1][:100]}"
                    ])
        prompt_parts.extend([
            f"Human: {message}",
            "Assistant:"
        ])
        return "\n".join(prompt_parts)
    def _get_health_context(self) -> str:
        if not self.metrics and not self.medications:
            return "No health data"
        context = []
        if self.metrics:
            latest = self.metrics[-1]
            context.append(f"Metrics: W:{latest['Weight']}kg S:{latest['Steps']} Sl:{latest['Sleep']}h")
        if self.medications:
            meds = [f"{m['Medication']}({m['Dosage']}@{m['Time']})" for m in self.medications[-2:]]
            context.append("Meds: " + ", ".join(meds))
        return " | ".join(context)
    def add_metrics(self, weight: float, steps: int, sleep: float) -> bool:
        try:
            if len(self.metrics) >= 5:
                self.metrics.pop(0)  # keep only the five most recent entries
            self.metrics.append({
                'Weight': weight,
                'Steps': steps,
                'Sleep': sleep
            })
            return True
        except Exception as e:
            logger.error(f"Error adding metrics: {e}")
            return False
    def add_medication(self, name: str, dosage: str, time: str, notes: str = "") -> bool:
        try:
            if len(self.medications) >= 5:
                self.medications.pop(0)  # keep only the five most recent entries
            self.medications.append({
                'Medication': name,
                'Dosage': dosage,
                'Time': time,
                'Notes': notes
            })
            return True
        except Exception as e:
            logger.error(f"Error adding medication: {e}")
            return False
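# Both trackers keep only the five most recent entries, e.g. (illustrative):
#   assistant.add_metrics(weight=70.0, steps=8000, sleep=7.5)
#   assistant.add_medication("Aspirin", "100mg", "9:00 AM")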
class GradioInterface:
    def __init__(self):
        try:
            logger.info("Initializing Health Assistant...")
            self.assistant = HealthAssistant()
            logger.info("Health Assistant initialized successfully")
        except Exception as e:
            logger.error(f"Failed to initialize Health Assistant: {e}")
            raise
    def chat_response(self, message: str, history: List) -> tuple:
        if not message.strip():
            return "", history
        try:
            response = self.assistant.generate_response(message, history)
            # Convert to the new message format
            history.append({"role": "user", "content": message})
            history.append({"role": "assistant", "content": response})
            # len(history) is even after the two appends above, so this frees
            # the model every third exchange (len 6, 12, ...)
            if len(history) % 3 == 0:
                self.assistant.unload_model()
            return "", history
        except Exception as e:
            logger.error(f"Error in chat response: {e}")
            return "", history + [
                {"role": "user", "content": message},
                {"role": "assistant", "content": "I apologize, but I encountered an error. Please try again."}
            ]
    def add_health_metrics(self, weight: float, steps: int, sleep: float) -> str:
        if not all([weight is not None, steps is not None, sleep is not None]):
            return "⚠️ Please fill in all metrics."
        if weight <= 0 or steps < 0 or sleep < 0:
            return "⚠️ Please enter valid positive numbers."
        if self.assistant.add_metrics(weight, steps, sleep):
            return f"""✅ Health metrics saved successfully!
• Weight: {weight} kg
• Steps: {steps}
• Sleep: {sleep} hours"""
        return "❌ Error saving metrics."
    def add_medication_info(self, name: str, dosage: str, time: str, notes: str) -> str:
        if not all([name, dosage, time]):
            return "⚠️ Please fill in all required fields."
        if self.assistant.add_medication(name, dosage, time, notes):
            return f"""✅ Medication added successfully!
• Medication: {name}
• Dosage: {dosage}
• Time: {time}
• Notes: {notes if notes else 'None'}"""
        return "❌ Error adding medication."
    def create_interface(self):
        with gr.Blocks(title="Medical Health Assistant") as demo:
            gr.Markdown("""
            # 🏥 Medical Health Assistant
            This AI assistant provides general health information and guidance.
            """)
            with gr.Tabs():
                with gr.Tab("💬 Medical Consultation"):
                    chatbot = gr.Chatbot(
                        value=[],
                        height=400,
                        show_label=False,
                        type="messages"  # new message format: dicts with role/content
                    )
                    with gr.Row():
                        msg = gr.Textbox(
                            placeholder="Ask your health question...",
                            lines=1,
                            show_label=False,
                            scale=9
                        )
                        send_btn = gr.Button("Send", scale=1)
                    clear_btn = gr.Button("Clear Chat")
                with gr.Tab("📊 Health Metrics"):
                    gr.Markdown("### Track Your Health Metrics")
                    with gr.Row():
                        weight_input = gr.Number(
                            label="Weight (kg)",
                            minimum=0,
                            maximum=500
                        )
                        steps_input = gr.Number(
                            label="Steps",
                            minimum=0,
                            maximum=100000
                        )
                        sleep_input = gr.Number(
                            label="Hours Slept",
                            minimum=0,
                            maximum=24
                        )
                    metrics_btn = gr.Button("Save Metrics")
                    metrics_status = gr.Markdown()
                with gr.Tab("💊 Medication Manager"):
                    gr.Markdown("### Track Your Medications")
                    med_name = gr.Textbox(
                        label="Medication Name",
                        placeholder="Enter medication name"
                    )
                    with gr.Row():
                        med_dosage = gr.Textbox(
                            label="Dosage",
                            placeholder="e.g., 500mg"
                        )
                        med_time = gr.Textbox(
                            label="Time",
                            placeholder="e.g., 9:00 AM"
                        )
                    med_notes = gr.Textbox(
                        label="Notes (optional)",
                        placeholder="Additional instructions or notes"
                    )
                    med_btn = gr.Button("Add Medication")
                    med_status = gr.Markdown()
            msg.submit(self.chat_response, [msg, chatbot], [msg, chatbot])
            send_btn.click(self.chat_response, [msg, chatbot], [msg, chatbot])
            clear_btn.click(lambda: [], None, chatbot)
            metrics_btn.click(
                self.add_health_metrics,
                inputs=[weight_input, steps_input, sleep_input],
                outputs=[metrics_status]
            )
            med_btn.click(
                self.add_medication_info,
                inputs=[med_name, med_dosage, med_time, med_notes],
                outputs=[med_status]
            )
            gr.Markdown("""
            ### ⚠️ Medical Disclaimer
            This AI assistant provides general health information only. It is not a replacement for professional medical advice.
            Always consult healthcare professionals for medical decisions.
            """)
        demo.queue(max_size=5)
        return demo
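
# ---------------------------------------------------------------------------
# Dataset inspection and model-card evaluation utilities
# ---------------------------------------------------------------------------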
from pathlib import Path
import io
import json
import math
import statistics
import sys
import time

from datasets import concatenate_datasets, Dataset
from datasets import load_dataset
from huggingface_hub import hf_hub_url
import pandas as pd
import numpy as np
from transformers import AutoTokenizer, AutoModelForSequenceClassification, TrainingArguments, Trainer
from evaluate import load
# 1. record the name of each file included
#    1.1 read different file formats depending on parameters (i.e., filetype)
# 2. determine column types and report how many rows hold each type (format check)
#    (in a well-formatted dataset, each column should only have one type)
# 3. report on the null values
# 4. for certain column types, report statistics
#    4.1 uniqueness: if a column holds only a small number (< 10) of unique
#        <string> values, treat it as 'categorical'
#    4.2 strings: length ranges
#    4.3 lists: length ranges
#    4.4 int/float: percentiles, min, max, mean

CELL_TYPES_LENGTH = ["<class 'str'>", "<class 'list'>"]
CELL_TYPES_NUMERIC = ["<class 'int'>", "<class 'float'>"]
# Permille points: 1 -> 0.1th percentile, 500 -> 50th (median), 999 -> 99.9th
PERCENTILES = [1, 5, 10, 25, 50, 100, 250, 500, 750, 900, 950, 975, 990, 995, 999]
def read_data(all_files, filetype):
    df = None
    if filetype in ["parquet", "csv", "json"]:
        read_func = {
            "parquet": pd.read_parquet,
            "csv": pd.read_csv,
            "json": pd.read_json,
        }[filetype]
        df = pd.concat(read_func(f) for f in all_files)
    elif filetype == "arrow":
        ds = concatenate_datasets([Dataset.from_file(str(fname)) for fname in all_files])
        df = pd.DataFrame(data=ds)
    elif filetype == "jsonl":
        # read_json with lines=True parses one JSON object per line
        df = pd.concat(pd.read_json(fname, lines=True) for fname in all_files)
    return df
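# Usage sketch (hypothetical paths):
#   df = read_data(sorted(Path("data").glob("*.parquet")), "parquet")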
def compute_cell_length_ranges(cell_lengths, cell_unique_string_values):
    cell_length_ranges = {}
    string_categorical = ""
    # A column with few unique string values is probably 'categorical'
    # (i.e., 'classes' in HuggingFace), so there is no need to treat it
    # as a normal string.
    if 0 < len(cell_unique_string_values) <= 10:
        string_categorical = str(len(cell_unique_string_values)) + " class(es)"
    elif cell_lengths:
        cell_lengths = sorted(cell_lengths)
        min_val = cell_lengths[0]
        max_val = cell_lengths[-1]
        distance = math.ceil((max_val - min_val) / 10.0)
        if min_val != max_val:
            # Build up to 10 equal-width buckets over [min_val, max_val]
            ranges = list(range(min_val, max_val, distance))
            for j in range(len(ranges) - 1):
                cell_length_ranges[str(ranges[j]) + "-" + str(ranges[j + 1])] = 0
            ranges.append(max_val)
            j = 1
            c = 0
            for k in cell_lengths:
                if j == len(ranges) or k < ranges[j]:
                    c += 1
                else:
                    # Close out every bucket this (sorted) value skips past
                    while j < len(ranges) and k >= ranges[j]:
                        cell_length_ranges[str(ranges[j - 1]) + "-" + str(ranges[j])] = c
                        j += 1
                        c = 0
                    c = 1
            cell_length_ranges[str(ranges[j - 1]) + "-" + str(max_val)] = c
        else:
            # All lengths are identical
            cell_length_ranges[str(min_val)] = len(cell_lengths)
    return cell_length_ranges, string_categorical
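# Worked example: sorted lengths [3, 5, 9] and no categorical strings yield
#   ({'3-4': 1, '4-5': 0, '5-6': 1, '6-7': 0, '7-8': 0, '8-9': 0, '9-9': 1}, "")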
def _compute_percentiles(values, percentiles=PERCENTILES):
    # statistics.quantiles with n=1000 yields 999 cut points, one per permille;
    # each PERCENTILES entry indexes into them (e.g. 500 -> the median).
    result = {}
    quantiles = statistics.quantiles(values, n=max(PERCENTILES) + 1, method='inclusive')
    for p in percentiles:
        result[p / 10] = quantiles[p - 1]
    return result


def compute_cell_value_statistics(cell_values):
    stats = {}
    if cell_values:
        cell_values = sorted(cell_values)
        stats["min"] = cell_values[0]
        stats["max"] = cell_values[-1]
        stats["mean"] = statistics.mean(cell_values)
        if len(cell_values) > 1:
            # stdev, variance, and quantiles all need at least two data points
            stats["stdev"] = statistics.stdev(cell_values)
            stats["variance"] = statistics.variance(cell_values)
            stats["percentiles"] = _compute_percentiles(cell_values)
    return stats
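# Example: compute_cell_value_statistics([1, 2, 3, 4]) gives min 1, max 4,
# mean 2.5, stdev ~1.291, variance ~1.667, plus the permille percentiles.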
def check_null(cell, cell_type):
    if cell_type == "<class 'float'>":
        if math.isnan(cell):
            return True
    elif cell is None:
        return True
    return False
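# e.g. check_null(float("nan"), "<class 'float'>") -> True
#      check_null(None, "<class 'NoneType'>") -> True
#      check_null("", "<class 'str'>") -> False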
def compute_property(data_path, glob, filetype):
    output = {}
    data_dir = Path(data_path)
    filenames = []
    all_files = list(data_dir.glob(glob))
    for f in all_files:
        print(str(f))
        # Record the path relative to data_path
        base_fname = str(f)[len(str(data_path)):]
        if not data_path.endswith("/"):
            base_fname = base_fname[1:]
        filenames.append(base_fname)
    output["filenames"] = filenames
    df = read_data(all_files, filetype)
    column_info = {}
    for col_name in df.columns:
        if col_name not in column_info:
            column_info[col_name] = {}
        cell_types = {}
        cell_lengths = {}
        cell_unique_string_values = {}
        cell_values = {}
        null_count = 0
        col_values = df[col_name].to_list()
        for cell in col_values:
            cell_type = str(type(cell))
            if check_null(cell, cell_type):
                null_count += 1
                continue
            if cell_type not in cell_types:
                cell_types[cell_type] = 1
            else:
                cell_types[cell_type] += 1
            if cell_type in CELL_TYPES_LENGTH:
                cell_length = len(cell)
                if cell_type not in cell_lengths:
                    cell_lengths[cell_type] = []
                cell_lengths[cell_type].append(cell_length)
                if cell_type == "<class 'str'>" and cell not in cell_unique_string_values:
                    cell_unique_string_values[cell] = True
            elif cell_type in CELL_TYPES_NUMERIC:
                if cell_type not in cell_values:
                    cell_values[cell_type] = []
                cell_values[cell_type].append(cell)
            else:
                print(cell_type)
        clrs = {}
        ccs = {}
        for cell_type in CELL_TYPES_LENGTH:
            if cell_type in cell_lengths:
                clr, cc = compute_cell_length_ranges(cell_lengths[cell_type], cell_unique_string_values)
                clrs[cell_type] = clr
                ccs[cell_type] = cc
        css = {}
        for cell_type in CELL_TYPES_NUMERIC:
            if cell_type in cell_values:
                cell_stats = compute_cell_value_statistics(cell_values[cell_type])
                css[cell_type] = cell_stats
        column_info[col_name]["cell_types"] = cell_types
        column_info[col_name]["cell_length_ranges"] = clrs
        column_info[col_name]["cell_categories"] = ccs
        column_info[col_name]["cell_stats"] = css
        column_info[col_name]["cell_missing"] = null_count
    output["column_info"] = column_info
    output["number_of_items"] = len(df)
    output["timestamp"] = time.time()
    return output
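# Usage sketch (hypothetical path and glob):
#   stats = compute_property("data", "*.jsonl", "jsonl")
#   print(json.dumps(stats, indent=2))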
def preprocess_function(examples):
    # Relies on a module-level `tokenizer` being defined before mapping
    return tokenizer(examples["sentence1"], examples["sentence2"], truncation=True)


def compute_metrics(eval_pred):
    # Relies on a module-level `metric` (e.g. an evaluate.load(...) result)
    predictions, labels = eval_pred
    predictions = np.argmax(predictions, axis=1)
    return metric.compute(predictions=predictions, references=labels)
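# A minimal, hypothetical setup for the globals the two functions above read:
#   tokenizer = AutoTokenizer.from_pretrained("bert-base-uncased")
#   metric = load("glue", "mrpc")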
def compute_model_card_evaluation_results(tokenizer, model_checkpoint, raw_datasets, metric):
    # Note: preprocess_function and compute_metrics read the module-level
    # `tokenizer` and `metric` globals, which these parameters shadow; callers
    # should make sure the globals match the arguments.
    tokenized_datasets = raw_datasets.map(preprocess_function, batched=True)
    model = AutoModelForSequenceClassification.from_pretrained(model_checkpoint, num_labels=2)
    batch_size = 16
    args = TrainingArguments(
        "test-glue",
        evaluation_strategy="epoch",
        learning_rate=5e-5,
        seed=42,
        lr_scheduler_type="linear",
        per_device_train_batch_size=batch_size,
        per_device_eval_batch_size=batch_size,
        num_train_epochs=3,
        weight_decay=0.01,
        load_best_model_at_end=False,
        metric_for_best_model="accuracy",
        report_to="none"
    )
    trainer = Trainer(
        model,
        args,
        train_dataset=tokenized_datasets["train"],
        eval_dataset=tokenized_datasets["validation"],
        tokenizer=tokenizer,
        compute_metrics=compute_metrics
    )
    result = trainer.evaluate()
    return result
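# Hypothetical end-to-end call on GLUE MRPC (names are illustrative):
#   raw = load_dataset("glue", "mrpc")
#   tokenizer = AutoTokenizer.from_pretrained("bert-base-uncased")
#   metric = load("glue", "mrpc")
#   results = compute_model_card_evaluation_results(tokenizer, "bert-base-uncased", raw, metric)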
def main():
    try:
        interface = GradioInterface()
        demo = interface.create_interface()
        demo.launch(
            server_name="0.0.0.0",
            show_error=True,
            share=True
        )
    except Exception as e:
        logger.error(f"Error starting application: {e}")
        raise


if __name__ == "__main__":
    main()