# iLearnHub-2 / server.py
import gradio as gr
import pandas as pd
import threading
from datetime import datetime
import os
import json
import sqlite3
from dotenv import load_dotenv
load_dotenv()  # Load .env before reading any environment variables
DEMO_MODE = os.getenv("DEMO_MODE", "False").lower() == "true"
try:
from datasets import load_dataset, Dataset, DatasetDict, Features, Value
HF_DATASETS_AVAILABLE = True
except ImportError:
HF_DATASETS_AVAILABLE = False
Features, Value = None, None
STORAGE_BACKEND_CONFIG = os.getenv("STORAGE_BACKEND", "JSON").upper()
HF_DATASET_REPO = os.getenv("HF_DATASET_REPO")
HF_TOKEN = os.getenv("HF_TOKEN")
DB_FILE_JSON = "social_data_unified.json" # Changed filename to avoid conflicts
DB_FILE_SQLITE = "social_data_unified.db" # Changed filename
db_lock = threading.Lock()
HF_BACKUP_THRESHOLD = int(os.getenv("HF_BACKUP_THRESHOLD", 10))
dirty_operations_count = 0
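# Example .env configuration (illustrative values only; the repo name and token
# below are placeholders, not defaults enforced by this file):
#   STORAGE_BACKEND=HF_DATASET
#   HF_DATASET_REPO=your-username/your-dataset
#   HF_TOKEN=hf_xxxxxxxxxxxxxxxx
#   HF_BACKUP_THRESHOLD=10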
# --- New Global Data Structure ---
users_db = {}
entries_df = pd.DataFrame()
post_id_counter = 0 # Single counter for all entries
# Define the schema for the unified entries table
ENTRY_SCHEMA = {
"post_id": "Int64", # Use nullable integer
"reply_to_id": "Int64", # Use nullable integer, None for top-level posts
"username": "object",
"content": "object",
"timestamp": "object",
"type": "object" # 'post' or 'comment'
}
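# Illustrative entries under this schema (not real data): a top-level post has
# reply_to_id = <NA>, while a comment carries the post_id it replies to.
#   {"post_id": 1, "reply_to_id": None, "username": "admin",
#    "content": "First post", "timestamp": "2024-01-01T00:00:00", "type": "post"}
#   {"post_id": 2, "reply_to_id": 1, "username": "admin",
#    "content": "A reply", "timestamp": "2024-01-01T00:05:00", "type": "comment"}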
def force_persist_data():
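    """Writes the in-memory users/entries state to the configured storage backend.

    Returns a (success: bool, message: str) tuple describing the outcome.
    """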
global dirty_operations_count
with db_lock:
storage_backend = STORAGE_BACKEND_CONFIG
print(f"Attempting to persist data to {storage_backend}")
if storage_backend == "RAM":
print("RAM backend. No persistence.")
return True, "RAM backend. No persistence."
elif storage_backend == "SQLITE":
try:
with sqlite3.connect(DB_FILE_SQLITE) as conn:
cursor = conn.cursor()
# Users table
cursor.execute("CREATE TABLE IF NOT EXISTS users (username TEXT PRIMARY KEY, password TEXT NOT NULL)")
# Entries table - new schema
cursor.execute("CREATE TABLE IF NOT EXISTS entries (post_id INTEGER PRIMARY KEY, reply_to_id INTEGER, username TEXT, content TEXT, timestamp TEXT, type TEXT)")
# Save users
users_to_save = [(u, p) for u, p in users_db.items()]
if users_to_save: # Avoid executing with empty list
conn.executemany("INSERT OR REPLACE INTO users (username, password) VALUES (?, ?)", users_to_save)
# Save entries (replace existing data)
# Ensure Int64 columns are correctly handled as nullable integers for SQL
entries_to_save = entries_df.copy()
entries_to_save['reply_to_id'] = entries_to_save['reply_to_id'].astype('object').where(entries_to_save['reply_to_id'].notna(), None)
entries_to_save.to_sql('entries', conn, if_exists='replace', index=False)
conn.commit()
print("Successfully saved to SQLite.")
return True, "Successfully saved to SQLite."
except Exception as e:
print(f"Error saving to SQLite: {e}")
return False, f"Error saving to SQLite: {e}"
elif storage_backend == "JSON":
try:
                data_to_save = {
                    "users": users_db,
                    # Round-trip through to_json so nullable Int64 values and pd.NA
                    # serialize cleanly (NA becomes null) instead of crashing json.dump
                    "entries": json.loads(entries_df.to_json(orient="records")),
                }
with open(DB_FILE_JSON, "w") as f:
json.dump(data_to_save, f, indent=2)
print("Successfully saved to JSON file.")
return True, "Successfully saved to JSON file."
except Exception as e:
print(f"Error saving to JSON: {e}")
return False, f"Error saving to JSON: {e}"
elif storage_backend == "HF_DATASET":
if not all([HF_DATASETS_AVAILABLE, HF_TOKEN, HF_DATASET_REPO]):
print("HF_DATASET backend is not configured correctly.")
return False, "HF_DATASET backend is not configured correctly."
try:
print("Pushing data to Hugging Face Hub...")
# Convert nullable Int64 columns to standard int/float for dataset
entries_for_hf = entries_df.copy()
# Hugging Face datasets typically handle None/null correctly for integer types
# Ensure type hints are correct or handle potential type issues
entries_for_hf['post_id'] = entries_for_hf['post_id'].astype('int64') # Non-nullable ID
entries_for_hf['reply_to_id'] = entries_for_hf['reply_to_id'].astype('float64') # Use float for nullable integer in HF datasets
user_dataset = Dataset.from_pandas(pd.DataFrame(list(users_db.items()), columns=['username', 'password']))
entries_dataset = Dataset.from_pandas(entries_for_hf)
dataset_dict = DatasetDict({
'users': user_dataset,
'entries': entries_dataset,
})
                # Features could be declared explicitly (e.g. Value('string') columns)
                # and passed to from_pandas or push_to_hub, but auto-detection from
                # pandas is sufficient for these basic types.
dataset_dict.push_to_hub(HF_DATASET_REPO, token=HF_TOKEN, private=True)
dirty_operations_count = 0
print(f"Successfully pushed data to {HF_DATASET_REPO}.")
return True, f"Successfully pushed data to {HF_DATASET_REPO}."
except Exception as e:
print(f"Error pushing to Hugging Face Hub: {e}")
return False, f"Error pushing to Hugging Face Hub: {e}"
print("Unknown backend.")
return False, "Unknown backend."
def handle_persistence_after_change():
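    """Persists immediately for the JSON and SQLITE backends; for HF_DATASET,
    counts dirty operations and only pushes once HF_BACKUP_THRESHOLD is reached.
    The RAM backend is never persisted."""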
global dirty_operations_count
storage_backend = STORAGE_BACKEND_CONFIG
if storage_backend in ["JSON", "SQLITE"]:
force_persist_data()
elif storage_backend == "HF_DATASET":
with db_lock:
dirty_operations_count += 1
if dirty_operations_count >= HF_BACKUP_THRESHOLD:
force_persist_data()
def load_data():
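    """Loads users and entries from the configured backend into the module-level
    globals, creating default/empty data as needed and falling back to the RAM
    backend when a backend is misconfigured or unusable."""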
global STORAGE_BACKEND_CONFIG, users_db, entries_df, post_id_counter
storage_backend = STORAGE_BACKEND_CONFIG
with db_lock:
users = {"admin": "password"}
# Initialize entries DataFrame with the correct schema
entries = pd.DataFrame({k: pd.Series(dtype=v) for k, v in ENTRY_SCHEMA.items()})
if storage_backend == "SQLITE":
try:
with sqlite3.connect(DB_FILE_SQLITE) as conn:
cursor = conn.cursor()
# Create tables if they don't exist
cursor.execute("CREATE TABLE IF NOT EXISTS users (username TEXT PRIMARY KEY, password TEXT NOT NULL)")
cursor.execute("CREATE TABLE IF NOT EXISTS entries (post_id INTEGER PRIMARY KEY, reply_to_id INTEGER, username TEXT, content TEXT, timestamp TEXT, type TEXT)")
# Add default admin user if not exists
cursor.execute("INSERT OR IGNORE INTO users (username, password) VALUES (?, ?)", ("admin", "password"))
conn.commit()
# Load data
users = dict(conn.execute("SELECT username, password FROM users").fetchall())
entries = pd.read_sql_query("SELECT * FROM entries", conn)
# Ensure correct dtypes, especially for nullable integers
for col, dtype in ENTRY_SCHEMA.items():
if col in entries.columns:
try:
entries[col] = entries[col].astype(dtype)
except Exception as e:
print(f"Warning: Could not convert column {col} to {dtype} from SQLite. {e}")
print(f"Successfully loaded data from SQLite: {DB_FILE_SQLITE}")
except Exception as e:
print(f"CRITICAL: Failed to use SQLite. Falling back to RAM. Error: {e}")
STORAGE_BACKEND_CONFIG = "RAM"
elif storage_backend == "JSON":
if os.path.exists(DB_FILE_JSON):
try:
                    with open(DB_FILE_JSON, "r") as f:
                        data = json.load(f)
users = data.get("users", users)
loaded_entries_list = data.get("entries", [])
entries = pd.DataFrame(loaded_entries_list)
# Ensure correct dtypes after loading from JSON
if not entries.empty:
for col, dtype in ENTRY_SCHEMA.items():
if col in entries.columns:
try:
entries[col] = entries[col].astype(dtype)
except Exception as e:
print(f"Warning: Could not convert column {col} to {dtype} from JSON. {e}")
else:
# If JSON was empty or missing entries key, ensure empty DF has schema
entries = pd.DataFrame({k: pd.Series(dtype=v) for k, v in ENTRY_SCHEMA.items()})
                except Exception as e:
                    print(f"Error loading JSON data: {e}. Initializing with empty data.")
                    users = {"admin": "password"}  # Fall back to the default admin user
                    entries = pd.DataFrame({k: pd.Series(dtype=v) for k, v in ENTRY_SCHEMA.items()})
elif storage_backend == "HF_DATASET":
if all([HF_DATASETS_AVAILABLE, HF_TOKEN, HF_DATASET_REPO]):
try:
print(f"Attempting to load from HF Dataset '{HF_DATASET_REPO}'...")
ds_dict = load_dataset(HF_DATASET_REPO, token=HF_TOKEN, trust_remote_code=True)
if ds_dict and 'users' in ds_dict and 'entries' in ds_dict:
# Load users
if ds_dict['users'].num_rows > 0:
users = dict(zip(ds_dict['users']['username'], ds_dict['users']['password']))
else:
users = {"admin":"password"} # Default admin if no users
# Load entries
entries = ds_dict['entries'].to_pandas()
# Ensure correct dtypes, especially for nullable integers
if not entries.empty:
for col, dtype in ENTRY_SCHEMA.items():
if col in entries.columns:
try:
# HF datasets might load Int64 as float or object, convert explicitly
if dtype == "Int64": # Pandas nullable integer
entries[col] = pd.to_numeric(entries[col], errors='coerce').astype(dtype)
else:
entries[col] = entries[col].astype(dtype)
except Exception as e:
print(f"Warning: Could not convert column {col} to {dtype} from HF Dataset. {e}")
else:
# If entries dataset is empty, ensure empty DF has schema
entries = pd.DataFrame({k: pd.Series(dtype=v) for k, v in ENTRY_SCHEMA.items()})
print("Successfully loaded data from HF Dataset.")
else:
raise ValueError("Dataset dictionary is empty or malformed (missing 'users' or 'entries').")
except Exception as e:
print(f"Could not load from HF Dataset '{HF_DATASET_REPO}'. Attempting to initialize. Error: {e}")
try:
# Define features including nullable types if possible, or rely on pandas conversion
user_features = Features({'username': Value('string'), 'password': Value('string')})
# Use float64 for nullable int in HF Features as a common workaround
entry_features = Features({
'post_id': Value('int64'),
'reply_to_id': Value('float64'), # HF datasets often use float for nullable int
'username': Value('string'),
'content': Value('string'),
'timestamp': Value('string'),
'type': Value('string')
})
initial_users_df = pd.DataFrame(list(users.items()), columns=['username', 'password'])
# Ensure initial empty entries DF conforms to the HF features expected types
initial_entries_df = pd.DataFrame({k: pd.Series(dtype='float64' if k in ['post_id', 'reply_to_id'] else 'object') for k in ENTRY_SCHEMA.keys()})
dataset_dict = DatasetDict({
'users': Dataset.from_pandas(initial_users_df, features=user_features),
'entries': Dataset.from_pandas(initial_entries_df, features=entry_features) # Use initial empty with HF types
})
dataset_dict.push_to_hub(HF_DATASET_REPO, token=HF_TOKEN, private=True)
print(f"Successfully initialized new empty HF Dataset at {HF_DATASET_REPO}.")
# After initializing, reset entries_df to pandas schema
entries = pd.DataFrame({k: pd.Series(dtype=v) for k, v in ENTRY_SCHEMA.items()})
except Exception as e_push:
print(f"CRITICAL: Failed to create new HF Dataset. Falling back to RAM. Push Error: {e_push}")
STORAGE_BACKEND_CONFIG = "RAM"
else:
print("HF_DATASET backend not fully configured. Falling back to RAM.")
STORAGE_BACKEND_CONFIG = "RAM"
else: # RAM backend or fallback
print("Using RAM backend.")
# Initialize global variables after loading/initializing
users_db = users
entries_df = entries
        # Seed the counter with the highest existing post_id (the next entry gets counter + 1)
post_id_counter = int(entries_df['post_id'].max()) if not entries_df.empty and entries_df['post_id'].notna().any() else 0
print(f"Loaded data. Users: {len(users_db)}, Entries: {len(entries_df)}. Next Post ID: {post_id_counter + 1}")
# --- Load Data Initially ---
load_data()
# --- API Functions (adapted for unified structure) ---
def api_register(username, password):
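    """Registers a new user; returns a 'Success: ...' or 'Failed: ...' status string."""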
if not username or not password: return "Failed: Username/password cannot be empty."
with db_lock:
if username in users_db: return f"Failed: Username '{username}' already exists."
users_db[username] = password
handle_persistence_after_change()
return f"Success: User '{username}' registered."
def api_login(username, password):
    # Returns a basic "user:pass" string as a stand-in for an auth token.
    # A real app should use proper token/session management instead.
    with db_lock:
        return f"{username}:{password}" if users_db.get(username) == password else "Failed: Invalid credentials."
def _get_user_from_token(token):
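    """Validates a 'user:pass' token (e.g. 'admin:password'); returns the username, or None if invalid."""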
if not token or ':' not in token: return None
user, pwd = token.split(':', 1)
with db_lock: # Access users_db requires lock
return user if users_db.get(user) == pwd else None
def api_create_post(auth_token, content):
"""Creates a top-level post entry."""
global entries_df, post_id_counter
username = _get_user_from_token(auth_token)
if not username: return "Failed: Invalid auth token."
if not content: return "Failed: Content cannot be empty."
with db_lock:
post_id_counter += 1
new_entry = pd.DataFrame([{
"post_id": post_id_counter,
"reply_to_id": pd.NA, # Use pandas NA for nullable integer
"username": username,
"content": content,
"timestamp": datetime.utcnow().isoformat(),
"type": "post"
}]).astype(ENTRY_SCHEMA) # Ensure correct dtypes
entries_df = pd.concat([entries_df, new_entry], ignore_index=True)
handle_persistence_after_change()
return f"Success: Post {post_id_counter} created."
def api_create_comment(auth_token, reply_to_id, content):
"""Creates a comment/reply entry."""
global entries_df, post_id_counter
username = _get_user_from_token(auth_token)
if not username: return "Failed: Invalid auth token."
if not content: return "Failed: Content cannot be empty."
if reply_to_id is None: return "Failed: Reply to ID cannot be empty for a comment/reply."
try:
reply_to_id = int(reply_to_id) # Ensure it's an integer
except (ValueError, TypeError):
return "Failed: Invalid Reply To ID."
with db_lock:
# Check if the entry being replied to exists
if reply_to_id not in entries_df['post_id'].values:
return f"Failed: Entry with ID {reply_to_id} not found."
post_id_counter += 1
new_entry = pd.DataFrame([{
"post_id": post_id_counter,
"reply_to_id": reply_to_id,
"username": username,
"content": content,
"timestamp": datetime.utcnow().isoformat(),
"type": "comment" # All replies are 'comment' type in this scheme
}]).astype(ENTRY_SCHEMA) # Ensure correct dtypes
entries_df = pd.concat([entries_df, new_entry], ignore_index=True)
handle_persistence_after_change()
return f"Success: Comment/Reply {post_id_counter} created (replying to {reply_to_id})."
def api_get_feed():
"""Retrieves all entries sorted by timestamp."""
with db_lock:
# Return a copy to prevent external modifications
feed_data = entries_df.copy()
if feed_data.empty:
# Return empty DataFrame with expected columns
return pd.DataFrame({k: pd.Series(dtype=v) for k, v in ENTRY_SCHEMA.items()})
# Ensure timestamp is datetime for sorting, handle potential errors
try:
feed_data['timestamp'] = pd.to_datetime(feed_data['timestamp'])
except Exception as e:
print(f"Warning: Could not convert timestamp column to datetime: {e}")
            # If conversion fails, skip timestamp sorting; the fallback below sorts by post_id
# Sort (prefer timestamp, fallback to post_id if timestamp fails or is identical)
if 'timestamp' in feed_data.columns and pd.api.types.is_datetime64_any_dtype(feed_data['timestamp']):
feed_data = feed_data.sort_values(by=['timestamp', 'post_id'], ascending=[False, False])
else:
feed_data = feed_data.sort_values(by='post_id', ascending=False)
        # Reorder columns for display; reindex also adds any schema column missing from the data
display_columns = list(ENTRY_SCHEMA.keys()) # Use all columns in the schema
feed_data = feed_data.reindex(columns=display_columns)
# Fill NaN/NA for display purposes (optional, but can make table cleaner)
# Convert nullable Int64 NA to empty string or specific placeholder for display
for col in ['post_id', 'reply_to_id']:
if col in feed_data.columns:
feed_data[col] = feed_data[col].apply(lambda x: '' if pd.isna(x) else int(x)) # Display int without .0
return feed_data
# --- UI Functions (adapted for unified structure) ---
def ui_manual_post(username, password, content):
auth_token = api_login(username, password)
if "Failed" in auth_token: return "Login failed.", api_get_feed()
return api_create_post(auth_token, content), api_get_feed()
def ui_manual_comment(username, password, reply_to_id, content):
auth_token = api_login(username, password)
if "Failed" in auth_token: return "Login failed.", api_get_feed()
return api_create_comment(auth_token, reply_to_id, content), api_get_feed()
def ui_save_to_json():
    # Triggers the general persistence routine, which writes to whichever
    # backend is configured; the JSON-specific message only appears when
    # the JSON backend actually handled the save.
    _, message = force_persist_data()
    if "Successfully saved to JSON file." in message:
        return f"Successfully saved current state to {DB_FILE_JSON}."
    return message  # Propagate the backend's own status/error message
# --- Gradio UI ---
with gr.Blocks(theme=gr.themes.Soft(), title="Social App") as demo:
gr.Markdown("# Social Media Server for iLearn Agent")
gr.Markdown(f"This app provides an API for iLearn agents to interact with. **Storage Backend: `{STORAGE_BACKEND_CONFIG}`**")
with gr.Tabs():
with gr.TabItem("Live Feed"):
            # Column headers come straight from the unified entry schema
            feed_df_display = gr.DataFrame(label="Feed", interactive=False, wrap=True, headers=list(ENTRY_SCHEMA.keys()))
refresh_btn = gr.Button("Refresh Feed")
with gr.TabItem("Manual Actions"):
manual_action_status = gr.Textbox(label="Action Status", interactive=False)
with gr.Row():
with gr.Group():
gr.Markdown("### Create Post")
post_user = gr.Textbox(label="User", value="admin")
post_pass = gr.Textbox(label="Pass", type="password", value="password")
post_content = gr.Textbox(label="Content", lines=3)
post_button = gr.Button("Submit Post", variant="primary")
with gr.Group():
gr.Markdown("### Create Comment / Reply")
comment_user = gr.Textbox(label="User", value="admin")
comment_pass = gr.Textbox(label="Pass", type="password", value="password")
# Updated UI field for the single Reply To ID
comment_reply_to_id = gr.Number(label="Reply To Entry ID (Post or Comment ID)", precision=0) # precision=0 for integer input
comment_content = gr.Textbox(label="Content", lines=2)
comment_button = gr.Button("Submit Comment", variant="primary")
with gr.Group():
gr.Markdown("### Data Management")
save_json_button = gr.Button("Save Current State to JSON") # Button label kept simple, func calls general persistence
# --- UI Actions ---
# Post button now calls ui_manual_post which calls api_create_post
post_button.click(ui_manual_post, [post_user, post_pass, post_content], [manual_action_status, feed_df_display])
# Comment button calls ui_manual_comment with the single reply_to_id field
comment_button.click(ui_manual_comment, [comment_user, comment_pass, comment_reply_to_id, comment_content], [manual_action_status, feed_df_display])
save_json_button.click(ui_save_to_json, None, [manual_action_status])
refresh_btn.click(api_get_feed, None, feed_df_display)
# Load feed on startup
demo.load(api_get_feed, None, feed_df_display)
# --- Gradio API Endpoints (adapted for unified structure) ---
# Ensure API names match the expected iLearn agent interactions
with gr.Column(visible=False): # Hide API interfaces in the main UI
gr.Interface(api_register, ["text", "text"], "text", api_name="register")
gr.Interface(api_login, ["text", "text"], "text", api_name="login")
# api_create_post: token, content
gr.Interface(api_create_post, ["text", "text"], "text", api_name="create_post")
# api_create_comment: token, reply_to_id, content
    # Note: Gradio's "number" input arrives as a float; api_create_comment casts it to int
gr.Interface(api_create_comment, ["text", "number", "text"], "text", api_name="create_comment")
# api_get_feed: no input, returns dataframe
gr.Interface(api_get_feed, None, "dataframe", api_name="get_feed")
if __name__ == "__main__":
# Ensure initial persistence happens on first run if not loading data
if not os.path.exists(DB_FILE_JSON) and not os.path.exists(DB_FILE_SQLITE) and STORAGE_BACKEND_CONFIG != "HF_DATASET":
print("No existing data files found. Performing initial save.")
force_persist_data() # Persist the initial admin user and empty tables
demo.queue().launch(server_name="0.0.0.0", server_port=7860, share=False)
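# Hedged usage sketch: how an external agent might exercise these endpoints with
# gradio_client. This is illustrative only; it assumes the server is reachable at
# http://localhost:7860 and that the gradio_client package is installed.
#
#   from gradio_client import Client
#   client = Client("http://localhost:7860/")
#   token = client.predict("admin", "password", api_name="/login")
#   print(client.predict(token, "Hello from an agent!", api_name="/create_post"))
#   print(client.predict(token, 1, "Replying to entry 1", api_name="/create_comment"))
#   feed = client.predict(api_name="/get_feed")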