DVampire
update website
78f6650
import os
import json
import aiosqlite
from datetime import date, datetime, timedelta
from typing import Any, Dict, List, Optional
from contextlib import asynccontextmanager
class PapersDatabase():
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.db_path = None
async def init_db(self, config):
"""Initialize the database with required tables"""
self.db_path = config.db_path
async with self.get_connection() as conn:
cursor = await conn.cursor()
# Create papers cache table
await cursor.execute('''
CREATE TABLE IF NOT EXISTS papers_cache (
date_str TEXT PRIMARY KEY,
html_content TEXT NOT NULL,
parsed_cards TEXT NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
''')
# Create papers table for individual arXiv papers
await cursor.execute('''
CREATE TABLE IF NOT EXISTS papers (
arxiv_id TEXT PRIMARY KEY,
title TEXT NOT NULL,
authors TEXT NOT NULL,
abstract TEXT,
categories TEXT,
published_date TEXT,
evaluation_content TEXT,
evaluation_score REAL,
overall_score REAL,
evaluation_tags TEXT,
evaluation_status TEXT DEFAULT 'not_started',
is_evaluated BOOLEAN DEFAULT FALSE,
evaluation_date TIMESTAMP,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
''')
# Create latest_date table to track the most recent available date
await cursor.execute('''
CREATE TABLE IF NOT EXISTS latest_date (
id INTEGER PRIMARY KEY CHECK (id = 1),
date_str TEXT NOT NULL,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
''')
# Insert default latest_date record if it doesn't exist
await cursor.execute('''
INSERT OR IGNORE INTO latest_date (id, date_str)
VALUES (1, ?)
''', (date.today().isoformat(),))
await conn.commit()
@asynccontextmanager
async def get_connection(self):
"""Context manager for database connections"""
conn = await aiosqlite.connect(self.db_path)
conn.row_factory = aiosqlite.Row # Enable dict-like access
# Enable WAL mode for better concurrency
await conn.execute("PRAGMA journal_mode=WAL")
await conn.execute("PRAGMA synchronous=NORMAL")
await conn.execute("PRAGMA cache_size=10000")
await conn.execute("PRAGMA temp_store=MEMORY")
try:
yield conn
finally:
await conn.close()
async def get_cached_papers(self, date_str: str) -> Optional[Dict[str, Any]]:
"""Get cached papers for a specific date"""
async with self.get_connection() as conn:
cursor = await conn.cursor()
await cursor.execute('''
SELECT parsed_cards, created_at
FROM papers_cache
WHERE date_str = ?
''', (date_str,))
row = await cursor.fetchone()
if row:
return {
'cards': json.loads(row['parsed_cards']),
'cached_at': row['created_at']
}
return None
async def cache_papers(self, date_str: str, html_content: str, parsed_cards: List[Dict[str, Any]]):
"""Cache papers for a specific date"""
async with self.get_connection() as conn:
cursor = await conn.cursor()
await cursor.execute('''
INSERT OR REPLACE INTO papers_cache
(date_str, html_content, parsed_cards, updated_at)
VALUES (?, ?, ?, CURRENT_TIMESTAMP)
''', (date_str, html_content, json.dumps(parsed_cards)))
await conn.commit()
async def get_latest_cached_date(self) -> Optional[str]:
"""Get the latest cached date"""
async with self.get_connection() as conn:
cursor = await conn.cursor()
await cursor.execute('SELECT date_str FROM latest_date WHERE id = 1')
row = await cursor.fetchone()
return row['date_str'] if row else None
async def update_latest_date(self, date_str: str):
"""Update the latest available date"""
async with self.get_connection() as conn:
cursor = await conn.cursor()
await cursor.execute('''
UPDATE latest_date
SET date_str = ?, updated_at = CURRENT_TIMESTAMP
WHERE id = 1
''', (date_str,))
await conn.commit()
async def is_cache_fresh(self, date_str: str, max_age_hours: int = 24) -> bool:
"""Check if cache is fresh (within max_age_hours)"""
async with self.get_connection() as conn:
cursor = await conn.cursor()
await cursor.execute('''
SELECT updated_at
FROM papers_cache
WHERE date_str = ?
''', (date_str,))
row = await cursor.fetchone()
if not row:
return False
cached_time = datetime.fromisoformat(row['updated_at'].replace('Z', '+00:00'))
age = datetime.now(cached_time.tzinfo) - cached_time
return age.total_seconds() < max_age_hours * 3600
async def cleanup_old_cache(self, days_to_keep: int = 7):
"""Clean up old cache entries"""
cutoff_date = (datetime.now() - timedelta(days=days_to_keep)).isoformat()
async with self.get_connection() as conn:
cursor = await conn.cursor()
await cursor.execute('''
DELETE FROM papers_cache
WHERE updated_at < ?
''', (cutoff_date,))
await conn.commit()
# Papers table methods
async def insert_paper(self, arxiv_id: str, title: str, authors: str, abstract: str = None,
categories: str = None, published_date: str = None):
"""Insert a new paper into the papers table"""
async with self.get_connection() as conn:
cursor = await conn.cursor()
await cursor.execute('''
INSERT OR REPLACE INTO papers
(arxiv_id, title, authors, abstract, categories, published_date, updated_at)
VALUES (?, ?, ?, ?, ?, ?, CURRENT_TIMESTAMP)
''', (arxiv_id, title, authors, abstract, categories, published_date))
await conn.commit()
async def get_paper(self, arxiv_id: str) -> Optional[Dict[str, Any]]:
"""Get a paper by arxiv_id"""
async with self.get_connection() as conn:
cursor = await conn.cursor()
await cursor.execute('''
SELECT * FROM papers WHERE arxiv_id = ?
''', (arxiv_id,))
row = await cursor.fetchone()
if row:
return dict(row)
return None
async def get_papers_by_evaluation_status(self, is_evaluated: bool = None) -> List[Dict[str, Any]]:
"""Get papers by evaluation status"""
async with self.get_connection() as conn:
cursor = await conn.cursor()
if is_evaluated is None:
await cursor.execute('SELECT * FROM papers ORDER BY created_at DESC')
else:
await cursor.execute('''
SELECT * FROM papers
WHERE is_evaluated = ?
ORDER BY created_at DESC
''', (is_evaluated,))
rows = await cursor.fetchall()
return [dict(row) for row in rows]
async def update_paper_evaluation(self, arxiv_id: str, evaluation_content: str,
evaluation_score: float = None, overall_score: float = None, evaluation_tags: str = None):
"""Update paper with evaluation content"""
async with self.get_connection() as conn:
cursor = await conn.cursor()
await cursor.execute('''
UPDATE papers
SET evaluation_content = ?,
evaluation_score = ?,
overall_score = ?,
evaluation_tags = ?,
is_evaluated = TRUE,
evaluation_status = 'completed',
evaluation_date = CURRENT_TIMESTAMP,
updated_at = CURRENT_TIMESTAMP
WHERE arxiv_id = ?
''', (evaluation_content, evaluation_score, overall_score, evaluation_tags, arxiv_id))
await conn.commit()
async def update_paper_status(self, arxiv_id: str, status: str):
"""Update paper evaluation status"""
async with self.get_connection() as conn:
cursor = await conn.cursor()
await cursor.execute('''
UPDATE papers
SET evaluation_status = ?,
updated_at = CURRENT_TIMESTAMP
WHERE arxiv_id = ?
''', (status, arxiv_id))
await conn.commit()
async def get_unevaluated_papers(self) -> List[Dict[str, Any]]:
"""Get all papers that haven't been evaluated yet"""
return await self.get_papers_by_evaluation_status(is_evaluated=False)
async def get_evaluated_papers(self) -> List[Dict[str, Any]]:
"""Get all papers that have been evaluated"""
return await self.get_papers_by_evaluation_status(is_evaluated=True)
async def search_papers(self, query: str) -> List[Dict[str, Any]]:
"""Search papers by title, authors, or abstract"""
async with self.get_connection() as conn:
cursor = await conn.cursor()
search_pattern = f'%{query}%'
await cursor.execute('''
SELECT * FROM papers
WHERE title LIKE ? OR authors LIKE ? OR abstract LIKE ?
ORDER BY created_at DESC
''', (search_pattern, search_pattern, search_pattern))
rows = await cursor.fetchall()
return [dict(row) for row in rows]
async def delete_paper(self, arxiv_id: str):
"""Delete a paper from the database"""
async with self.get_connection() as conn:
cursor = await conn.cursor()
await cursor.execute('DELETE FROM papers WHERE arxiv_id = ?', (arxiv_id,))
await conn.commit()
async def get_papers_count(self) -> Dict[str, int]:
"""Get count of papers by evaluation status"""
async with self.get_connection() as conn:
cursor = await conn.cursor()
await cursor.execute('SELECT COUNT(*) as total FROM papers')
total_row = await cursor.fetchone()
total = total_row['total']
await cursor.execute('SELECT COUNT(*) as evaluated FROM papers WHERE is_evaluated = TRUE')
evaluated_row = await cursor.fetchone()
evaluated = evaluated_row['evaluated']
return {
'total': total,
'evaluated': evaluated,
'unevaluated': total - evaluated
}
def __str__(self):
return f"PapersDatabase(db_path={self.db_path})"
def __repr__(self):
return self.__str__()
db = PapersDatabase()