# /// script
# requires-python = ">=3.12"
# dependencies = [
#     "altair==5.5.0",
#     "en-core-web-sm",
#     "marimo",
#     "matplotlib==3.10.3",
#     "numpy==2.2.6",
#     "pandas==2.3.0",
#     "pca==2.10.0",
#     "plotly==6.2.0",
#     "prince==0.16.0",
#     "pyarrow",
#     "scattertext==0.2.2",
#     "scikit-learn==1.7.0",
#     "scipy==1.13.1",
#     "seaborn==0.13.2",
#     "spacy==3.8.7",
#     "umap",
# ]
# [tool.uv.sources]
# en-core-web-sm = { url = "https://github.com/explosion/spacy-models/releases/download/en_core_web_sm-3.8.0/en_core_web_sm-3.8.0-py3-none-any.whl" }
# ///
# Note that the above dependencies should be kept in sync with pyproject.toml
import marimo | |
__generated_with = "0.14.10" | |
app = marimo.App(width="full", app_title="Scattertext on English novels") | |
with app.setup: | |
import marimo as mo | |
import spacy | |
import pandas as pd | |
import scipy | |
import numpy as np | |
import random | |
import re | |
import scattertext as st | |
from pca import pca | |
import prince | |
import matplotlib.pyplot as plt | |
from pathlib import Path | |
from types import SimpleNamespace | |
from sklearn.feature_extraction.text import TfidfVectorizer | |
RANDOM_SEED = 42 | |
random.seed(RANDOM_SEED) | |
np.random.seed(RANDOM_SEED) | |
def function_export(): | |
def load_nlp() -> spacy.language.Language: | |
"""Load spaCy English pipeline (tokenizer only).""" | |
return spacy.load("en_core_web_sm", disable=["ner"]) | |
def nlp_docs(texts: list[str], nlp=load_nlp()) -> list[spacy.tokens.Doc]: | |
"""Return spaCy Doc objects for downstream tasks.""" | |
return list(nlp.pipe(texts)) | |
def parse_texts(texts: list[str], nlp=load_nlp()) -> list[str]: | |
"""Tokenize English text via spaCy and emit a whitespace-joined string.""" | |
return [" ".join(tok.text for tok in doc) for doc in nlp.pipe(texts)] | |
def build_corpus_cached( | |
texts: list[str], | |
categories: list[str], | |
) -> st.Corpus: | |
"""Build or reuse cached Scattertext corpus.""" | |
df = pd.DataFrame({"text": texts, "category": categories}) | |
return ( | |
st.CorpusFromPandas( | |
df, | |
category_col="category", | |
text_col="text", | |
nlp=load_nlp(), | |
) | |
.build() | |
.get_unigram_corpus() | |
.compact(st.AssociationCompactor(2000)) | |
) | |
def _strip_advanced(fn: str) -> str: | |
""" | |
Strip trailing '_advanced' from a filename stem. | |
""" | |
from pathlib import Path | |
stem = Path(fn).stem | |
return stem.replace("_advanced", "") | |
def make_short_label(fn: str) -> str: | |
""" | |
Generate an initials-based short label from filename. | |
E.g., 'e_r_eddison-the_worm_ouroboros.txt' -> 'ERE-TWO'. | |
""" | |
stem = _strip_advanced(fn) | |
fields = stem.split("-", 1) | |
if len(fields) == 2: | |
author, title = fields | |
else: | |
author = fields[0] | |
title = fields[0] | |
initials = lambda s: "".join(part[0].upper() for part in s.split("_")) | |
return f"{initials(author)}-{initials(title)}" | |
def format_chunk_label( | |
fn: str, | |
category: str, | |
speech_type: str, | |
chunk_idx: int | str, | |
) -> str: | |
""" | |
Create a chunk label 'SHORTLABEL(CATEGORY[-speech_type])#INDEX'. | |
""" | |
sl = make_short_label(fn) | |
# append speech_type only if it differs from category and isn't 'mixed' | |
if speech_type and speech_type != "mixed" and speech_type != category: | |
label = f"{category}-{speech_type}" | |
else: | |
label = category | |
return f"{sl}({label})#{chunk_idx}" | |
def chunk_texts( | |
df: pd.DataFrame, | |
chunk_size: int = 2000, | |
) -> pd.DataFrame: | |
""" | |
Turn each row of df into token‐chunks of size chunk_size, | |
preserving category, filename, author, work, and producing | |
a `chunk_label`. | |
""" | |
records: list[dict] = [] | |
for _, row in df.iterrows(): | |
tokens = row["text"].split() | |
n_chunks = (len(tokens) + chunk_size - 1) // chunk_size | |
for idx in range(n_chunks): | |
seg = " ".join(tokens[idx * chunk_size : (idx + 1) * chunk_size]) | |
label_idx = idx + 1 if idx + 1 < n_chunks else "last" | |
records.append( | |
{ | |
"text": seg, | |
"category": row["category"], | |
"speech_type": row["speech_type"], | |
"filename": row["filename"], | |
"author": row["author"], | |
"work": row["work"], | |
"chunk_label": format_chunk_label( | |
row["filename"], | |
row["category"], | |
row["speech_type"], | |
label_idx, | |
), | |
} | |
) | |
return pd.DataFrame(records) | |
def train_scikit_cached( | |
texts: list[str], | |
categories: list[str], | |
filenames: list[str], | |
min_df: float = 0.25, | |
max_df: float = 0.8, | |
max_features: int = 200, | |
stop_words: list[str] | None = None, | |
) -> tuple[ | |
st.Corpus, | |
scipy.sparse.spmatrix, | |
TfidfVectorizer, | |
list[str], | |
list[str], | |
]: | |
"""Fit TF-IDF + CountVectorizer & build a st.Corpus on already‐chunked data. | |
stop_words: list of tokens to filter out or None. | |
""" | |
# texts, categories, filenames are assumed already chunked upstream | |
tfv = TfidfVectorizer( | |
min_df=min_df, | |
max_df=max_df, | |
max_features=max_features, | |
stop_words=stop_words, | |
) | |
X_tfidf = tfv.fit_transform(texts) | |
y_codes = pd.Categorical( | |
categories, categories=pd.Categorical(categories).categories | |
).codes | |
scikit_corpus = st.CorpusFromScikit( | |
X=X_tfidf,
y=y_codes, | |
feature_vocabulary=tfv.vocabulary_, | |
category_names=list(pd.Categorical(categories).categories), | |
raw_texts=texts, | |
).build() | |
return scikit_corpus, X_tfidf, tfv, categories, filenames | |
def kwic_search( | |
texts: list[str], | |
keyword: str, | |
context_chars: int = 20, | |
) -> pd.DataFrame: | |
""" | |
KWIC on a list of strings. | |
Returns rows with columns: | |
- original_index: index in `texts` | |
- before, keyword, after: context snippets | |
""" | |
import re | |
import pandas as pd | |
pattern = rf"\b{re.escape(keyword)}\b" | |
results: list[dict] = [] | |
for idx, txt in enumerate(texts): | |
txt = str(txt) | |
for m in re.finditer(pattern, txt, re.IGNORECASE): | |
s, e = m.span() | |
results.append( | |
{ | |
"original_index": idx, | |
"before": txt[max(0, s - context_chars) : s], | |
"keyword": txt[s:e], | |
"after": txt[e : min(len(txt), e + context_chars)], | |
} | |
) | |
return pd.DataFrame( | |
results, | |
columns=["original_index", "before", "keyword", "after"], | |
) | |
def split_speech_text(text: str) -> tuple[str, str]: | |
""" | |
Extract all quoted spans as 'speech' and the remainder as 'non-speech' | |
for a single text string. | |
""" | |
rx = re.compile(r"“[^”]+”") | |
rx_multi = re.compile(r"“[^”]+$") | |
spans = [(m.start(), m.end()) for m in rx.finditer(text)] | |
spans += [(m.start(), m.end()) for m in rx_multi.finditer(text)] | |
# collect speech segments | |
speech = [text[s:e] for s, e in spans] | |
# remove speech spans to form non-speech | |
ns_text = text | |
for s, e in sorted(spans, reverse=True): | |
ns_text = ns_text[:s] + ns_text[e:] | |
non_speech = ( | |
[ns_text] if spans and ns_text.strip() else ([text] if not spans else []) | |
) | |
return "\n".join(speech), "\n".join(non_speech) | |
def _load_files(uploaded, defaults): | |
if uploaded: | |
names = [f.name for f in uploaded] | |
texts = [f.contents.decode("utf-8") for f in uploaded] | |
else: | |
names = defaults | |
texts = [Path(fn).read_text(encoding="utf-8") for fn in defaults] | |
return names, texts | |
def prepare_files( | |
uploaded: list, defaults: list[str], split: bool = False | |
) -> pd.DataFrame: | |
""" | |
Ingest uploaded vs. default files into a DataFrame with columns: | |
['filename','raw_text','category' (if split),'author','work']. | |
""" | |
names, raws = _load_files(uploaded, defaults) | |
records: list[dict] = [] | |
for name, raw in zip(names, raws): | |
if split: | |
sp, ns = split_speech_text(raw) | |
records.append( | |
{ | |
"filename": name, | |
"raw_text": sp, | |
"speech_type": "speech", | |
} | |
) | |
records.append( | |
{ | |
"filename": name, | |
"raw_text": ns, | |
"speech_type": "non-speech", | |
} | |
) | |
else: | |
records.append( | |
{ | |
"filename": name, | |
"raw_text": raw, | |
"speech_type": "mixed", | |
} | |
) | |
df_p = pd.DataFrame(records) | |
# infer author & work from the file's true stem (no extension, no "_advanced") | |
def _extract_auth_work(fn: str) -> tuple[str, str]: | |
base = Path(fn).stem.replace("_advanced", "") | |
auth, *rest = base.split("-", 1) | |
work_raw = rest[0] if rest else base | |
return ( | |
auth.replace("_", " ").title(), | |
work_raw.replace("_", " ").title(), | |
) | |
aw = df_p["filename"].apply(_extract_auth_work) | |
df_p["author"], df_p["work"] = zip(*aw) | |
return df_p | |
return ( | |
build_corpus_cached, | |
chunk_texts, | |
kwic_search, | |
parse_texts, | |
prepare_files, | |
train_scikit_cached, | |
) | |
def intro(): | |
mo.md( | |
r""" | |
# Scattertext on English novels from StandardEbooks
## Overview
Upload two groups of text files and visualize their differences with Scattertext.
Optionally, a machine-learning model can be trained to classify the texts, letting you check both its classification accuracy and the tokens it uses to tell the groups apart.
> The dialogue-detection feature relies on StandardEbooks' formatting conventions and may not work on material from other sources.
## Workflow
1. Upload text files (to use the bundled defaults, just press Submit)
2. Review and, if needed, correct the ingested data
3. Configure chunking and sampling
4. Visualize with Scattertext
5. Examine the distribution of and relationships between samples, categories, and features using PCA and CA biplots and a hierarchical-clustering dendrogram
6. Select any sample from the dropdown to inspect its contents
> Tokenization uses [spaCy](https://spacy.io/) with the [en_core_web_sm](https://spacy.io/models/en#en_core_web_sm) model.
""" | |
) | |
return | |
def data_settings(): | |
category_name = mo.ui.text( | |
label="カテゴリ名(例:著者名・時代区分など)", | |
placeholder="例:時代・性別・著者など", | |
value="著者", | |
full_width=True, | |
) | |
label_a = mo.ui.text( | |
label="Aのラベル(作者)", | |
placeholder="自動推論 (e.g. E R Eddison)", | |
value="E R Eddison", | |
full_width=True, | |
) | |
files_a = mo.ui.file( | |
label="Aのファイルアップロード(UTF-8、.txt形式)", | |
multiple=True, | |
kind="area", | |
) | |
### Category form | |
label_b = mo.ui.text( | |
label="Bのラベル(作者)", | |
placeholder="自動推論 (e.g. H G Wells)", | |
value="H G Wells", | |
full_width=True, | |
) | |
files_b = mo.ui.file( | |
label="Bのファイルアップロード(UTF-8、.txt形式)", | |
multiple=True, | |
kind="area", | |
) | |
split_speech = mo.ui.switch( | |
label="Split speech vs non-speech segments?", | |
value=True, | |
) | |
author_tpl = r""" | |
## Category Comparison mode
※ Files must be plain text (.txt, UTF-8 encoded)
※ Filename format: `author_name-title_text.txt`
{category_name}
### Group A
{label_a}
{files_a}
### Group B
{label_b} | |
{files_b} | |
{split_speech} | |
""" | |
category_form = ( | |
mo.md(author_tpl) | |
.batch( | |
category_name=category_name, | |
label_a=label_a, | |
files_a=files_a, | |
label_b=label_b, | |
files_b=files_b, | |
split_speech=split_speech, | |
) | |
.form(show_clear_button=True, bordered=True) | |
) | |
### Speech vs Non-Speech form | |
speech_files = mo.ui.file( | |
label="Speechモード用ファイルアップロード(UTF-8、.txt形式)", | |
multiple=True, | |
kind="area", | |
) | |
speech_tpl = r""" | |
## Speech vs Non-Speech mode
※ Files must be plain text (.txt, UTF-8 encoded)
※ Filename format: `author_name-title_text.txt`
{files_s} | |
""" | |
speech_form = ( | |
mo.md(speech_tpl) | |
.batch(files_s=speech_files) | |
.form(show_clear_button=True, bordered=True) | |
) | |
mode_tabs = mo.ui.tabs( | |
{ | |
"Speech vs Non-Speech": speech_form, | |
"Category Comparison": category_form, | |
} | |
) | |
mode_tabs | |
return category_form, mode_tabs, speech_form, split_speech | |
def data_check( | |
category_form, | |
mode_tabs, | |
parse_texts, | |
prepare_files, | |
speech_form, | |
split_speech, | |
): | |
mo.stop(mode_tabs.value == "Speech vs Non-Speech" and speech_form.value is None) | |
mo.stop(mode_tabs.value == "Category Comparison" and category_form.value is None) | |
validation_messages: list[str] = [] | |
if mode_tabs.value == "Speech vs Non-Speech": | |
defaults = [ | |
"e_r_eddison-the_worm_ouroboros_advanced.txt", | |
"h_g_wells-the_wonderful_visit_advanced.txt", | |
] | |
df_pre = prepare_files( | |
speech_form.value.get("files_s", []), | |
defaults, | |
split=True, | |
) | |
data = df_pre.rename(columns={"raw_text": "text"}) | |
# use the speech‐vs‐non‐speech flag as our category | |
data["category"] = data["speech_type"] | |
mo.md( | |
f"## Data preview (speech vs non-speech)\n" | |
f"{mo.ui.table(data, selection=None)}" | |
) | |
data_form = SimpleNamespace( | |
value={ | |
"category_name": "Speech vs Non-speech", | |
"label_a": "speech", | |
"label_b": "non-speech", | |
} | |
) | |
elif category_form.value is not None and mode_tabs.value == "Category Comparison": | |
# Category vs Category | |
if category_form.value["label_a"] == category_form.value["label_b"]: | |
validation_messages.append( | |
"⚠️ **警告**: グループAとBのラベルが同じです。AとBは異なるラベルを設定してください。\n" | |
) | |
if not category_form.value["files_a"] and not category_form.value["files_b"]: | |
validation_messages.append( | |
"ℹ️ ファイルが未指定のため、デフォルトサンプルを使用しています。\n" | |
) | |
defaults_a = ["e_r_eddison-the_worm_ouroboros_advanced.txt"] | |
df_a = prepare_files( | |
category_form.value["files_a"], | |
defaults_a, | |
split=split_speech.value, | |
) | |
df_a["category"] = ( | |
[category_form.value["label_a"]] * len(df_a) | |
if category_form.value["files_a"] | |
else [category_form.value["label_a"]] * len(df_a) | |
) | |
defaults_b = ["h_g_wells-the_wonderful_visit_advanced.txt"] | |
df_b = prepare_files( | |
category_form.value["files_b"], | |
defaults_b, | |
split=split_speech.value, | |
) | |
df_b["category"] = [category_form.value["label_b"]] * len(df_b) | |
data = pd.concat([df_a, df_b], ignore_index=True) | |
# tokenize text if not already (optional) | |
data["text"] = parse_texts(list(data["raw_text"])) | |
data_form = category_form | |
else: | |
data = None | |
validation_messages.append( | |
f"❌ **エラー**: {mode_tabs.value}: {category_form.value}, {speech_form.value}\n" | |
) | |
data_form = None | |
mo.md(f""" | |
## Data check
{"**Warnings**:\n" if validation_messages else ""}
{"\n".join(map(lambda x: f"- {x}", validation_messages))}
Parsed texts:
{ | |
mo.ui.table( | |
data, | |
selection=None, | |
format_mapping={"text": lambda s: s[:20] + "..."}, | |
) | |
if (data is not None and not data.empty) | |
else "No data" | |
} | |
""") | |
return data, data_form | |
def sampling_controls_setup(): | |
chunk_size = mo.ui.slider( | |
start=500, | |
stop=50_000, | |
value=2000, | |
step=500, | |
label="1チャンクあたり最大トークン数", | |
full_width=True, | |
) | |
sample_frac = mo.ui.slider( | |
start=0.1, | |
stop=1.0, | |
value=0.2, | |
step=0.05, | |
label="使用割合(1.0で全データ)", | |
full_width=True, | |
) | |
sampling_form = ( | |
mo.md("{chunk_size}\n{sample_frac}") | |
.batch(chunk_size=chunk_size, sample_frac=sample_frac) | |
.form(show_clear_button=True, bordered=False) | |
) | |
sampling_form | |
return chunk_size, sample_frac, sampling_form | |
def _(build_corpus_cached, chunk_texts, data, sample_frac, sampling_form): | |
mo.stop(sampling_form.value is None) | |
with mo.status.spinner("コーパスをサンプリング中…"): | |
# chunk the DataFrame | |
chunk_df = chunk_texts(data, sampling_form.value["chunk_size"]) | |
# optional subsampling | |
if sample_frac.value < 1.0: | |
chunk_df = chunk_df.sample(frac=sample_frac.value, random_state=RANDOM_SEED) | |
texts = chunk_df["text"].tolist() | |
cats = chunk_df["category"].tolist() | |
fnames = chunk_df["chunk_label"].tolist() | |
authors = chunk_df["author"].tolist() | |
works = chunk_df["work"].tolist() | |
speech_types = chunk_df["speech_type"].tolist() | |
corpus = build_corpus_cached(texts, cats) | |
return authors, cats, corpus, fnames, speech_types, texts, works | |
def sampling_controls(chunk_size): | |
mo.md("トークン数を増やすと処理時間が長くなります").callout( | |
kind="info" | |
) if chunk_size.value > 30_000 else None | |
return | |
def plot_main_scatterplot(corpus, data_form, fnames): | |
cat_name = data_form.value["category_name"] | |
with mo.status.spinner("Scatterplot作成中…"): | |
html = st.produce_scattertext_explorer( | |
corpus, | |
category=data_form.value["label_a"], | |
category_name=f"{cat_name}: {data_form.value['label_a']}", | |
not_category_name=f"{cat_name}: {data_form.value['label_b']}", | |
width_in_pixels=1000, | |
metadata=fnames, | |
) | |
mo.vstack( | |
[ | |
mo.md(f""" | |
# Scattertext results
### How to read the Scattertext visualization
- (vertical) tokens higher up are used relatively more in {data_form.value["label_a"]}
- (horizontal) tokens further right are used relatively more in {data_form.value["label_b"]}
Downloading the HTML and opening it in a browser makes it easier to explore.
"""), | |
mo.iframe(html), | |
] | |
) | |
return (html,) | |
def _(html): | |
download_button = mo.download( | |
data=html.encode(), | |
filename="scattertext_analysis.html", | |
label="ScatterText可視化結果をダウンロード", | |
) | |
mo.md(f"{download_button}") | |
return | |
def _(): | |
mo.md( | |
r""" | |
# Exploratory analysis
This section runs exploratory analyses: a hierarchical-clustering dendrogram and principal component analysis (PCA) biplots.
In a biplot, each text appears as a dot and each feature as an arrow on the same plot.
A red arrow means the feature has a high absolute loading on one of the principal components; a blue arrow means its loading is not high on any of them.
""" | |
) | |
return | |
def _(): | |
min_df_setting = mo.ui.slider( | |
start=0.0, | |
stop=1.0, | |
step=0.05, | |
value=0.25, | |
show_value=True, | |
include_input=True, | |
label="Minimum proportion of samples feature appears in", | |
) | |
max_df_setting = mo.ui.slider( | |
start=0.0, | |
stop=1.0, | |
step=0.05, | |
value=0.8, | |
show_value=True, | |
include_input=True, | |
label="Maximum proportion of samples feature appears in", | |
) | |
max_features_setting = mo.ui.slider( | |
start=10, | |
stop=10_000, | |
step=1, | |
value=100, | |
show_value=True, | |
include_input=True, | |
label="Maximum number of features to use", | |
) | |
mo.vstack( | |
[ | |
mo.md( | |
"### 素性設定\n\nどのような単語を分析に使用するかを下記のスライダーで決めます。標準では、ほとんど全ての文章に現る単語、または極端に少ない文章にしか現れない単語が除外されています。そのうえで、$\\mathrm{tfidf}$の値上位100件まで素性としています。" | |
), | |
min_df_setting, | |
max_df_setting, | |
max_features_setting, | |
] | |
) | |
return max_df_setting, max_features_setting, min_df_setting | |
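# Illustrative sketch (not part of the app): what the three sliders above
# control in scikit-learn's TfidfVectorizer. min_df / max_df drop terms that
# appear in too few / too many documents; max_features then keeps only the
# most frequent remaining terms.
def _sketch_df_filtering():
    from sklearn.feature_extraction.text import TfidfVectorizer

    docs = [
        "the queen spoke",
        "the queen rode north",
        "the vicar spoke softly",
        "the vicar rode home",
    ]
    # "the" appears in 4/4 documents, so max_df=0.8 drops it; "north", "softly"
    # and "home" each appear in only 1/4 documents, so min_df=0.3 drops them.
    tfv = TfidfVectorizer(min_df=0.3, max_df=0.8, max_features=10)
    tfv.fit(docs)
    return sorted(tfv.vocabulary_)  # ['queen', 'rode', 'spoke', 'vicar']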
def _(max_df_setting, min_df_setting): | |
min_max_check = None | |
if max_df_setting.value <= min_df_setting.value: | |
min_max_check = mo.md(f"**Error**: minimum value {min_df_setting.value} must be smaller then maximum value {max_df_setting.value}.\n\nChange the sliders so that the min is smaller than the max.").callout(kind="danger") | |
min_max_check | |
return (min_max_check,) | |
def stopword_switch(): | |
stop_filter = mo.ui.switch(label="Enable stop-word filtering?", value=False) | |
stop_filter | |
return (stop_filter,) | |
def stopword_source(stop_filter): | |
if stop_filter.value: | |
sw_source = mo.ui.dropdown( | |
options=["spaCy", "Custom", "Both"], | |
value="spaCy", | |
label="Stop-word source", | |
full_width=True, | |
) | |
else: | |
sw_source = None | |
sw_source | |
return (sw_source,) | |
def custom_stopword_editor(sw_source): | |
if sw_source and sw_source.value in ("Custom", "Both"): | |
empty = pd.DataFrame({"stopword": []}, dtype=pd.StringDtype()) | |
editor = mo.ui.data_editor(empty).form( | |
label="Your custom stop-words", bordered=True | |
) | |
else: | |
editor = None | |
editor | |
return (editor,) | |
def final_stopwords(editor, stop_filter, sw_source): | |
# if master switch off → no filtering | |
if stop_filter.value: | |
# require a source choice | |
mo.stop(sw_source is None, mo.md("Choose stop-word source")) | |
sw: set[str] = set() | |
if sw_source.value in ("spaCy", "Both"): | |
from spacy.lang.en.stop_words import STOP_WORDS | |
sw.update(STOP_WORDS) | |
if sw_source.value in ("Custom", "Both"): | |
mo.stop( | |
editor is None or editor.value is None, | |
mo.md("Enter at least one custom stop-word"), | |
) | |
for tok in editor.value["stopword"].dropna().astype(str): | |
tok = tok.strip() | |
if tok: | |
sw.add(tok) | |
sw = list(sw) | |
else: | |
sw = None | |
return (sw,) | |
def _( | |
cats, | |
fnames, | |
max_df_setting, | |
max_features_setting, | |
min_df_setting, | |
min_max_check, | |
sw: set[str], | |
texts, | |
train_scikit_cached, | |
): | |
mo.stop(min_max_check is not None) | |
scikit_corpus, tfidf_X, vectorizer, chunk_cats, chunk_fnames = train_scikit_cached( | |
texts, | |
cats, | |
fnames, | |
min_df=min_df_setting.value, | |
max_df=max_df_setting.value, | |
max_features=max_features_setting.value, | |
stop_words=sw, | |
) | |
return chunk_cats, chunk_fnames, tfidf_X, vectorizer | |
def _(chunk_cats, tfidf_X): | |
# from sklearn.model_selection import train_test_split | |
# X_train, X_test, y_train, y_test = train_test_split( | |
# tfidf_X, | |
# chunk_cats, | |
# test_size=None, | |
# random_state=RANDOM_SEED, | |
# ) | |
# No train/test split is performed; the full tfidf matrix feeds the exploratory plots below.
X_train, X_test, y_train, y_test = tfidf_X, None, chunk_cats, None
return (X_train,) | |
def _(X_train, chunk_fnames, texts, vectorizer): | |
tf_idf_formula = r"$\mathrm{tfidf}(t,d,D)=\mathrm{tf} (t,d)\cdot \mathrm{idf}(t,D)$" | |
D_formula = r"|\{d:d\in D{\text{ and }}t\in d\}|" | |
idf_formula = rf"$\mathrm{{idf}}(t,D)=\log{{\frac{{N}}{{{D_formula}}}}}$" | |
tf_formula = r"${\displaystyle \mathrm {tf} (t,d)=\textrm{number of times }t\textrm{ appears in }d}$" | |
X_df = pd.DataFrame( | |
X_train.toarray(), | |
index=chunk_fnames, | |
columns=vectorizer.get_feature_names_out(), | |
) | |
mo.md(rf""" | |
### Sample-by-feature matrix
Each cell shows the $\mathrm{{tfidf}}$ value of a feature (word, column) in a text (row).
The higher the $\mathrm{{tfidf}}$ value, the more important that word is for that text.
Words that appear in many texts receive lower values.
{tf_idf_formula} | |
{idf_formula} | |
{tf_formula} | |
- ${{\displaystyle D}}$: the set of all documents in the corpus
- ${{\displaystyle N}}$: total number of documents in the corpus ${{\displaystyle N={{|D|}}}}$ | |
- ${D_formula}$: number of documents with $t$ | |
{mo.ui.table(X_df, selection=None)} | |
""") | |
# build raw‐counts table on identical vocab | |
from sklearn.feature_extraction.text import CountVectorizer | |
cv = CountVectorizer(vocabulary=vectorizer.vocabulary_) | |
count_mat = cv.fit_transform(texts) | |
count_df = pd.DataFrame( | |
count_mat.toarray(), | |
index=chunk_fnames, | |
columns=vectorizer.get_feature_names_out(), | |
) | |
return X_df, count_df | |
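# Illustrative sketch (not part of the app): the textbook tfidf arithmetic from
# the formulas above, on a three-document toy corpus. Note that scikit-learn's
# TfidfVectorizer uses a smoothed variant, idf = ln((1 + N) / (1 + df)) + 1,
# and L2-normalises each row, so the table above will not match these numbers
# exactly.
def _sketch_tfidf_by_hand():
    import math

    docs = [
        "the angel fell",          # d0
        "the angel spoke",         # d1
        "the vicar spoke softly",  # d2
    ]
    N = len(docs)
    tf = docs[0].split().count("angel")           # term frequency in d0 -> 1
    df = sum("angel" in d.split() for d in docs)  # document frequency -> 2
    idf = math.log(N / df)                        # ln(3/2) ≈ 0.405
    return tf * idf                               # tfidf("angel", d0) ≈ 0.405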
def pca_biplot(chunk_cats, tfidf_X, vectorizer): | |
X = tfidf_X.toarray() if hasattr(tfidf_X, "toarray") else tfidf_X | |
feature_names = vectorizer.get_feature_names_out() | |
model = pca(normalize=False, n_components=3) | |
results = model.fit_transform( | |
X, | |
col_labels=feature_names, | |
row_labels=chunk_cats, | |
) | |
three_switch = mo.ui.switch(label="3D") | |
three_switch | |
return X, model, results, three_switch | |
def _(model, results, three_switch): | |
model.biplot( | |
legend=True, | |
figsize=(12, 8), | |
fontsize=12, | |
s=20, | |
arrowdict={"alpha": 0.0}, | |
PC=[0, 1, 2] if three_switch.value else [0, 1], | |
) | |
# labels=np.array(chunk_fnames) | |
topfeat = results["topfeat"] | |
mo.vstack( | |
[ | |
mo.md( | |
r"""## Principal Components Analysis / 主成分分析 | |
[Principal Components Analysis](https://erdogant.github.io/pca/pages/html/index.html) (PCA)は、$\mathrm{{tfidf}}$スコアを連続的な数値データとして扱い、データセット内の分散を最も多く説明する単語の線形結合を特定します。この分析により、以下の点が明らかになります。 | |
- 主成分によって会話文と地の文(あるいは他の分析カテゴリ)を最も効果的に区別する単語の組み合わせが判明します。 | |
- 会話文と地の文サンプル間の分散に最も寄与する共起語彙パターン、および判別力の高い語彙が特定されます。 | |
- PCAは傾度に沿った線形関係を仮定するため、言語スタイルの緩やかな変化も示されます。 | |
- $\mathrm{{tfidf}}$スコアの連続性を保持したまま、次元削減が実現されます。 | |
**主成分とは?** | |
主成分は「データのばらつきを一番よく説明する単語の線形結合」です。 | |
数式よりも「語彙の座標軸」と捉えてください。 | |
""" | |
), | |
mo.mpl.interactive(plt.gcf()), | |
topfeat, | |
] | |
) | |
return | |
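# Illustrative sketch (not part of the app): what "a linear combination of
# words" means concretely, using scikit-learn's PCA instead of the `pca`
# package used above. Assumes a dense tfidf matrix X and the vectorizer's
# feature names, as produced earlier in the notebook.
def _sketch_pc_loadings(X: np.ndarray, feature_names, top_n: int = 10):
    from sklearn.decomposition import PCA

    skl_pca = PCA(n_components=2, random_state=RANDOM_SEED)
    skl_pca.fit(X)
    # The loading vector of PC1 assigns each word a weight; the words with the
    # largest absolute weights drive the first axis of the biplot above.
    pc1 = skl_pca.components_[0]
    top = np.argsort(np.abs(pc1))[::-1][:top_n]
    # explained_variance_ratio_ reports how much of the total variance each
    # principal component captures.
    return (
        skl_pca.explained_variance_ratio_,
        [(feature_names[i], float(pc1[i])) for i in top],
    )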
def _(): | |
mo.md( | |
r""" | |
## Correspondence Analysis
A correspondence analysis (CA) biplot lets you inspect the relationship between samples and features much as a PCA biplot does, but with several differences.
CA does not use the $\mathrm{tfidf}$ matrix; instead, the raw frequency matrix is converted into a categorical contingency table, which is then analyzed as a cross-tabulation. This makes it possible to:
- examine the association between the dialogue/narration categories and the occurrence patterns of particular words;
- produce a biplot that shows the relationship between sample categories and word features as discrete associations;
- extract the words most characteristic of each category using chi-square distance rather than the Euclidean distance used in PCA;
- treat samples and words symmetrically, both as random observations.
**How to read the CA output**
The closer a row (sample) and a column (word) are, the more characteristic that word is of that group of samples.
Points near the origin of the plot are words that are not skewed toward any category.
""" | |
) | |
return | |
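# Illustrative sketch (not part of the app): correspondence analysis on a tiny
# hand-made contingency table (categories x words), mirroring what the cells
# below do with the real grouped count matrix. Assumes the pd / prince /
# RANDOM_SEED names from the setup cell above.
def _sketch_toy_ca():
    toy_ct = pd.DataFrame(
        {"said": [40, 5], "thou": [25, 2], "machine": [1, 30], "doctor": [2, 20]},
        index=["speech", "non-speech"],
    )
    ca = prince.CA(n_components=1, random_state=RANDOM_SEED)
    ca = ca.fit(toy_ct)
    # Rows (categories) and columns (words) that land close together on the
    # shared axis are positively associated (chi-square, not Euclidean, distance).
    return ca.row_coordinates(toy_ct), ca.column_coordinates(toy_ct)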
def _(X_df, authors, chunk_cats, speech_types, works): | |
import itertools | |
# Build a small DF to test each dim‐combo | |
df_chk = X_df.copy() | |
df_chk["author"] = authors | |
df_chk["category"] = chunk_cats | |
df_chk["work"] = works | |
df_chk["speech_type"] = speech_types | |
# filter out collinear dimensions by Cramér’s V | |
from scipy.stats import chi2_contingency | |
def cramers_v(m: np.ndarray) -> float: | |
"""Compute Cramér’s V from a contingency‐matrix.""" | |
chi2 = chi2_contingency(m, correction=False)[0] | |
n = m.sum() | |
k = min(m.shape) - 1 | |
return np.sqrt(chi2 / (n * k)) | |
cols = ["author", "category", "work", "speech_type"] | |
vmat = pd.DataFrame(index=cols, columns=cols, dtype=float) | |
for i in cols: | |
for j in cols: | |
if i == j: | |
vmat.loc[i, j] = 1.0 | |
else: | |
m = pd.crosstab(df_chk[i], df_chk[j]).values | |
vmat.loc[i, j] = cramers_v(m) | |
print(vmat) | |
# drop any dimension that is nearly collinear with another (V > .95) | |
high_thresh = 0.95 | |
# only drop the later dimension in each tuple | |
drop = { | |
j for i, j in itertools.combinations(cols, 2) if vmat.loc[i, j] > high_thresh | |
} | |
# special‐case: in pure speech vs non-speech mode (category == speech_type), | |
# keep speech_type (the more descriptive) and drop category instead | |
if vmat.loc["category", "speech_type"] > high_thresh and chunk_cats == speech_types: | |
drop.discard("speech_type") | |
drop.add("category") | |
filtered_dims = [d for d in cols if d not in drop] | |
print(drop, filtered_dims) | |
# warn on moderate association .3 ≤ V ≤ .6 | |
collinear_warns = [] | |
for i in cols: | |
for j in cols: | |
if i < j and 0.3 <= vmat.loc[i, j] <= 0.6: | |
collinear_warns.append( | |
f"⚠️ `{i}` vs `{j}` moderate association (V={vmat.loc[i, j]:.2f})" | |
) | |
collinear_message = mo.md("## Warning\n" + "\n".join(collinear_warns)).callout( | |
kind="warning" | |
) | |
dims_all = filtered_dims # start with our filtered labels | |
options: list[str] = [] | |
# Enumerate all non-empty combinations; keep those yielding >2 groups | |
for r in range(1, len(dims_all) + 1): | |
for combo in itertools.combinations(dims_all, r): | |
if df_chk.groupby(list(combo)).ngroups > 2: | |
options.append("|".join(combo)) | |
mo.stop( | |
not options, | |
mo.md( | |
f"No category combination yielding more than two rows, so cannot perform CA.\n{collinear_message}" | |
), | |
) | |
ca_group_by = mo.ui.dropdown( | |
options=options, | |
value=options[0], | |
label="Group by (dims that yield >2 rows)", | |
full_width=True, | |
) | |
ca_group_by | |
return (ca_group_by,) | |
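# Illustrative sketch (not part of the app): Cramér's V on a small 2x2
# contingency table, matching the cramers_v helper above. V ranges from 0
# (independent) to 1 (perfectly associated); the cell above drops one of two
# dimensions whenever V > 0.95.
def _sketch_cramers_v():
    from scipy.stats import chi2_contingency

    # e.g. author (rows) vs work (columns) when every author wrote exactly one
    # work in the corpus: knowing one determines the other.
    m = np.array([[10, 0], [0, 12]])
    chi2 = chi2_contingency(m, correction=False)[0]
    n = m.sum()
    k = min(m.shape) - 1
    return float(np.sqrt(chi2 / (n * k)))  # 1.0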
def _(authors, ca_group_by, chunk_cats, count_df, speech_types, works): | |
df = count_df.copy() | |
df["author"] = authors | |
df["category"] = chunk_cats | |
df["work"] = works | |
df["speech_type"] = speech_types | |
# split "author|work" (etc.) into the actual list of dims | |
dims = ca_group_by.value.split("|") | |
# sum only numeric (feature) columns by group | |
num_cols = df.select_dtypes(include="number").columns.tolist() | |
ct = df.groupby(dims)[num_cols].sum() | |
# flatten MultiIndex into a single‐level index | |
if len(dims) > 1: | |
ct.index = ["|".join(idx) for idx in ct.index] | |
else: | |
ct.index = ct.index.astype(str) | |
mo.md(f""" | |
### Category-by-feature matrix
{mo.ui.table(ct, selection=None)} | |
""") | |
return (ct,) | |
def _(ct): | |
ca_model = prince.CA( | |
n_components=2, | |
n_iter=10, | |
copy=True, | |
check_input=True, | |
engine="sklearn", | |
random_state=RANDOM_SEED, | |
) | |
ca_model = ca_model.fit(ct) | |
ca_model.plot( | |
ct, | |
x_component=0, | |
y_component=1, | |
show_row_markers=True, | |
show_column_markers=True, | |
show_row_labels=True, | |
show_column_labels=True, | |
) | |
return | |
def _(): | |
linkage_methods = mo.ui.dropdown( | |
options=[ | |
"ward", | |
"single", | |
"complete", | |
"average", | |
], | |
value="ward", | |
label="Linkage Method", | |
) | |
distance_metrics = mo.ui.dropdown( | |
options=["cosine", "euclidean", "cityblock", "hamming"], | |
value="cosine", | |
label="Distance Metric", | |
) | |
dendrogram_height = mo.ui.number( | |
label="Dendrogram plot height (increase if hard to see labels)", | |
start=800, | |
value=1200, | |
step=100, | |
) | |
d_stack = mo.hstack([linkage_methods, distance_metrics], justify="start") | |
mo.md(f""" | |
## Hierarchical Clustering
Hierarchical clustering groups samples directly by the similarity of their $\\mathrm{{tfidf}}$ word-usage patterns, regardless of any predefined categories.
- Produces a tree diagram (dendrogram) showing how samples group together at different similarity levels
- Retains all of the selected $\\mathrm{{tfidf}}$ features when computing distances between samples
- Unlike PCA/CA, focuses on relationships between samples rather than between features (although the matrix can be transposed to run the reverse analysis)
- Uses a distance measure such as Euclidean or cosine distance to compare high-dimensional $\\mathrm{{tfidf}}$ vectors
- Builds discrete clusters of samples with similar word-usage patterns
{d_stack} | |
{dendrogram_height} | |
""") | |
return dendrogram_height, distance_metrics, linkage_methods | |
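# Illustrative sketch (not part of the app): the two dropdowns above (distance
# metric, linkage method) map directly onto scipy calls; the next cell passes
# equivalent distfun/linkagefun callbacks to plotly's create_dendrogram.
def _sketch_linkage(X_dense: np.ndarray, metric: str = "cosine", method: str = "ward"):
    import scipy.cluster.hierarchy as sch
    import scipy.spatial.distance as ssd

    # Condensed vector of pairwise distances between the rows (samples).
    dists = ssd.pdist(X_dense, metric=metric)
    # Each row of Z records one merge: (cluster_i, cluster_j, distance, size).
    Z = sch.linkage(dists, method=method)
    return Z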
def _(X, chunk_fnames, dendrogram_height, distance_metrics, linkage_methods): | |
import plotly.figure_factory as ff | |
import scipy.spatial.distance as ssd | |
import scipy.cluster.hierarchy as sch | |
distfun = lambda M: ssd.pdist(M, metric=distance_metrics.value) | |
linkagefun = lambda D: sch.linkage(D, method=linkage_methods.value) | |
fig = ff.create_dendrogram( | |
X, | |
orientation="left", | |
labels=list(chunk_fnames), | |
distfun=distfun, | |
linkagefun=linkagefun, | |
) | |
fig.update_layout( | |
width=800, | |
height=dendrogram_height.value, | |
title=f"Dendrogram using {linkage_methods.value} link method and {distance_metrics.value} distance on samples", | |
) | |
mo.ui.plotly(fig) | |
return distfun, ff, linkagefun | |
def _( | |
X, | |
X_df, | |
dendrogram_height, | |
distance_metrics, | |
distfun, | |
ff, | |
linkage_methods, | |
linkagefun, | |
): | |
fig_T = ff.create_dendrogram( | |
X.T, | |
orientation="left", | |
labels=X_df.columns, | |
distfun=distfun, | |
linkagefun=linkagefun, | |
) | |
fig_T.update_layout( | |
width=800, | |
height=dendrogram_height.value, | |
title=f"Dendrogram using {linkage_methods.value} link method and {distance_metrics.value} distance on features", | |
) | |
mo.ui.plotly(fig_T) | |
return | |
def sample_selector(fnames): | |
selector_explanation = mo.md( | |
"## データの確認\n\n### サンプルの確認\n\n以下の選択肢から任意のサンプルを選ぶとその中身が確認できます。" | |
) | |
text_selector = mo.ui.dropdown( | |
options=list(sorted(fnames)), | |
value=fnames[0] if fnames else None, | |
label="Select a sample to view", | |
) | |
mo.vstack([selector_explanation, text_selector]) | |
return (text_selector,) | |
def sample_viewer(fnames, text_selector, texts): | |
mo.stop(not text_selector.value, "No sample selected.") | |
selected_idx = fnames.index(text_selector.value) | |
mo.md(f"**{text_selector.value}**\n\n{texts[selected_idx]}") | |
return | |
def _(): | |
kwic_explanation = mo.md( | |
"### KWIC検索\n\nKeyWord In Context (KWIC)は検索語の左右コンテクストを効率的に確認できる可視化方法です。" | |
) | |
keyword = mo.ui.text(label="Search keyword") | |
context_chars = mo.ui.number(label="Context chars", start=0, value=50) | |
run_btn = mo.ui.run_button(label="Search") | |
mo.vstack([kwic_explanation, keyword, context_chars, run_btn]) | |
return context_chars, keyword, run_btn | |
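# Illustrative sketch (not part of the app): what kwic_search returns for a
# two-sentence toy input, before the metadata merge done in the next cell.
def _sketch_kwic(kwic_search):
    toy_texts = [
        "The angel fell into the garden.",
        "No angel was expected at the vicarage.",
    ]
    # One row per match, with columns: original_index, before, keyword, after.
    return kwic_search(toy_texts, "angel", context_chars=12)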
def _( | |
authors, | |
context_chars, | |
keyword, | |
kwic_search, | |
run_btn, | |
speech_types, | |
texts, | |
works, | |
): | |
mo.stop(not run_btn.value, mo.md("Type a keyword and click Search.")) | |
kwic_df = kwic_search(texts, keyword.value, context_chars.value) | |
if kwic_df.empty: | |
kwic_display = mo.md(f"No occurrences of “{keyword.value}” found.") | |
else: | |
# reattach metadata | |
meta = pd.DataFrame( | |
{ | |
"sample_index": range(len(texts)), | |
"author": authors, | |
"work": works, | |
"speech_type": speech_types, | |
} | |
) | |
merged = kwic_df.merge( | |
meta, | |
left_on="original_index", | |
right_on="sample_index", | |
validate="many_to_one", | |
).drop(columns=["original_index", "sample_index"]) | |
kwic_display = mo.ui.table(merged, selection=None) | |
kwic_display | |
return | |
def _(): | |
mo.md( | |
r""" | |
# Summary
Using all three of these approaches gives complementary perspectives:
- **Hierarchical clustering**: reveals the "natural" groupings in the data. For example, a particular author's speech patterns may cluster together, or narration and dialogue may form clearly distinct groups.
- **Correspondence analysis**: reveals associations between categories. For example, it can show which words are most characteristic of different authors or speech types.
- **Principal component analysis**: identifies the most discriminating combinations of words. For example, it can show which vocabulary patterns contribute most to distinguishing dialogue from narration, or one author from another.
""" | |
) | |
return | |
def _(): | |
return | |
if __name__ == "__main__": | |
app.run() | |