Update mcp/graph_metrics.py
Browse files- mcp/graph_metrics.py +80 -30
mcp/graph_metrics.py
CHANGED
@@ -1,60 +1,110 @@
|
|
1 |
-
|
2 |
-
|
|
|
|
|
|
|
|
|
|
|
3 |
|
4 |
-
|
5 |
-
|
6 |
-
|
|
|
7 |
|
8 |
-
|
9 |
-
|
10 |
|
11 |
-
|
12 |
-
|
|
|
|
|
13 |
"""
|
14 |
|
15 |
-
from
|
|
|
|
|
16 |
import networkx as nx
|
17 |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
18 |
|
19 |
-
# ----------------------------------------------------------------------
|
20 |
def _edge_endpoints(e: Dict) -> Tuple[str, str] | None:
|
21 |
-
"""Return (src, dst) if both ends exist;
|
22 |
-
src = e.get("source") or e.get("from")
|
23 |
-
dst = e.get("target") or e.get("to")
|
24 |
-
if src and dst:
|
25 |
-
return src, dst
|
26 |
return None
|
27 |
|
|
|
|
|
|
|
28 |
|
29 |
def build_nx(nodes: List[Dict], edges: List[Dict]) -> nx.Graph:
|
30 |
-
"""
|
31 |
-
|
|
|
|
|
|
|
|
|
|
|
32 |
|
33 |
-
|
34 |
-
|
|
|
35 |
"""
|
36 |
G = nx.Graph()
|
37 |
|
38 |
-
#
|
39 |
for n in nodes:
|
40 |
-
|
|
|
|
|
41 |
|
42 |
-
#
|
43 |
for e in edges:
|
44 |
endpoints = _edge_endpoints(e)
|
45 |
-
if endpoints:
|
46 |
-
|
|
|
|
|
|
|
47 |
|
48 |
return G
|
49 |
|
|
|
|
|
|
|
50 |
|
51 |
-
# ----------------------------------------------------------------------
|
52 |
def get_top_hubs(G: nx.Graph, k: int = 5) -> List[Tuple[str, float]]:
|
53 |
-
"""
|
54 |
dc = nx.degree_centrality(G)
|
55 |
-
return sorted(dc.items(), key=lambda
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
56 |
|
57 |
|
58 |
def get_density(G: nx.Graph) -> float:
|
59 |
-
"""
|
60 |
-
return nx.density(G)
|
|
|
1 |
+
#!/usr/bin/env python3
|
2 |
+
"""MedGenesis – NetworkX helpers (robust version)
|
3 |
+
|
4 |
+
Key upgrades over the legacy helper:
|
5 |
+
|
6 |
+
1. **Edge‑key flexibility** – `build_nx` now recognises *four* common
|
7 |
+
schemas produced by Streamlit‑agraph, PyVis, Neo4j exports or OT graphs:
|
8 |
|
9 |
+
• `{"source": "n1", "target": "n2"}` (agraph)
|
10 |
+
• `{"from": "n1", "to": "n2"}` (PyVis)
|
11 |
+
• `{"src": "n1", "dst": "n2"}` (neo4j/json)
|
12 |
+
• `{"u": "n1", "v": "n2"}` (NetworkX native)
|
13 |
|
14 |
+
2. **Weight aware** – optional numeric `weight` (or `value`) field becomes
|
15 |
+
an edge attribute (defaults to 1).
|
16 |
|
17 |
+
3. **Self‑loop skip** – ignores self‑edges to keep density sensible.
|
18 |
+
|
19 |
+
4. **Utility metrics** – adds `betweenness` & `clustering` helpers in
|
20 |
+
addition to top‑hub degree ranking.
|
21 |
"""
|
22 |
|
23 |
+
from __future__ import annotations
|
24 |
+
|
25 |
+
from typing import Dict, List, Tuple
|
26 |
import networkx as nx
|
27 |
|
28 |
+
__all__ = [
|
29 |
+
"build_nx",
|
30 |
+
"get_top_hubs",
|
31 |
+
"get_density",
|
32 |
+
"get_betweenness",
|
33 |
+
"get_clustering_coeff",
|
34 |
+
]
|
35 |
+
|
36 |
+
# ---------------------------------------------------------------------
|
37 |
+
# Internal helpers
|
38 |
+
# ---------------------------------------------------------------------
|
39 |
|
|
|
40 |
def _edge_endpoints(e: Dict) -> Tuple[str, str] | None:
    """Return the ``(src, dst)`` endpoint pair of an edge dict, or ``None``.

    The source is read from the first usable key among ``source``, ``from``,
    ``src`` and ``u``; the target from ``target``, ``to``, ``dst`` and ``v``.
    A value is usable when it is neither ``None`` nor an empty string, so
    falsy-but-valid identifiers such as ``0`` are kept instead of being
    discarded by a truthiness test.

    Both endpoints are converted to ``str`` *before* the self-loop check, so
    an edge such as ``{"source": 1, "target": "1"}`` is recognised as a
    self-loop and rejected.

    Parameters
    ----------
    e : edge dict using any of the recognised key schemas.

    Returns
    -------
    ``(src, dst)`` as strings, or ``None`` when an endpoint is missing or
    the edge is a self-loop.
    """

    def _first_usable(keys):
        # Walk the aliases in priority order and stop at the first real value.
        for key in keys:
            value = e.get(key)
            if value is not None and value != "":
                return str(value)
        return None

    src = _first_usable(("source", "from", "src", "u"))
    dst = _first_usable(("target", "to", "dst", "v"))

    # Compare the normalised (string) ids so mixed int/str ids cannot
    # smuggle a self-loop past the check.
    if src is None or dst is None or src == dst:
        return None
    return src, dst
|
47 |
|
48 |
+
# ---------------------------------------------------------------------
|
49 |
+
# Public API
|
50 |
+
# ---------------------------------------------------------------------
|
51 |
|
52 |
def build_nx(nodes: List[Dict], edges: List[Dict]) -> nx.Graph:
    """Convert heterogeneous node/edge dicts into an undirected NetworkX graph.

    Parameters
    ----------
    nodes : list of node dicts – each must contain an `id` key (a missing
        `id` raises ``KeyError``); the id is stored as ``str`` and every
        other key is copied onto the node as an attribute.
    edges : list of edge dicts – endpoints are resolved by
        ``_edge_endpoints``; edges for which it returns ``None`` are skipped.
        The edge weight is taken from `weight`, falling back to `value`, and
        defaults to 1 when neither is present.  An explicit weight of ``0``
        is preserved rather than being replaced by the default.

    Returns
    -------
    nx.Graph – ready for downstream centrality / drawing.

    Raises
    ------
    ValueError / TypeError if a supplied weight cannot be converted to
    ``float``.
    """
    G = nx.Graph()

    # Nodes ----------------------------------------------------------------
    for n in nodes:
        node_id = str(n["id"])
        attrs = {k: v for k, v in n.items() if k != "id"}
        G.add_node(node_id, **attrs)

    # Edges ----------------------------------------------------------------
    for e in edges:
        endpoints = _edge_endpoints(e)
        if not endpoints:
            continue
        u, v = endpoints

        # Pick the first weight-like field that is actually set.  Testing
        # against None / "" (not truthiness) keeps a legitimate 0 weight.
        weight = 1.0
        for key in ("weight", "value"):
            raw = e.get(key)
            if raw is not None and raw != "":
                weight = float(raw)
                break

        G.add_edge(u, v, weight=weight)

    return G
|
83 |
|
84 |
+
# ---------------------------------------------------------------------
|
85 |
+
# Metrics helpers
|
86 |
+
# ---------------------------------------------------------------------
|
87 |
|
|
|
88 |
def get_top_hubs(G: nx.Graph, k: int = 5) -> List[Tuple[str, float]]:
    """Rank the nodes of *G* by degree centrality and keep the best *k*.

    Returns a list of ``(node, centrality)`` pairs ordered from highest to
    lowest score, as computed by ``nx.degree_centrality``.
    """

    def _score(entry):
        # Each entry is a (node, centrality) pair; rank on the centrality.
        return entry[1]

    ranked = list(nx.degree_centrality(G).items())
    ranked.sort(key=_score, reverse=True)
    return ranked[:k]
|
92 |
+
|
93 |
+
|
94 |
+
def get_betweenness(G: nx.Graph, k: int = 5) -> List[Tuple[str, float]]:
    """Rank the nodes of *G* by betweenness centrality and keep the best *k*.

    Graphs with more than 500 nodes are scored with the sampled variant of
    ``nx.betweenness_centrality`` (``k=200`` with a fixed ``seed=42``);
    smaller graphs are scored with the default call.

    Returns a list of ``(node, centrality)`` pairs, highest score first.
    """
    # Extra keyword arguments are only supplied for large graphs.
    sampling = {"k": 200, "seed": 42} if G.number_of_nodes() > 500 else {}
    scores = nx.betweenness_centrality(G, **sampling)

    ranked = list(scores.items())
    ranked.sort(key=lambda pair: pair[1], reverse=True)
    return ranked[:k]
|
101 |
+
|
102 |
+
|
103 |
+
def get_clustering_coeff(G: nx.Graph) -> float:
    """Return the average clustering coefficient of *G* (0‑1).

    ``nx.average_clustering`` averages over the graph's nodes, so a graph
    with no nodes would trigger a division by zero.  Such a graph is
    reported as ``0.0`` instead, matching how ``nx.density`` treats an
    empty graph.
    """
    # Guard the empty graph: there is nothing to average over.
    if G.number_of_nodes() == 0:
        return 0.0
    return nx.average_clustering(G)
|
106 |
|
107 |
|
108 |
def get_density(G: nx.Graph) -> float:
    """Return the density of *G* as computed by ``nx.density``.

    For a simple undirected graph without self-loops the value lies in
    [0, 1].
    """
    density = nx.density(G)
    return density
|