|
import html
import logging
import os
from datetime import datetime
from typing import Dict, List, Tuple

import gradio as gr
import pandas as pd
from neo4j import GraphDatabase
|
|
|
|
|
# Configure root logging once at import time: INFO level, timestamped lines.
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S',
    level=logging.INFO,
)

# Module-level logger shared by every function and class in this file.
logger = logging.getLogger(__name__)
|
|
|
|
|
# Neo4j connection settings, read from the environment at import time.
# Each value is None when the corresponding variable is not set.
NEO4J_URL = os.environ.get("NEO4J_URL")
NEO4J_USER = os.environ.get("NEO4J_USER")
NEO4J_PASSWORD = os.environ.get("NEO4J_PASSWORD")
|
|
|
def format_neo4j_datetime(dt) -> str:
    """Render a date-like value as ``YYYY-MM-DD``.

    Args:
        dt: ``None``, an object exposing ``to_native()`` (which is called
            first to obtain the value to format), or anything that has a
            ``strftime`` method.

    Returns:
        The formatted date, or ``'Unknown date'`` when *dt* is ``None`` or
        cannot be converted/formatted.
    """
    fallback = 'Unknown date'

    if dt is None:
        logger.info("Received None datetime")
        return fallback

    try:
        logger.info(f"Formatting datetime: {dt} of type {type(dt)}")
        native = dt
        if hasattr(native, 'to_native'):
            native = native.to_native()
            logger.info(f"Converted to native: {native} of type {type(native)}")
        formatted = native.strftime('%Y-%m-%d')
    except Exception as exc:
        # Best effort: any conversion/formatting problem degrades to the fallback text
        logger.warning(f"Error formatting datetime: {exc}")
        formatted = fallback

    return formatted
|
|
|
def is_displayable_keyword(keyword: str) -> bool:
    """Tell whether a keyword is worth showing to the user.

    A keyword is hidden when, after removing the separator characters
    ``.``, ``-``, ``/``, ``\\``, space, ``:`` and ``,``, only digits remain.
    That filters out, for example:

    - pure numbers (``1234``)
    - numbers with dots (``15.10``)
    - numbers with dashes (``2023-01``)
    - numbers with spaces (``15 10``)
    - numbers with slashes (``15/10``)
    - any combination of the above

    Empty or ``None`` keywords are never displayable.
    """
    if not keyword:
        return False

    # Delete every separator in a single pass; what is left decides the outcome
    separators = './\\- :,'
    remainder = keyword.translate(str.maketrans('', '', separators))

    return not remainder.isdigit()
|
|
|
def format_interest_list_for_display(interests: set, max_items: int = 10) -> str:
    """Build a short, comma-separated summary of a collection of interests.

    Keywords rejected by ``is_displayable_keyword`` are dropped, the rest are
    de-duplicated and sorted, and at most *max_items* of them are listed;
    a ``(+N more)`` suffix reports how many were left out.

    Returns:
        The summary text, or ``'None'`` when there is nothing to show.
    """
    if not interests:
        return 'None'

    visible = sorted(set(filter(is_displayable_keyword, interests)))
    if not visible:
        return 'None'

    hidden_count = len(visible) - max_items
    if hidden_count <= 0:
        return ', '.join(visible)

    listed = ', '.join(visible[:max_items])
    return f"{listed} (+{hidden_count} more)"
|
|
|
class QuestionRecommender:
    """Recommend questions to a pair of users from a Neo4j interest graph.

    The queries in this class read ``User``, ``Keyword``, ``Topic`` and
    ``Question`` nodes and the ``INTERESTED_IN_KEYWORD``,
    ``INTERESTED_IN_TOPIC``, ``HAS_KEYWORD`` and ``HAS_TOPIC`` relationships.

    An instance owns a Neo4j driver.  Release it with :meth:`close`, or use
    the instance as a context manager (``with QuestionRecommender() as r:``).
    """

    # Question sources; each one gets its own randomised candidate pool in the
    # recommendation query (see _build_questions_query).
    _QUESTION_SOURCES = ("stack_exchange", "trivia", "wikipedia", "reddit")

    # First part of the recommendation query: resolve both users and collect
    # their interests.  Each user's interests are collected in turn so the two
    # OPTIONAL MATCHes never form a cross product; the resulting lists are the
    # same as collecting both with DISTINCT in a single step.
    _QUERY_HEAD = """
            // Find all interests (both keywords and topics) for both users
            MATCH (u1:User {name: $user1})
            MATCH (u2:User {name: $user2})

            // Get all interests for both users
            OPTIONAL MATCH (u1)-[r1:INTERESTED_IN_KEYWORD|INTERESTED_IN_TOPIC]->(interest1)
            WITH u1, u2, COLLECT(DISTINCT interest1) as u1_interests
            OPTIONAL MATCH (u2)-[r2:INTERESTED_IN_KEYWORD|INTERESTED_IN_TOPIC]->(interest2)
            WITH u1, u2, u1_interests, COLLECT(DISTINCT interest2) as u2_interests

            // Find questions related to either user's interests for each source
            CALL {
    """

    # One per-source branch of the subquery.  __SOURCE_FILTER__ is replaced by
    # the source-specific predicate; everything else is identical per source.
    _SOURCE_BRANCH_TEMPLATE = """
                WITH u1, u2, u1_interests, u2_interests
                UNWIND u1_interests + u2_interests as interest
                MATCH (q:Question)-[r:HAS_KEYWORD|HAS_TOPIC]->(interest)
                WHERE
                    __SOURCE_FILTER__ AND
                    (
                        (interest IN u1_interests AND interest IN u2_interests) OR
                        (interest IN u1_interests AND EXISTS((q)-[:HAS_KEYWORD|HAS_TOPIC]->()<-[:INTERESTED_IN_KEYWORD|INTERESTED_IN_TOPIC]-(u2))) OR
                        (interest IN u2_interests AND EXISTS((q)-[:HAS_KEYWORD|HAS_TOPIC]->()<-[:INTERESTED_IN_KEYWORD|INTERESTED_IN_TOPIC]-(u1)))
                    )
                WITH q, interest, type(r) as rel_type,
                     CASE WHEN interest IN u1_interests AND interest IN u2_interests THEN 2.0 ELSE 1.0 END as interest_weight
                WITH q, collect({interest: interest, weight: interest_weight, type: rel_type}) as interests,
                     sum(interest_weight) as base_score
                RETURN q, interests, base_score
                ORDER BY base_score * rand() DESC
                LIMIT 15
    """

    # Last part of the recommendation query: score, de-duplicate and project.
    _QUERY_TAIL = """
            }

            // Calculate temporal relevance for the combined results
            WITH q, interests, base_score,
                 CASE
                     WHEN q.created_utc_ts IS NOT NULL
                     THEN base_score * (1.0 + 0.1 * (1.0 - duration.between(q.created_utc_ts, datetime()).days / 365.0))
                     ELSE base_score
                 END as temporal_score,
                 // Add source-specific random boost to ensure better mixing
                 CASE q.source
                     WHEN 'stack_exchange' THEN rand() * 0.4
                     WHEN 'trivia' THEN rand() * 0.4
                     WHEN 'wikipedia' THEN rand() * 0.4
                     WHEN 'reddit' THEN rand() * 0.4
                     ELSE rand() * 0.4
                 END as source_random_boost

            // Return results with all metadata
            WITH q, interests, temporal_score, source_random_boost,
                 temporal_score * (0.6 + 0.8 * rand()) + source_random_boost as final_score
            RETURN DISTINCT
                q.title as title,
                q.body as body,
                q.created_utc_ts as created_utc_ts,
                q.author as author,
                q.source as source,
                q.correct_answer as correct_answer,
                q.incorrect_answers as incorrect_answers,
                q.upvotes as upvotes,
                q.num_comments as num_comments,
                q.subreddit as subreddit,
                [i in interests | CASE
                    WHEN i.type = 'HAS_KEYWORD' THEN i.interest.keyword
                    ELSE i.interest.topic
                END] as matching_interests,
                [i in interests | CASE
                    WHEN i.type = 'HAS_KEYWORD' THEN 'keyword'
                    ELSE 'topic'
                END] as interest_types,
                final_score as relevance_score
            ORDER BY final_score DESC
            LIMIT $max_questions
    """

    def __init__(self):
        """Open the Neo4j driver and run the start-up checks.

        Connection settings come from the module-level ``NEO4J_URL``,
        ``NEO4J_USER`` and ``NEO4J_PASSWORD``.

        Raises:
            Exception: whatever the driver or the start-up checks raise.  The
                driver is closed before the error propagates so a failed
                start-up does not leak connections.
        """
        self.driver = None
        try:
            self.driver = GraphDatabase.driver(
                NEO4J_URL,
                auth=(NEO4J_USER, NEO4J_PASSWORD)
            )
            logger.info("Initializing QuestionRecommender with debug database")

            self.driver.verify_connectivity()
            logger.info("Successfully connected to Neo4j database")
            self.verify_connection()

            self.inspect_question_types()
        except Exception as e:
            logger.error(f"Failed to initialize database connection: {str(e)}")
            if self.driver is not None:
                try:
                    self.driver.close()
                except Exception:
                    # Never let a clean-up failure hide the original error
                    logger.warning("Could not close Neo4j driver after failed initialization", exc_info=True)
            raise

    def __enter__(self):
        """Return the recommender itself for use in a ``with`` block."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Close the driver when the ``with`` block ends."""
        self.close()

    @classmethod
    def _build_questions_query(cls) -> str:
        """Assemble the recommendation Cypher from the per-source template.

        Every source gets the same candidate-selection branch; only the
        ``stack_exchange`` branch additionally excludes questions authored by
        either user.
        """
        branches = []
        for source in cls._QUESTION_SOURCES:
            source_filter = f"q.source = '{source}'"
            if source == "stack_exchange":
                source_filter = (
                    "q.author <> $user1 AND q.author <> $user2 AND " + source_filter
                )
            branches.append(
                cls._SOURCE_BRANCH_TEMPLATE.replace("__SOURCE_FILTER__", source_filter)
            )
        return cls._QUERY_HEAD + "\n                UNION\n".join(branches) + cls._QUERY_TAIL

    def verify_connection(self):
        """Run a smoke-test query and log node and relationship counts.

        Raises:
            Exception: if either query fails or returns no row.  The original
                error is chained as ``__cause__``.
        """
        try:
            with self.driver.session() as session:
                test_result = session.run("MATCH (n) RETURN count(n) as count").single()
                if not test_result:
                    raise Exception("Could not execute test query")
                logger.info(f"Database contains {test_result['count']} total nodes")

                # NOTE(review): from the second MATCH on, each count is grouped
                # by the previous counts, so a label with zero nodes appears to
                # yield no row at all and trips the "no statistics" error below.
                # Confirm whether that strictness is intended.
                stats = session.run("""
                    // Count nodes
                    MATCH (u:User)
                    WITH COUNT(u) as user_count
                    MATCH (k:Keyword)
                    WITH user_count, COUNT(k) as keyword_count
                    MATCH (q:Question)
                    WITH user_count, keyword_count, COUNT(q) as question_count
                    MATCH (t:Topic)
                    WITH user_count, keyword_count, question_count, COUNT(t) as topic_count

                    // Count relationships
                    OPTIONAL MATCH ()-[r:INTERESTED_IN_KEYWORD]->()
                    WITH user_count, keyword_count, question_count, topic_count, COUNT(r) as keyword_rel_count
                    OPTIONAL MATCH ()-[r:INTERESTED_IN_TOPIC]->()
                    WITH user_count, keyword_count, question_count, topic_count, keyword_rel_count, COUNT(r) as topic_rel_count
                    OPTIONAL MATCH ()-[r:HAS_KEYWORD]->()
                    WITH user_count, keyword_count, question_count, topic_count, keyword_rel_count, topic_rel_count, COUNT(r) as question_keyword_count
                    OPTIONAL MATCH ()-[r:HAS_TOPIC]->()
                    RETURN
                        user_count, keyword_count, question_count, topic_count,
                        keyword_rel_count, topic_rel_count,
                        question_keyword_count, COUNT(r) as question_topic_count
                """).single()

                if not stats:
                    raise Exception("Could not retrieve database statistics")

                logger.info("=== Database Statistics ===")
                logger.info("Nodes:")
                logger.info(f" Users: {stats['user_count']}")
                logger.info(f" Keywords: {stats['keyword_count']}")
                logger.info(f" Questions: {stats['question_count']}")
                logger.info(f" Topics: {stats['topic_count']}")
                logger.info("\nRelationships:")
                logger.info(f" User->Keyword (INTERESTED_IN_KEYWORD): {stats['keyword_rel_count']}")
                logger.info(f" User->Topic (INTERESTED_IN_TOPIC): {stats['topic_rel_count']}")
                logger.info(f" Question->Keyword (HAS_KEYWORD): {stats['question_keyword_count']}")
                logger.info(f" Question->Topic (HAS_TOPIC): {stats['question_topic_count']}")

        except Exception as e:
            logger.error(f"Database verification failed: {str(e)}")
            logger.error(f"URL: {NEO4J_URL}")
            logger.error(f"User: {NEO4J_USER}")
            raise Exception(f"Failed to verify database connection: {str(e)}") from e

    def inspect_question_types(self):
        """Log the label/property shapes of Question nodes and their outgoing relationships.

        For every distinct (labels, property keys) combination the number of
        matching questions is logged, together with one sample value per
        property.

        Raises:
            Exception: any query error is logged and re-raised.
        """
        with self.driver.session() as session:
            try:
                # Group by (labels, keys).  There is deliberately no DISTINCT
                # before count(*): de-duplicating first would make every count 1.
                # The records are materialised before the per-property sample
                # queries run on the same session.
                shape_records = list(session.run("""
                    MATCH (q:Question)
                    WITH keys(q) as props, labels(q) as types
                    RETURN types, props, count(*) as count
                    ORDER BY count DESC
                """))

                logger.info("\n=== Question Types and Properties ===")
                for record in shape_records:
                    types = record["types"]
                    props = record["props"]
                    count = record["count"]
                    logger.info(f"\nType: {types}")
                    logger.info(f"Count: {count}")
                    logger.info("Properties:")
                    for prop in props:
                        sample = session.run("""
                            MATCH (q:Question)
                            WHERE $prop in keys(q)
                            RETURN q[$prop] as value
                            LIMIT 1
                        """, prop=prop).single()

                        value = sample["value"] if sample else None
                        value_type = type(value).__name__ if value is not None else "None"
                        preview = str(value)
                        ellipsis = '...' if len(preview) > 100 else ''
                        logger.info(f" - {prop}: {value_type} (example: {preview[:100]}{ellipsis})")

                relationship_records = list(session.run("""
                    MATCH (q:Question)-[r]->(target)
                    WITH DISTINCT type(r) as rel_type, labels(target) as target_labels, count(*) as count
                    RETURN rel_type, target_labels, count
                    ORDER BY count DESC
                """))

                logger.info("\n=== Question Relationships ===")
                for record in relationship_records:
                    rel_type = record["rel_type"]
                    target_labels = record["target_labels"]
                    count = record["count"]
                    logger.info(f"Relationship: {rel_type} -> {target_labels} (Count: {count})")

            except Exception as e:
                logger.error(f"Error inspecting question types: {str(e)}")
                raise

    def close(self):
        """Close the Neo4j driver, if one was created."""
        driver = getattr(self, "driver", None)
        if driver is not None:
            driver.close()

    def get_all_users(self) -> List[str]:
        """List users that have at least one interest, most interests first.

        Returns:
            Display labels of the form
            ``"<name> (<n> keywords, <m> topics)"``; an empty list when no
            such user exists or the query fails (the error is logged).
        """
        with self.driver.session() as session:
            try:
                result = session.run("""
                    MATCH (u:User)
                    OPTIONAL MATCH (u)-[r:INTERESTED_IN_KEYWORD|INTERESTED_IN_TOPIC]->(interest)
                    WITH u,
                         COUNT(DISTINCT CASE WHEN type(r) = 'INTERESTED_IN_KEYWORD' THEN interest END) as keyword_count,
                         COUNT(DISTINCT CASE WHEN type(r) = 'INTERESTED_IN_TOPIC' THEN interest END) as topic_count
                    WHERE keyword_count > 0 OR topic_count > 0
                    RETURN
                        u.name as username,
                        keyword_count,
                        topic_count,
                        keyword_count + topic_count as total_interests
                    ORDER BY total_interests DESC, username
                """)

                # Users without a name cannot be selected later, so skip them
                users_with_counts = [
                    (record["username"], record["keyword_count"], record["topic_count"])
                    for record in result
                    if record["username"]
                ]

                if not users_with_counts:
                    logger.warning("No users found with interests")
                    return []

                logger.info(f"Retrieved {len(users_with_counts)} users with interests")
                logger.info("Top 5 users by interest count:")
                for username, kw_count, topic_count in users_with_counts[:5]:
                    logger.info(f" - {username}: {kw_count} keywords, {topic_count} topics")

                return [
                    f"{username} ({kw_count} keywords, {topic_count} topics)"
                    for username, kw_count, topic_count in users_with_counts
                ]
            except Exception as e:
                logger.error(f"Error fetching users: {str(e)}")
                return []

    def get_user_interests(self, username: str) -> Dict[str, set]:
        """Fetch the keywords and topics a user is interested in.

        Args:
            username: Value of the user's ``name`` property.

        Returns:
            ``{"keywords": set_of_str, "topics": set_of_str}``; both sets are
            empty when the user is unknown or has no interests.
        """
        with self.driver.session() as session:
            keyword_result = session.run("""
                MATCH (u:User {name: $username})-[:INTERESTED_IN_KEYWORD]->(k:Keyword)
                RETURN DISTINCT k.keyword as keyword
            """, username=username)
            keywords = {str(record["keyword"]) for record in keyword_result if record["keyword"]}
            logger.debug(f"Found {len(keywords)} keywords for user {username}")

            topic_result = session.run("""
                MATCH (u:User {name: $username})-[:INTERESTED_IN_TOPIC]->(t:Topic)
                RETURN DISTINCT t.topic as topic
            """, username=username)
            topics = {str(record["topic"]) for record in topic_result if record["topic"]}
            logger.debug(f"Found {len(topics)} topics for user {username}")

            return {"keywords": keywords, "topics": topics}

    def find_common_questions(self, user1: str, user2: str, max_questions: int = 5) -> List[Dict]:
        """Find questions to recommend to two users based on their interests.

        Candidates are drawn per source (up to 15 each, randomised), scored by
        how many of the users' interests they touch (shared interests weigh
        double), adjusted by question age and a random boost, and the best
        *max_questions* are returned.  Because of the random terms, repeated
        calls can return different results.

        Args:
            user1: ``name`` of the first user.
            user2: ``name`` of the second user.
            max_questions: Maximum number of questions to return.

        Returns:
            One dict per question with the keys projected by the query
            (``title``, ``body``, ``source``, ``matching_interests``,
            ``interest_types``, ``relevance_score``, ...); an empty list when
            either user does not exist.
        """
        with self.driver.session() as session:
            user_check = session.run("""
                MATCH (u1:User {name: $user1})
                MATCH (u2:User {name: $user2})
                OPTIONAL MATCH (u1)-[r1:INTERESTED_IN_KEYWORD|INTERESTED_IN_TOPIC]->(interest1)
                OPTIONAL MATCH (u2)-[r2:INTERESTED_IN_KEYWORD|INTERESTED_IN_TOPIC]->(interest2)
                RETURN
                    COUNT(DISTINCT u1) as user1_exists,
                    COUNT(DISTINCT u2) as user2_exists,
                    COUNT(DISTINCT interest1) as user1_interests,
                    COUNT(DISTINCT interest2) as user2_interests
            """, user1=user1, user2=user2).single()

            if not (user_check and user_check['user1_exists'] and user_check['user2_exists']):
                logger.error(f"One or both users not found: {user1}, {user2}")
                return []

            logger.info(f"User {user1} has {user_check['user1_interests']} total interests")
            logger.info(f"User {user2} has {user_check['user2_interests']} total interests")

            questions = [
                dict(record)
                for record in session.run(
                    self._build_questions_query(),
                    user1=user1,
                    user2=user2,
                    max_questions=max_questions,
                )
            ]

            if questions:
                first_q = questions[0]
                logger.info("Sample question:")
                logger.info(f"Title: {first_q.get('title', 'No title')}")
                logger.info(f"Author: {first_q.get('author', 'No author')}")
                logger.info(f"Score: {first_q.get('relevance_score', 0)}")
                logger.info(f"Interests: {first_q.get('matching_interests', [])}")

            logger.info(f"Found {len(questions)} questions with common interests")
            return questions
|
|
|
def process_body(text, title):
    """Turn a question body (HTML) into display-ready HTML.

    Each ``<img>`` with a ``src`` is replaced by a small preview card that
    links to the image, relative link/image URLs are made absolute, and
    inline styles are applied to links, paragraphs, lists and code.

    Args:
        text: Raw body markup; it is converted with ``str()`` before parsing.
        title: Question title, used only in the warning logged for an empty body.

    Returns:
        The processed HTML.  ``""`` when *text* is empty.  If processing
        fails for any reason (including BeautifulSoup not being installed)
        the error is logged and ``str(text)`` is returned unchanged.
    """
    if not text:
        logger.warning(f"Empty body for question: {title}")
        return ""

    try:
        from bs4 import BeautifulSoup

        soup = BeautifulSoup(str(text), 'html.parser')

        def fix_stack_exchange_url(url):
            """Make *url* absolute; bare paths are resolved against i.stack.imgur.com."""
            if not url:
                return url
            if url.startswith(('http://', 'https://')):
                return url
            if url.startswith('//'):
                return 'https:' + url
            if url.startswith('/'):
                return 'https://i.stack.imgur.com' + url
            return 'https://i.stack.imgur.com/' + url

        # Replace inline images with a preview card linking to the image
        for img in soup.find_all('img'):
            src = img.get('src', '')
            if not src:
                continue

            fixed_src = fix_stack_exchange_url(src)
            alt_text = (img.get('alt') or '').strip()
            if not alt_text or alt_text.lower() == 'enter image description here':
                alt_text = 'Question image'

            # The parser hands back decoded attribute values, so they must be
            # escaped again before being interpolated into new markup.
            # NOTE(review): the icon literal below looks mis-encoded
            # (mojibake) - confirm the intended character.
            preview_html = f"""
                <div class="image-preview" style="margin: 10px 0; padding: 10px; background: #f9fafc; border-radius: 6px;">
                    <div style="display: flex; align-items: center; margin-bottom: 8px;">
                        <span style="font-size: 20px; margin-right: 8px;">πΌοΈ</span>
                        <span style="color: #219ebc;">{html.escape(alt_text)}</span>
                    </div>
                    <a href="{html.escape(fixed_src, quote=True)}" target="_blank" rel="noopener noreferrer"
                       style="color: #219ebc; text-decoration: none;">View image</a>
                </div>
            """

            new_soup = BeautifulSoup(preview_html, 'html.parser')
            img.replace_with(new_soup)

        # Normalise every other link; links inside a generated preview card
        # are already complete and are left alone.
        for link in soup.find_all('a'):
            if link.find_parent('div', class_='image-preview') is not None:
                continue
            href = link.get('href', '')
            if href and not href.startswith(('http://', 'https://')):
                link['href'] = fix_stack_exchange_url(href)
            link['target'] = '_blank'
            link['rel'] = 'noopener noreferrer'
            link['style'] = 'color: #219ebc; text-decoration: none;'

        # Paragraph/div spacing; any existing inline style is kept in front
        block_style = 'margin: 0.8em 0; line-height: 1.6; color: #333333;'
        for block in soup.find_all(['p', 'div']):
            classes = block.get('class', []) or []
            if 'image-preview' in classes or 'question-card' in classes:
                continue
            existing_style = (block.get('style') or '').strip().rstrip(';').rstrip()
            # Avoid a leading "; " or a doubled ";;" when joining the styles
            block['style'] = f"{existing_style}; {block_style}" if existing_style else block_style

        for list_tag in soup.find_all(['ul', 'ol']):
            list_tag['style'] = 'margin: 0.8em 0; padding-left: 1.5em; color: #333333;'

        for item in soup.find_all('li'):
            item['style'] = 'margin: 0.4em 0; line-height: 1.6; color: #333333;'

        for code in soup.find_all(['code', 'pre']):
            code['style'] = 'background: #f9fafc; padding: 0.2em 0.4em; border-radius: 4px; font-family: monospace; color: #333333;'

        return str(soup)

    except Exception as e:
        # Best effort: fall back to the unprocessed body rather than failing the card
        logger.error(f"Error processing question body: {str(e)}")
        return str(text) if text else ""
|
|
|
def format_question(q: Dict) -> str:
    """Render one recommended question as an HTML card.

    The card shows a source badge, the title, an optional relevance score,
    author/date/vote metadata (when the dict has an ``author`` or
    ``created_utc_ts`` key), the interests the question matched, and
    source-specific content: the body for ``stack_exchange``, answer options
    for ``trivia``, the answer for ``wikipedia``; any other case falls back
    to the body, then to the answer.

    Values that are missing *or* ``None`` are replaced by defaults, and all
    plain-text fields are HTML-escaped before being inserted into the card.

    Args:
        q: Question record, e.g. one dict returned by
            ``QuestionRecommender.find_common_questions``.

    Returns:
        The card HTML, or a red error box if rendering fails.
    """

    def _safe(value) -> str:
        """Return *value* as HTML-safe text.

        Entities already present in stored text are decoded first and the
        result is escaped once, so pre-encoded data is not double-escaped and
        raw markup cannot be injected.
        """
        return html.escape(html.unescape(str(value)))

    def _render_body(body, raw_title) -> str:
        """Wrap the processed question body in the content container."""
        return f"""
            <div class="question-content" style="margin-top: 20px; font-family: 'Segoe UI', system-ui, -apple-system, sans-serif; color: #023047; line-height: 1.6;">
                {process_body(body, raw_title)}
            </div>
        """

    def _render_answer(answer) -> str:
        """Render a single highlighted answer box."""
        return f"""
            <div class="answer" style="margin-top: 15px; padding: 15px; background: #f9fafc; border-radius: 6px; border: 1px solid #8ecae6;">
                <div style="color: #023047; margin-bottom: 10px; font-weight: 500;">Answer:</div>
                <div style="color: #4caf50; font-weight: 500;">{_safe(answer)}</div>
            </div>
        """

    try:
        # `or` (not a .get default) so keys present with a None value also get the default
        raw_title = q.get('title') or 'Untitled'
        title = _safe(raw_title)
        source = str(q.get('source') or '').lower()

        metadata_html = ""
        content_html = ""

        if 'author' in q or 'created_utc_ts' in q:
            author = _safe(q.get('author') or 'Unknown author')
            created_date = format_neo4j_datetime(q.get('created_utc_ts'))
            upvotes = _safe(q.get('upvotes') or 0)
            num_comments = _safe(q.get('num_comments') or 0)

            if source == 'stack_exchange':
                action = ' asked'
            else:
                action = ' posted'
                subreddit = q.get('subreddit')
                if source == 'reddit' and subreddit:
                    action += f' in <span style="color: #219ebc; font-weight: 500;">r/{_safe(subreddit)}</span>'

            # NOTE(review): the two icon literals below look mis-encoded
            # (mojibake) - confirm the intended characters.
            metadata_html = f"""
                <div class="question-meta" style="font-size: 0.9em; margin-bottom: 15px;">
                    <span style="color: #219ebc; font-weight: 500;">{author}</span>
                    {action} on
                    <span style="color: #023047;">{created_date}</span>
                    <div class="stats" style="margin-top: 5px;">
                        <span title="Upvotes"><span style="color: #219ebc;">β²</span> {upvotes}</span>
                        <span style="margin-left: 15px;" title="Comments"><span style="color: #219ebc;">π¬</span> {num_comments}</span>
                    </div>
                </div>
            """

        body = q.get('body')
        correct_answer = q.get('correct_answer')

        if source == "stack_exchange":
            if body:
                content_html = _render_body(body, raw_title)

        elif source == "trivia":
            correct = correct_answer or ''
            incorrect_answers = q.get('incorrect_answers') or []
            if isinstance(incorrect_answers, str):
                incorrect_answers = [incorrect_answers]
            answers = [correct] + list(incorrect_answers)

            answer_cards = []
            for answer in answers:
                is_correct = answer == correct
                accent = '#4caf50' if is_correct else '#8ecae6'
                text_color = '#4caf50' if is_correct else '#023047'
                weight = '500' if is_correct else 'normal'
                answer_cards.append(f"""
                    <div class="answer-option" style="margin: 8px 0; padding: 10px; background: #f9fafc; border-radius: 6px; border: 1px solid #8ecae6; border-left: 3px solid {accent};">
                        <span style="color: {text_color}; font-weight: {weight};">
                            {_safe(answer)}
                        </span>
                    </div>
                """)
            answers_html = "".join(answer_cards)

            content_html = f"""
                <div class="answers-container" style="margin-top: 15px;">
                    <div style="color: #023047; margin-bottom: 10px; font-weight: 500;">Answer options:</div>
                    {answers_html}
                </div>
            """

        elif source == "wikipedia":
            if correct_answer:
                content_html = _render_answer(correct_answer)

        # Generic fallback for every case that produced no content above
        if not content_html:
            if body:
                content_html = _render_body(body, raw_title)
            elif correct_answer:
                content_html = _render_answer(correct_answer)

        # NOTE(review): these icon literals look mis-encoded (mojibake) and
        # some cannot be recovered unambiguously - confirm the intended emoji.
        source_icon = {
            'stack_exchange': 'β‘',
            'reddit': 'πΈ',
            'wikipedia': 'π',
            'trivia': 'π―',
        }.get(source, 'β')

        source_color = {
            'stack_exchange': '#219ebc',
            'reddit': '#fb8500',
            'wikipedia': '#4caf50',
            'trivia': '#ffb703',
        }.get(source, '#219ebc')

        # "stack_exchange" -> "Stack Exchange"
        source_display = _safe(source.replace('_', ' ').title()) if source else "Unknown"
        source_badge = f"""
            <div class="source-badge" style="display: inline-flex; align-items: center; padding: 4px 8px; background: #f9fafc; border-radius: 4px; margin-right: 10px; border: 1px solid {source_color};">
                <span style="margin-right: 6px; font-size: 1.1em;">{source_icon}</span>
                <span style="color: {source_color}; font-size: 0.9em; font-weight: 500;">{source_display}</span>
            </div>
        """

        # Split the matched interests into keywords and topics
        matching_interests = q.get('matching_interests') or []
        interest_types = q.get('interest_types') or []
        keywords = set()
        topics = set()
        for interest, type_ in zip(matching_interests, interest_types):
            if not (interest and type_):
                continue
            if type_ == 'keyword':
                keywords.add(str(interest))
            elif type_ == 'topic':
                topics.add(str(interest))

        interests_display = []
        if keywords:
            interests_display.append(f"Keywords: {format_interest_list_for_display(keywords, max_items=3)}")
        if topics:
            interests_display.append(f"Topics: {format_interest_list_for_display(topics, max_items=3)}")
        interests_str = _safe(" | ".join(interests_display)) if interests_display else "No common interests found"

        relevance_score = q.get('relevance_score') or 0
        score_display = ""
        if relevance_score > 0:
            score_display = f"""
                <div class="relevance-score" style="display: inline-block; padding: 4px 8px; background: #f9fafc; border-radius: 4px; margin-left: 10px; border: 1px solid #8ecae6;">
                    <span style="color: #219ebc; font-size: 0.9em;">Relevance: {relevance_score:.2f}</span>
                </div>
            """

        question_html = f"""
            <div class="question-card" style="background: #ffffff; padding: 20px; border-radius: 8px; margin: 15px 0; border: 1px solid #219ebc; box-shadow: 0 4px 6px rgba(0, 0, 0, 0.1);">
                <div class="question-header" style="display: flex; justify-content: space-between; align-items: flex-start; margin-bottom: 15px; border-bottom: 1px solid #8ecae6; padding-bottom: 10px;">
                    <div style="flex: 1; display: flex; align-items: center;">
                        {source_badge}
                        <h3 style="color: #023047; margin: 0; font-size: 1.4em; display: inline;">{title}</h3>
                    </div>
                    {score_display}
                </div>

                {metadata_html}

                <div class="interests-bar" style="margin: 15px 0; padding: 10px; background: #f9fafc; border-radius: 6px; border: 1px solid #8ecae6; border-left: 3px solid #219ebc;">
                    <div style="color: #023047; font-size: 0.9em; font-weight: 500;">Common Interests:</div>
                    <div style="color: #219ebc; font-weight: 500; margin-top: 5px;">{interests_str}</div>
                </div>

                {content_html}
            </div>
        """

        return question_html

    except Exception as e:
        logger.error(f"Error formatting question: {str(e)}")
        return f"""
            <div style="background: #fee2e2; padding: 15px; border-radius: 8px; margin: 10px 0; border: 1px solid #dc2626;">
                <div style="color: #dc2626;">Error displaying question: {html.escape(str(e))}</div>
            </div>
        """
|
|
|
def loading_message() -> Tuple[str, str, str]:
    """Build the "loading" placeholder shown while recommendations are computed.

    Returns:
        The same spinner HTML three times, one copy per output slot.
    """
    spinner_html = """
    <div class="loading-spinner">
        <div style="text-align: center;">
            <div style="border: 4px solid #60a5fa; border-top: 4px solid transparent; border-radius: 50%; width: 40px; height: 40px; animation: spin 1s linear infinite; margin: 20px auto;"></div>
            <div style="color: #60a5fa; margin-top: 10px;">Analyzing interests and finding recommendations...</div>
        </div>
    </div>
    """
    return (spinner_html,) * 3
|
|
|
def _interest_block(css_class: str, heading: str, keywords: set, topics: set) -> str:
    """Render one keywords/topics panel of the interests summary as HTML."""
    return f"""
        <div class="{css_class}">
            <h3>{heading}</h3>
            <div class="interest-section">
                <strong>Keywords:</strong> {format_interest_list_for_display(keywords, max_items=8)}
            </div>
            <div class="interest-section">
                <strong>Topics:</strong> {format_interest_list_for_display(topics, max_items=5)}
            </div>
        </div>
    """


def recommend_questions(user1: str, user2: str) -> Tuple[str, str, str, List[Dict]]:
    """Build the interests summary and question recommendations for two users.

    Args:
        user1: Label of the first user. Everything from the first ``" ("``
            onwards is dropped before the name is used for lookups.
        user2: Label of the second user, handled the same way.

    Returns:
        A 4-tuple ``(interests_html, header_html, questions_html, questions)``.
        The first three items are HTML fragments; the last is the list
        returned by ``find_common_questions``. When a user is missing or any
        lookup raises, error fragments and an empty list are returned instead
        of propagating the exception.
    """
    import html  # local import: only needed for escaping in this function

    error_header = '<h2 class="error-header">Error</h2>'

    # An unselected dropdown yields None; report it instead of crashing on
    # the string operations below.
    if not user1 or not user2:
        return (
            '<div class="error">Please select two users to compare.</div>',
            error_header,
            '<div class="error-message">Both a first and a second user must be selected.</div>',
            [],
        )

    # str.split returns the whole string when the separator is absent, so no
    # membership test is needed.
    user1 = user1.split(" (")[0]
    user2 = user2.split(" (")[0]

    recommender = QuestionRecommender()
    try:
        user1_interests = recommender.get_user_interests(user1)
        user2_interests = recommender.get_user_interests(user2)

        # Interests shared by both users.
        common_keywords = user1_interests['keywords'] & user2_interests['keywords']
        common_topics = user1_interests['topics'] & user2_interests['topics']

        # Names end up inside markup, so escape them before interpolation.
        name1 = html.escape(user1)
        name2 = html.escape(user2)
        panels = "".join((
            _interest_block("user-interests", f"{name1}'s Interests",
                            user1_interests['keywords'], user1_interests['topics']),
            _interest_block("user-interests", f"{name2}'s Interests",
                            user2_interests['keywords'], user2_interests['topics']),
            _interest_block("common-interests", "Common Interests",
                            common_keywords, common_topics),
        ))
        interests_summary = f'<div class="interests-summary">{panels}</div>'

        questions = recommender.find_common_questions(user1, user2, max_questions=50)

        if questions:
            cards = '\n'.join(format_question(q) for q in questions)
            questions_text = f'<div class="questions-container">\n{cards}\n</div>'
            recommendation_type = '<h2 class="recommendation-header">Recommendations Based on Common Interests</h2>'
        else:
            questions_text = '<div class="no-questions">No questions found based on common interests.</div>'
            recommendation_type = '<h2 class="recommendation-header">No Recommendations Available</h2>'

        return interests_summary, recommendation_type, questions_text, questions

    except Exception as e:
        # Boundary handler for the UI callback: log with traceback and show
        # the (escaped) message rather than letting the request fail.
        logger.exception("Error in recommend_questions: %s", e)
        return (
            '<div class="error">Error fetching user interests. Please try again.</div>',
            error_header,
            f'<div class="error-message">An error occurred: {html.escape(str(e))}</div>',
            [],
        )
    finally:
        recommender.close()
|
|
|
|
|
# Stylesheet passed to gr.Blocks(css=...). It styles both Gradio's own widgets
# and the HTML fragments this module generates (interest panels, question
# cards, error boxes, loading spinner animation).
custom_css = """
.gradio-container {
    max-width: 1200px !important;
    margin: auto !important;
    padding: 20px !important;
    font-family: 'Segoe UI', Tahoma, Geneva, Verdana, sans-serif !important;
}

/* Dropdown styling */
.gradio-dropdown > ul {
    max-height: 300px !important;
    overflow-y: auto !important;
    scrollbar-width: thin !important;
}

.gradio-dropdown > ul::-webkit-scrollbar {
    width: 6px !important;
}

.gradio-dropdown > ul::-webkit-scrollbar-track {
    background: #f9fafc !important;
    border-radius: 3px !important;
}

.gradio-dropdown > ul::-webkit-scrollbar-thumb {
    background: #219ebc !important;
    border-radius: 3px !important;
}

.gradio-dropdown > ul::-webkit-scrollbar-thumb:hover {
    background: #023047 !important;
}

.interests-summary {
    background: #ffffff;
    padding: 20px;
    border-radius: 10px;
    margin-bottom: 20px;
    border: 1px solid #219ebc;
}

.user-interests, .common-interests {
    background: #f9fafc;
    padding: 15px;
    border-radius: 8px;
    margin: 10px 0;
    box-shadow: 0 4px 6px rgba(0, 0, 0, 0.1);
    border: 1px solid #8ecae6;
}

.interest-section {
    margin: 10px 0;
    line-height: 1.5;
    color: #333333;
}

/* Progress bar and loading animation */
.progress-bar > div {
    background: #ffb703 !important;
}

.progress-bar {
    background: #f9fafc !important;
}

.loading {
    color: #219ebc !important;
}

/* Question section styling */
.questions-container {
    margin-top: 20px;
}

.question-card {
    background: #ffffff;
    padding: 20px;
    border-radius: 8px;
    margin: 15px 0;
    box-shadow: 0 4px 6px rgba(0, 0, 0, 0.1);
    border: 1px solid #219ebc;
    transition: transform 0.2s ease;
}

.question-card:hover {
    transform: translateY(-2px);
}

.question-header {
    border-bottom: 1px solid #8ecae6;
    padding-bottom: 10px;
    margin-bottom: 15px;
}

.question-header h3 {
    color: #023047 !important;
    margin: 0;
    font-size: 1.4em;
}

.source-badge {
    background: #f9fafc !important;
    border: 1px solid #219ebc !important;
    color: #219ebc !important;
}

.interests-bar {
    background: #f9fafc !important;
    border: 1px solid #8ecae6 !important;
    border-left: 3px solid #219ebc !important;
    padding: 10px;
    margin: 15px 0;
}

.interests-bar div {
    color: #333333 !important;
}

.question-meta {
    color: #333333 !important;
    margin: 10px 0;
}

.question-meta span {
    color: #219ebc !important;
}

.question-content {
    color: #333333 !important;
    line-height: 1.6;
}

.question-content a {
    color: #219ebc !important;
    text-decoration: none;
}

.question-content code, .question-content pre {
    background: #f9fafc !important;
    border: 1px solid #8ecae6 !important;
    color: #333333 !important;
    padding: 0.2em 0.4em;
    border-radius: 4px;
}

.answer-option {
    background: #f9fafc !important;
    border: 1px solid #8ecae6 !important;
    margin: 8px 0;
    padding: 10px;
    border-radius: 6px;
}

.answer {
    background: #f9fafc !important;
    border: 1px solid #8ecae6 !important;
    padding: 15px;
    border-radius: 6px;
    margin-top: 15px;
}

.image-preview {
    background: #f9fafc !important;
    border: 1px solid #8ecae6 !important;
    padding: 10px;
    border-radius: 6px;
    margin: 10px 0;
}

.relevance-score {
    background: #f9fafc !important;
    border: 1px solid #8ecae6 !important;
    padding: 4px 8px;
    border-radius: 4px;
}

.relevance-score span {
    color: #219ebc !important;
}

.recommendation-header {
    color: #023047 !important;
    padding: 10px 0;
    margin: 20px 0;
    border-bottom: 2px solid #219ebc;
}

.error {
    color: #dc2626;
    padding: 15px;
    background: #fee2e2;
    border-radius: 8px;
    margin: 10px 0;
    border: 1px solid #dc2626;
}

/* !important is required here: the generic "h1, h2, h3" rule below is
   !important and would otherwise repaint the error heading. */
.error-header {
    color: #dc2626 !important;
}

.error-message {
    background: #fee2e2;
    padding: 15px;
    border-radius: 8px;
    color: #dc2626;
    border: 1px solid #dc2626;
}

.no-questions {
    padding: 20px;
    background: #f9fafc;
    border-radius: 8px;
    text-align: center;
    color: #333333;
    border: 1px solid #219ebc;
}

h1, h2, h3 {
    color: #023047 !important;
}

strong {
    color: #219ebc;
}

.user-interests h3, .common-interests h3 {
    color: #023047;
    margin-top: 0;
    margin-bottom: 15px;
    font-size: 1.2rem;
}

@keyframes spin {
    0% { transform: rotate(0deg); }
    100% { transform: rotate(360deg); }
}

/* Additional Gradio-specific overrides */
.gr-button-primary {
    background: #ffb703 !important;
    color: #ffffff !important;
}

.gr-button-secondary {
    background: #219ebc !important;
    color: #ffffff !important;
}

.gr-form {
    background: #ffffff !important;
    border: 1px solid #219ebc !important;
}

.gr-input {
    border-color: #219ebc !important;
}

.gr-input:focus {
    border-color: #023047 !important;
}

.gr-box {
    background: #ffffff !important;
    border: 1px solid #219ebc !important;
}

.gr-panel {
    background: #ffffff !important;
    border: 1px solid #219ebc !important;
}
"""
|
|
|
# JavaScript snippet passed to gr.Blocks(js=...) in main(). It removes the
# 'dark' class and sets data-theme="light" on both <html> and <body>, and sets
# the root element's color-scheme to light.
force_light_mode_js = """
function() {
    // Remove dark mode class from html element
    document.documentElement.classList.remove('dark');

    // Set data-theme attribute to light
    document.documentElement.setAttribute('data-theme', 'light');

    // Override any system preference
    document.documentElement.style.colorScheme = 'light';

    // Also try to set it on the body
    document.body.classList.remove('dark');
    document.body.setAttribute('data-theme', 'light');
}
"""
|
|
|
def main():
    """Build the Gradio interface for the question recommender and launch it.

    Loads the list of users once, creates two user dropdowns plus a button,
    and wires the button to show a loading placeholder followed by the output
    of ``recommend_questions``.
    """
    # Fetch the dropdown choices up front. close() runs even if the user
    # query raises, so the recommender is never left open.
    recommender = QuestionRecommender()
    try:
        users = recommender.get_all_users()
    finally:
        recommender.close()

    theme = gr.themes.Base().set(
        body_background_fill="#f9fafc",
        block_background_fill="#ffffff",
        block_border_width="1px",
        block_border_color="#219ebc",
        block_title_text_color="#023047",
        block_label_text_color="#333333",
        input_background_fill="#ffffff",
        input_border_color="#219ebc",
        input_border_width="1px",
        button_primary_background_fill="#ffb703",
        button_secondary_background_fill="#219ebc",
        background_fill_primary="#ffffff",
        background_fill_secondary="#f9fafc"
    )

    # NOTE(review): light mode is enforced by `force_light_mode_js`; confirm
    # whether `mode="light"` has any visual effect in the installed Gradio.
    # NOTE(review): the glyphs prefixed to the title, labels and button text
    # look like mis-encoded emoji -- confirm the intended characters.
    with gr.Blocks(title="Let's Talk - Question Recommender", theme=theme, css=custom_css, mode="light", js=force_light_mode_js) as iface:
        gr.Markdown("""
        # π€ Let's Talk - Question Recommender
        Find questions that two users might be interested in discussing together based on their common interests.
        """)

        with gr.Row(equal_height=True):
            with gr.Column(scale=1):
                user1_dropdown = gr.Dropdown(
                    choices=users,
                    label="π€ First User",
                    interactive=True,
                    elem_id="user1-input"
                )
            with gr.Column(scale=1):
                user2_dropdown = gr.Dropdown(
                    choices=users,
                    label="π€ Second User",
                    interactive=True,
                    elem_id="user2-input"
                )

        recommend_btn = gr.Button(
            "π Get Recommendations",
            variant="primary",
            size="lg"
        )

        with gr.Row():
            interests_output = gr.HTML(label="Common Interests")

        recommendation_type = gr.HTML()
        questions_output = gr.HTML()

        def recommend_and_store(user1, user2):
            """Get recommendations, keeping only the three HTML fragments."""
            # The fourth item (raw question data) has no output component.
            interests, rec_type, questions_html, _ = recommend_questions(user1, user2)
            return interests, rec_type, questions_html

        # First swap in the loading placeholder, then replace it with the
        # real results once they are computed.
        recommend_btn.click(
            fn=loading_message,
            outputs=[interests_output, recommendation_type, questions_output],
            queue=False
        ).then(
            fn=recommend_and_store,
            inputs=[user1_dropdown, user2_dropdown],
            outputs=[interests_output, recommendation_type, questions_output]
        )

    # NOTE(review): confirm that "*" in allowed_paths is interpreted the way
    # it was intended; check Gradio's launch() documentation.
    iface.launch(allowed_paths=["*"],
                 show_error=True)
|
|
|
# Start the app only when this file is executed directly as a script.
if __name__ == "__main__":
    main()