import bz2
import os
import re
import sqlite3
import time
from xml.sax import make_parser, handler


class WikiContentHandler(handler.ContentHandler):
    """SAX handler that streams articles and their links from a Wikipedia XML dump into SQLite."""

    def __init__(self, db_conn, batch_size=1000, max_articles=None):
        self.db_conn = db_conn
        self.cursor = db_conn.cursor()
        self.batch_size = batch_size
        self.article_count = 0
        self.max_articles = max_articles
        self.article_batch = []
        self.links_batch = []

        # Parsing state for the current page
        self.current_title = None
        self.current_text = None
        self.current_ns = None
        self.in_page = False
        self.in_title = False
        self.in_text = False
        self.in_ns = False
        self.buffer = []

    def startElement(self, name, attrs):
        if name == 'page':
            self.in_page = True
            self.current_title = None
            self.current_text = None
            self.current_ns = None
        elif self.in_page and name == 'title':
            self.in_title = True
            self.buffer = []
        elif self.in_page and name == 'ns':
            self.in_ns = True
            self.buffer = []
        elif self.in_page and name == 'text':
            self.in_text = True
            self.buffer = []

    def endElement(self, name):
        if name == 'page':
            self.in_page = False
            # Only process main-namespace articles (ns = 0)
            if self.current_ns == '0' and self.current_title and self.current_text:
                # Extract links from the wikitext
                links = self.extract_links(self.current_text)

                # Add the article to the batch
                self.article_batch.append(
                    (self.current_title, self.current_text)
                )

                # Add its links to the batch
                for link in links:
                    self.links_batch.append(
                        (self.current_title, link)
                    )

                self.article_count += 1

                # Print progress
                if self.article_count % 100 == 0:
                    print(f"Processed {self.article_count} articles...")

                # Insert the batch once it reaches the configured size
                if len(self.article_batch) >= self.batch_size:
                    self._insert_batch()

                # Stop once the maximum number of articles has been reached;
                # the StopIteration is caught by parse_wiki_dump
                if self.max_articles and self.article_count >= self.max_articles:
                    self._insert_batch()  # Insert any remaining items
                    raise StopIteration("Reached maximum number of articles")
        elif name == 'title':
            self.in_title = False
            self.current_title = ''.join(self.buffer)
        elif name == 'ns':
            self.in_ns = False
            self.current_ns = ''.join(self.buffer)
        elif name == 'text':
            self.in_text = False
            self.current_text = ''.join(self.buffer)

    def characters(self, content):
        # characters() may be called several times per element, so accumulate chunks in a buffer
        if self.in_title or self.in_ns or self.in_text:
            self.buffer.append(content)

    def extract_links(self, text):
        """Extract links from article wikitext."""
        # Pattern to match [[Link]] or [[Link|Text]] format
        links = re.findall(r'\[\[([^|\]]+)(?:\|[^\]]+)?\]\]', text)

        processed_links = []
        for link in links:
            # Skip non-article links (except categories, which might be useful)
            if ':' in link and not link.startswith('Category:'):
                continue
            # Remove any section anchor (the part after #)
            link = link.split('#')[0].strip()
            # Skip empty links
            if not link:
                continue
            processed_links.append(link)

        # Remove duplicates and return
        return list(set(processed_links))
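
    # Illustrative example (not in the original script): for the wikitext
    #   "See [[Python (programming language)|Python]], [[Category:Snakes]] and [[Help:Editing]]"
    # extract_links returns ['Python (programming language)', 'Category:Snakes'] in arbitrary order;
    # the Help: link is skipped because it sits outside the article and category namespaces.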

    def _insert_batch(self):
        """Insert batched data into the database."""
        if self.article_batch:
            self.cursor.executemany(
                "INSERT OR IGNORE INTO articles (title, text) VALUES (?, ?)",
                self.article_batch
            )
        if self.links_batch:
            self.cursor.executemany(
                "INSERT OR IGNORE INTO links (source_title, target_title) VALUES (?, ?)",
                self.links_batch
            )
        self.db_conn.commit()
        self.article_batch = []
        self.links_batch = []


def create_db_schema(db_conn):
    """Create the database schema."""
    cursor = db_conn.cursor()

    # Articles table
    cursor.execute('''
        CREATE TABLE IF NOT EXISTS articles (
            title TEXT PRIMARY KEY,
            text TEXT
        )
    ''')

    # Links table
    cursor.execute('''
        CREATE TABLE IF NOT EXISTS links (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            source_title TEXT,
            target_title TEXT,
            FOREIGN KEY (source_title) REFERENCES articles (title),
            UNIQUE (source_title, target_title)
        )
    ''')

    # Indexes on links for faster queries
    cursor.execute('CREATE INDEX IF NOT EXISTS idx_links_source ON links (source_title)')
    cursor.execute('CREATE INDEX IF NOT EXISTS idx_links_target ON links (target_title)')

    db_conn.commit()
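

# Illustrative sketch, not part of the original pipeline: one way to query the links table built by
# the parser once a dump has been processed. The function name and its use of a throwaway connection
# are assumptions made for this example.
def top_linked_articles(db_path, limit=10):
    """Return the most frequently linked target titles as (title, inbound_count) rows."""
    conn = sqlite3.connect(db_path)
    try:
        cursor = conn.execute(
            "SELECT target_title, COUNT(*) AS inbound FROM links "
            "GROUP BY target_title ORDER BY inbound DESC LIMIT ?",
            (limit,)
        )
        return cursor.fetchall()
    finally:
        conn.close()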


def parse_wiki_dump(dump_path, db_path, batch_size=1000, max_articles=None):
    """
    Parse a Wikipedia XML dump and extract articles with their links into a SQLite database.

    Args:
        dump_path: Path to the bz2-compressed Wikipedia dump
        db_path: Path to save the SQLite database
        batch_size: Number of articles to accumulate before committing to the database
        max_articles: Maximum number of articles to extract (None for all)

    Returns:
        The path to the created SQLite database
    """
    start_time = time.time()
    print(f"Parsing Wikipedia dump: {dump_path}")

    # Create or connect to the SQLite database and set up the schema
    db_conn = sqlite3.connect(db_path)
    create_db_schema(db_conn)

    # Create a SAX parser with the custom content handler
    parser = make_parser()
    content_handler = WikiContentHandler(db_conn, batch_size, max_articles)
    parser.setContentHandler(content_handler)

    # Parse the dump
    try:
        parser.parse(bz2.BZ2File(dump_path))
        # Insert any remaining items in the batch
        content_handler._insert_batch()
    except StopIteration:
        print("Reached maximum number of articles")
    except Exception as e:
        print(f"Error parsing dump: {e}")
        raise
    finally:
        db_conn.commit()
        db_conn.close()

    duration = time.time() - start_time
    print(f"Extracted {content_handler.article_count} articles in {duration:.2f} seconds.")
    print(f"Data saved to {db_path}")
    return db_path
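

# Programmatic usage sketch (hypothetical paths, not part of the original script):
#   db_file = parse_wiki_dump("enwiki-latest-pages-articles.xml.bz2", "wiki.db", max_articles=1000)
#   print(top_linked_articles(db_file))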


if __name__ == "__main__":
    import argparse

    parser = argparse.ArgumentParser(description='Parse Wikipedia XML dump to SQLite')
    parser.add_argument('dump_path', help='Path to the Wikipedia XML dump (bz2 file)')
    parser.add_argument('output_path', help='Path to save the SQLite database')
    parser.add_argument('--batch-size', type=int, default=1000,
                        help='Batch size for database inserts (default: 1000)')
    parser.add_argument('--max-articles', type=int, default=None,
                        help='Maximum number of articles to extract (default: all)')
    args = parser.parse_args()

    # Create the output directory if it doesn't exist
    output_dir = os.path.dirname(args.output_path)
    if output_dir:
        os.makedirs(output_dir, exist_ok=True)

    # Parse the dump
    parse_wiki_dump(args.dump_path, args.output_path, args.batch_size, args.max_articles)
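
# Example invocation (script filename and paths below are placeholders):
#   python parse_wiki_dump.py enwiki-latest-pages-articles.xml.bz2 data/wiki.db --max-articles 5000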