# Code Documentation
## Introduction
This document describes the code that processes data from a CSV file and simulates learning on that data in a neural network. The code covers initializing the network, processing the CSV file, simulating the learning process, and saving and loading the model.
## Dependencies
- `pandas`: Processing CSV files.
- `numpy`: Numerical operations.
- `random`: Random operations.
- `tqdm`: Progress bars.
- `tkinter`: Graphical user interface.
- `matplotlib`: Plotting (the code below uses `plt` and `FigureCanvasTkAgg`).
- `seaborn`: Visualization.
- `networkx`: Building and analyzing graphs.
- `json`: Saving and loading models.
- `os`: File operations.
- `time`: Timing measurements.
- `torch`: Neural networks.
- `threading`: Thread management.
- `logging`: Logging.
- `sqlite3`: SQLite databases.
- `dask.dataframe`: Parallel data processing.
## Logger Configuration
```python
import logging

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
```
## Global Variables
- `initialized`: Flag indicating whether the network has been initialized.
- `category_nodes`: List of the nodes in the network.
- `questions`: List of the questions.
- `model_saved`: Guard flag indicating whether the model has been saved.
## Ensuring the Output Directory Exists
```python
import os

output_dir = "plots"
# Create the plot directory if it does not exist yet
os.makedirs(output_dir, exist_ok=True)
```
## Functions
### Function for splitting the CSV file
```python
def split_csv(filename, chunk_size=1000, output_dir="data"):
    """Split a CSV file into parts of at most `chunk_size` rows each."""
    os.makedirs(output_dir, exist_ok=True)
    chunk_iter = pd.read_csv(filename, chunksize=chunk_size)
    for i, chunk in enumerate(chunk_iter):
        chunk.to_csv(os.path.join(output_dir, f"data_part_{i}.csv"), index=False)
        logging.info(f"Saved chunk {i} with {len(chunk)} rows.")
```
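To make the chunking behaviour concrete, the following is a minimal standard-library sketch, not the pandas-based function above. It only illustrates how many rows land in each part; the helper name `chunk_rows` and the sample rows are hypothetical.

```python
from itertools import islice


def chunk_rows(rows, chunk_size):
    """Yield successive lists of at most `chunk_size` rows (hypothetical helper)."""
    it = iter(rows)
    while True:
        chunk = list(islice(it, chunk_size))
        if not chunk:
            return
        yield chunk


# 2,500 made-up rows split with chunk_size=1000: the last part holds the remainder
rows = [{"Frage": f"Frage {i}", "Kategorie": "A"} for i in range(2500)]
part_sizes = [len(chunk) for chunk in chunk_rows(rows, 1000)]
print(part_sizes)
```

With `pd.read_csv(..., chunksize=...)` the same pattern applies: every part except possibly the last contains exactly `chunk_size` rows.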
### Improvement 1: Strengthening connections for frequently asked questions
```python
def strengthen_question_connection(category_nodes, question, category):
    # Look up the category node whose label matches the given category
    category_node = next((node for node in category_nodes if node.label == category), None)
    if category_node:
        for conn in category_node.connections:
            if conn.target_node.label == question:
                old_weight = conn.weight
                conn.weight += 0.1  # strengthen the connection
                conn.weight = np.clip(conn.weight, 0, 1.0)  # keep the weight in [0, 1]
                logging.info(f"Strengthened connection for question '{question}' in category '{category}': {old_weight:.4f} -> {conn.weight:.4f}")
```
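The effect of the fixed step of 0.1 combined with clipping can be traced with plain arithmetic. This is a small sketch under the assumption that a weight starts at 0.75; the helper `strengthen` is hypothetical and uses `min`/`max` instead of `np.clip`.

```python
def strengthen(weight, step=0.1, upper=1.0):
    """Add `step` to a weight and cap the result to the range [0, upper]."""
    return min(max(weight + step, 0.0), upper)


w = 0.75  # assumed starting weight
history = [w]
for _ in range(4):
    w = strengthen(w)
    history.append(round(w, 2))
print(history)  # the weight saturates at 1.0 after the third call
```

Once a connection reaches 1.0, further calls have no effect, so very frequent questions cannot be distinguished from moderately frequent ones by weight alone.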
### Improvement 2: Extended Hebbian learning rule for better question assignment
```python
def enhanced_hebbian_learning(node, target_node, learning_rate=0.2, decay_factor=0.01):
    old_weight = None
    for conn in node.connections:
        if conn.target_node == target_node:
            old_weight = conn.weight
            # Hebbian term: grows with the product of both activations
            conn.weight += learning_rate * node.activation * target_node.activation
            # Apply proportional decay, then keep the weight in [0, 1]
            conn.weight = np.clip(conn.weight - decay_factor * conn.weight, 0, 1.0)
            break
    if old_weight is not None:
        # `conn` still refers to the matched connection because of the `break`
        logging.info(f"Hebbian learning applied: weight {old_weight:.4f} -> {conn.weight:.4f}")
```
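Written as a formula, the update performed by `enhanced_hebbian_learning` on a matching connection reads as follows, where $w$ is the old weight, $\eta$ is `learning_rate`, $\lambda$ is `decay_factor`, and $a_i$, $a_j$ are the activations of `node` and `target_node` (the symbols are introduced here for illustration only):

$$
w' = \operatorname{clip}\bigl((1 - \lambda)\,(w + \eta\, a_i\, a_j),\; 0,\; 1\bigr)
$$

As a worked example with assumed values $w = 0.5$, $a_i = 0.9$, $a_j = 0.8$ and the default parameters $\eta = 0.2$, $\lambda = 0.01$:

$$
w' = 0.99 \cdot (0.5 + 0.2 \cdot 0.9 \cdot 0.8) = 0.99 \cdot 0.644 = 0.63756
$$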
### Improvement 3: Simulating question processing in the network
```python
def simulate_question_answering(category_nodes, question, questions):
    category = next((q['category'] for q in questions if q['question'] == question), None)
    if not category:
        logging.warning(f"Question '{question}' not found!")
        return None
    category_node = next((node for node in category_nodes if node.label == category), None)
    if category_node:
        propagate_signal(category_node, input_signal=0.9, emotion_weights={}, emotional_state=1.0)
        activation = category_node.activation
        if activation is None or activation <= 0:
            logging.warning(f"Category '{category}' has an invalid activation: {activation}")
            return 0.0  # return 0 if the activation failed
        logging.info(f"Processing question: '{question}' → category: '{category}' with activation {activation:.4f}")
        return activation
    else:
        # The category is only reported here; this function does not add it to the network
        logging.warning(f"Category '{category}' not found in the network.")
        return 0.0
```
### Improvement 4: Finding questions that match the user query by keyword
```python
def find_question_by_keyword(questions, keyword):
    # Case-insensitive substring match; returns all matches or None
    matching_questions = [q for q in questions if keyword.lower() in q['question'].lower()]
    return matching_questions if matching_questions else None
```
### Improvement 5: Finding the most similar question with a simple similarity metric
```python
from difflib import get_close_matches


def find_similar_question(questions, query):
    question_texts = [q['question'] for q in questions]
    # Return at most one match with a similarity ratio of at least 0.6
    closest_matches = get_close_matches(query, question_texts, n=1, cutoff=0.6)
    if closest_matches:
        matched_question = next((q for q in questions if q['question'] == closest_matches[0]), None)
        return matched_question
    else:
        # Placeholder entry so that callers can always read 'question' and 'category'
        return {"question": "No matching question found", "category": "Unknown"}
```
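The behaviour of `difflib.get_close_matches` with `n=1` and `cutoff=0.6` can be checked in isolation. The sketch below uses the word list from the Python standard-library documentation rather than real quiz questions.

```python
from difflib import get_close_matches

candidates = ["ape", "apple", "peach", "puppy"]

# A misspelled query still finds its closest candidate
best = get_close_matches("appel", candidates, n=1, cutoff=0.6)
print(best)

# A query with no candidate above the cutoff yields an empty list
none_found = get_close_matches("xyz", candidates, n=1, cutoff=0.6)
print(none_found)
```

An empty list is the case in which `find_similar_question` falls back to its placeholder entry.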
### Improvement 6: Test function for checking the model
```python
def test_model(category_nodes, questions, query):
    matched_question = find_question_by_keyword(questions, query)
    if matched_question:
        logging.info(f"Matched question: {matched_question[0]['question']} -> category: {matched_question[0]['category']}")
        simulate_question_answering(category_nodes, matched_question[0]['question'], questions)
    else:
        logging.warning("No matching question found.")
    # Independently of the keyword search, report the most similar question
    similarity_question = find_similar_question(questions, query)
    logging.info(f"Most similar question: {similarity_question['question']} -> category: {similarity_question['category']}")
```
### NetworkX functions for causal graphs
```python
def build_causal_graph(category_nodes):
    G = nx.DiGraph()
    for node in category_nodes:
        G.add_node(node.label)
        for conn in node.connections:
            G.add_edge(node.label, conn.target_node.label, weight=conn.weight)
    return G


def analyze_causality_multiple(G, num_pairs=3):
    if len(G.nodes) < 2:
        logging.warning("Graph does not contain enough nodes for an analysis.")
        return
    for _ in range(num_pairs):
        # random.sample needs a sequence, so the node view is converted to a list
        start_node, target_node = random.sample(list(G.nodes), 2)
        logging.info(f"Analyzing causal paths from '{start_node}' to '{target_node}'")
        try:
            paths = list(nx.all_simple_paths(G, source=start_node, target=target_node))
            if paths:
                for path in paths:
                    logging.info(f"Causal path: {' -> '.join(path)}")
            else:
                logging.info(f"No path found from '{start_node}' to '{target_node}'")
        except nx.NetworkXNoPath:
            logging.warning(f"No path found between '{start_node}' and '{target_node}'.")


def analyze_node_influence(G):
    # PageRank serves as a simple influence score per node
    influence_scores = nx.pagerank(G, alpha=0.85)
    sorted_influences = sorted(influence_scores.items(), key=lambda x: x[1], reverse=True)
    for node, score in sorted_influences:
        logging.info(f"Node: {node}, influence: {score:.4f}")
```
### Function for interventions based on Pearl's do-operator
```python
def do_intervention(node, new_value):
    logging.info(f"Intervention: setting {node.label} to {new_value}")
    # Fix the node's activation to the chosen value ...
    node.activation = new_value
    # ... and pass the effect on to its direct successors
    for conn in node.connections:
        conn.target_node.activation += node.activation * conn.weight
```
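A worked example may help to see what the intervention changes. The sketch below re-declares stripped-down `Node` and `Connection` classes and a logging-free copy of `do_intervention` so that it runs on its own; the labels and numeric values are assumptions chosen for illustration.

```python
class Connection:
    def __init__(self, target_node, weight):
        self.target_node = target_node
        self.weight = weight


class Node:
    def __init__(self, label, activation=0.0):
        self.label = label
        self.activation = activation
        self.connections = []


def do_intervention(node, new_value):
    """Set the node's activation and add its weighted effect to direct successors."""
    node.activation = new_value
    for conn in node.connections:
        conn.target_node.activation += node.activation * conn.weight


cause = Node("cause")
effect = Node("effect", activation=0.1)
cause.connections.append(Connection(effect, weight=0.5))

do_intervention(cause, 0.8)
print(round(effect.activation, 4))  # 0.1 + 0.8 * 0.5
```

Only direct successors are updated; nodes further downstream keep their previous activation until a signal is propagated again.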
### Strengthening context-dependent learning
```python
def contextual_causal_analysis(node, context_factors, learning_rate=0.1):
    context_factor = context_factors.get(node.label, 1.0)
    # Only strongly activated nodes with an amplifying context are reinforced
    if node.activation > 0.8 and context_factor > 1.0:
        logging.info(f"Causal relationship strengthened for {node.label} due to context.")
        for conn in node.connections:
            conn.weight += learning_rate * context_factor
            conn.weight = np.clip(conn.weight, 0, 1.0)
            logging.info(f"Weight updated: {node.label} → {conn.target_node.label}, weight: {conn.weight:.4f}")
```
### PyTorch model for causal inference
```python
import torch
import torch.nn as nn


class CausalInferenceNN(nn.Module):
    def __init__(self):
        super().__init__()
        self.fc1 = nn.Linear(10, 20)  # 10 input features, 20 hidden units
        self.fc2 = nn.Linear(20, 1)   # single output value

    def forward(self, x):
        x = torch.relu(self.fc1(x))
        return self.fc2(x)
```
### Debugging function
```python
def debug_connections(category_nodes):
    start_time = time.time()
    for node in category_nodes:
        logging.info(f"Node: {node.label}")
        for conn in node.connections:
            logging.info(f"  Connection to: {conn.target_node.label}, weight: {conn.weight}")
    end_time = time.time()
    logging.info(f"debug_connections execution time: {end_time - start_time:.4f} seconds")
```
### Helper functions
```python
def sigmoid(x):
    return 1 / (1 + np.exp(-x))


def add_activation_noise(activation, noise_level=0.1):
    noise = np.random.normal(0, noise_level)
    return np.clip(activation + noise, 0.0, 1.0)


def decay_weights(category_nodes, decay_rate=0.002, forgetting_curve=0.95):
    for node in category_nodes:
        for conn in node.connections:
            conn.weight *= (1 - decay_rate) * forgetting_curve


def reward_connections(category_nodes, target_category, reward_factor=0.1):
    for node in category_nodes:
        if node.label == target_category:
            for conn in node.connections:
                conn.weight += reward_factor
                conn.weight = np.clip(conn.weight, 0, 1.0)


def apply_emotion_weight(activation, category_label, emotion_weights, emotional_state=1.0):
    emotion_factor = emotion_weights.get(category_label, 1.0) * emotional_state
    return activation * emotion_factor


def generate_simulated_answers(data, personality_distributions):
    simulated_answers = []
    for _, row in data.iterrows():
        category = row['Kategorie']
        mean = personality_distributions.get(category, 0.5)
        simulated_answer = np.clip(np.random.normal(mean, 0.2), 0.0, 1.0)
        simulated_answers.append(simulated_answer)
    return simulated_answers


def social_influence(category_nodes, social_network, influence_factor=0.1):
    for node in category_nodes:
        for conn in node.connections:
            social_impact = social_network.get(conn.target_node.label, 0) * influence_factor
            conn.weight += social_impact
            conn.weight = np.clip(conn.weight, 0, 1.0)


def update_emotional_state(emotional_state, emotional_change_rate=0.02):
    emotional_state += np.random.normal(0, emotional_change_rate)
    return np.clip(emotional_state, 0.7, 1.5)


def apply_contextual_factors(activation, node, context_factors):
    context_factor = context_factors.get(node.label, 1.0)
    return activation * context_factor * random.uniform(0.9, 1.1)


def long_term_memory(category_nodes, long_term_factor=0.01):
    for node in category_nodes:
        for conn in node.connections:
            conn.weight += long_term_factor * conn.weight
            conn.weight = np.clip(conn.weight, 0, 1.0)


def hebbian_learning(node, learning_rate=0.3, weight_limit=1.0, reg_factor=0.005):
    for connection in node.connections:
        old_weight = connection.weight
        connection.weight += learning_rate * node.activation * connection.target_node.activation
        connection.weight = np.clip(connection.weight, -weight_limit, weight_limit)
        connection.weight -= reg_factor * connection.weight  # regularization
        node.activation_history.append(node.activation)  # store the activation
        connection.target_node.activation_history.append(connection.target_node.activation)
        logging.info(f"Hebbian learning: weight changed from {old_weight:.4f} to {connection.weight:.4f}")
```
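To get a feeling for how quickly `decay_weights` shrinks a connection with its default parameters, the per-call factor can be computed directly. This is a minimal sketch that assumes no other update touches the weight in between; the helper `decayed_weight` is hypothetical.

```python
def decayed_weight(weight, calls, decay_rate=0.002, forgetting_curve=0.95):
    """Weight after `calls` applications of the decay step, with no other updates."""
    factor = (1 - decay_rate) * forgetting_curve
    return weight * factor ** calls


per_call_factor = (1 - 0.002) * 0.95
print(round(per_call_factor, 4))          # 0.9481
print(round(decayed_weight(1.0, 10), 4))  # a bit under 0.59 after ten calls
```

With the defaults, `forgetting_curve` dominates the decay: it removes 5 % per call, whereas `decay_rate` removes only 0.2 %.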
### Classes for the network structure
```python
class Connection:
    def __init__(self, target_node, weight=None):
        self.target_node = target_node
        # Without an explicit weight, draw a random one
        self.weight = weight if weight is not None else random.uniform(0.1, 1.0)


class Node:
    def __init__(self, label):
        self.label = label
        self.connections = []
        self.activation = 0.0
        self.activation_history = []

    def add_connection(self, target_node, weight=None):
        self.connections.append(Connection(target_node, weight))

    def save_state(self):
        return {
            "label": self.label,
            "activation": self.activation,
            "activation_history": self.activation_history,
            "connections": [{"target": conn.target_node.label, "weight": conn.weight} for conn in self.connections]
        }

    @staticmethod
    def load_state(state, nodes_dict):
        node = Node(state["label"])
        node.activation = state["activation"]
        node.activation_history = state["activation_history"]
        for conn_state in state["connections"]:
            target_node = nodes_dict[conn_state["target"]]
            connection = Connection(target_node, conn_state["weight"])
            node.connections.append(connection)
        return node


class MemoryNode(Node):
    def __init__(self, label, memory_type="short_term"):
        super().__init__(label)
        self.memory_type = memory_type
        self.retention_time = {"short_term": 5, "mid_term": 20, "long_term": 100}[memory_type]
        self.time_in_memory = 0

    def decay(self, decay_rate, context_factors, emotional_state):
        context_factor = context_factors.get(self.label, 1.0)
        emotional_factor = emotional_state
        for conn in self.connections:
            # Short-term memory decays fastest, long-term memory slowest
            if self.memory_type == "short_term":
                conn.weight *= (1 - decay_rate * 2 * context_factor * emotional_factor)
            elif self.memory_type == "mid_term":
                conn.weight *= (1 - decay_rate * context_factor * emotional_factor)
            elif self.memory_type == "long_term":
                conn.weight *= (1 - decay_rate * 0.5 * context_factor * emotional_factor)

    def promote(self, activation_threshold=0.7):
        if len(self.activation_history) == 0:
            return
        if self.memory_type == "short_term" and np.mean(self.activation_history[-5:]) > activation_threshold:
            self.memory_type = "mid_term"
            self.retention_time = 20
        elif self.memory_type == "mid_term" and np.mean(self.activation_history[-20:]) > activation_threshold:
            self.memory_type = "long_term"
            self.retention_time = 100


class CortexCreativus(Node):
    def __init__(self, label):
        super().__init__(label)

    def generate_new_ideas(self, category_nodes):
        new_ideas = []
        for node in category_nodes:
            if node.activation > 0.5:
                new_idea = f"New idea based on {node.label} with activation {node.activation}"
                new_ideas.append(new_idea)
        return new_ideas


class SimulatrixNeuralis(Node):
    def __init__(self, label):
        super().__init__(label)

    def simulate_scenarios(self, category_nodes):
        scenarios = []
        for node in category_nodes:
            if node.activation > 0.5:
                scenario = f"Simulated scenario based on {node.label} with activation {node.activation}"
                scenarios.append(scenario)
        return scenarios


class CortexCriticus(Node):
    def __init__(self, label):
        super().__init__(label)

    def evaluate_ideas(self, ideas):
        evaluated_ideas = []
        for idea in ideas:
            evaluation_score = random.uniform(0, 1)  # the score is random, not computed from the idea
            evaluation = f"Evaluated idea: {idea} - Score: {evaluation_score}"
            evaluated_ideas.append(evaluation)
        return evaluated_ideas


class LimbusAffectus(Node):
    def __init__(self, label):
        super().__init__(label)

    def apply_emotion_weight(self, ideas, emotional_state):
        weighted_ideas = []
        for idea in ideas:
            weighted_idea = f"Emotionally weighted idea: {idea} - Weight: {emotional_state}"
            weighted_ideas.append(weighted_idea)
        return weighted_ideas


class MetaCognitio(Node):
    def __init__(self, label):
        super().__init__(label)

    def optimize_system(self, category_nodes):
        for node in category_nodes:
            node.activation *= random.uniform(0.9, 1.1)


class CortexSocialis(Node):
    def __init__(self, label):
        super().__init__(label)

    def simulate_social_interactions(self, category_nodes):
        interactions = []
        for node in category_nodes:
            if node.activation > 0.5:
                interaction = f"Simulated social interaction based on {node.label} with activation {node.activation}"
                interactions.append(interaction)
        return interactions


def connect_new_brains_to_network(category_nodes, new_brains):
    # Connect every new module to every category node in both directions
    for brain in new_brains:
        for node in category_nodes:
            brain.add_connection(node)
            node.add_connection(brain)
```
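The promotion rule in `MemoryNode.promote` can be traced without the full class. The following sketch restates it as a pure function using `statistics.mean` instead of `np.mean`; the function name `next_memory_type` and the sample history are hypothetical.

```python
from statistics import mean


def next_memory_type(memory_type, activation_history, activation_threshold=0.7):
    """Return the memory stage after one promotion check (at most one step up)."""
    if not activation_history:
        return memory_type
    if memory_type == "short_term" and mean(activation_history[-5:]) > activation_threshold:
        return "mid_term"
    if memory_type == "mid_term" and mean(activation_history[-20:]) > activation_threshold:
        return "long_term"
    return memory_type


history = [0.1, 0.8, 0.8, 0.8, 0.8, 0.8]  # assumed activation values

# The last five values average 0.8, which exceeds the threshold of 0.7
promoted = next_memory_type("short_term", history)
# For mid-term nodes up to 20 values count; here all six average about 0.68
unchanged = next_memory_type("mid_term", history)
print(promoted, unchanged)
```

Because the mid-term check looks further back, a single early low activation can delay promotion to long-term memory even when recent activations are high.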
### Network initialization
```python
def initialize_quiz_network(categories):
    try:
        category_nodes = [Node(c) for c in categories]
        # Fully connect the category nodes (no self-connections)
        for node in category_nodes:
            for target_node in category_nodes:
                if node != target_node:
                    node.add_connection(target_node)
                    logging.debug(f"Connection added: {node.label} → {target_node.label}")
        debug_connections(category_nodes)
        for node in category_nodes:
            logging.info(f"Node created: {node.label}")
            for conn in node.connections:
                logging.info(f"  → Connection to {conn.target_node.label} with weight {conn.weight:.4f}")
        return category_nodes
    except Exception as e:
        logging.error(f"Error during network initialization: {e}")
        return []
```
### Signal propagation
```python
def propagate_signal(node, input_signal, emotion_weights, emotional_state=1.0, context_factors=None):
    # Jitter the input, squash it with the sigmoid and add Gaussian noise
    node.activation = add_activation_noise(sigmoid(input_signal * random.uniform(0.8, 1.2)))
    node.activation_history.append(node.activation)  # store the activation
    node.activation = apply_emotion_weight(node.activation, node.label, emotion_weights, emotional_state)
    if context_factors:
        node.activation = apply_contextual_factors(node.activation, node, context_factors)
    logging.info(f"Signal propagation for {node.label}: input signal {input_signal:.4f}")
    for connection in node.connections:
        logging.info(f"  → Signal to {connection.target_node.label} with weight {connection.weight:.4f}")
        connection.target_node.activation += node.activation * connection.weight


def propagate_signal_with_memory(node, input_signal, category_nodes, memory_nodes, context_factors, emotional_state):
    node.activation = add_activation_noise(sigmoid(input_signal))
    node.activation_history.append(node.activation)
    for connection in node.connections:
        connection.target_node.activation += node.activation * connection.weight
    # Age every memory node by one step and check whether it can be promoted
    for memory_node in memory_nodes:
        memory_node.time_in_memory += 1
        memory_node.promote()
```
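Since both propagation functions pass the input through `sigmoid`, it is worth checking which activation range the typical inputs map to. The sketch below uses `math.exp` instead of `np.exp` and leaves out the random jitter and noise, so it shows only the deterministic part.

```python
import math


def sigmoid(x):
    return 1 / (1 + math.exp(-x))


# Inputs between 0.1 and 0.9, the range used for the initial category signals
for signal in (0.1, 0.5, 0.9):
    print(signal, round(sigmoid(signal), 4))

lowest = sigmoid(0.1)
highest = sigmoid(0.9)
```

For positive inputs the sigmoid always returns more than 0.5, so before noise is added the activations for this input range lie in a fairly narrow band between roughly 0.52 and 0.71.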
### Simulation with adjustments
```python
def simulate_learning(data, category_nodes, personality_distributions, epochs=1, learning_rate=0.8, reward_interval=5, decay_rate=0.002, emotional_state=1.0, context_factors=None):
    if context_factors is None:
        context_factors = {}
    weights_history = {f"{node.label} → {conn.target_node.label}": [] for node in category_nodes for conn in node.connections}
    activation_history = {node.label: [] for node in category_nodes}

    # Create one node per question and connect it to its category node
    question_nodes = []
    for idx, row in data.iterrows():
        q_node = Node(row['Frage'])
        question_nodes.append(q_node)
        category_label = row['Kategorie'].strip()
        category_node = next((c for c in category_nodes if c.label == category_label), None)
        if category_node:
            q_node.add_connection(category_node)
            logging.debug(f"Connection added: {q_node.label} → {category_node.label}")
        else:
            logging.warning(f"Category '{category_label}' not found for question '{row['Frage']}'.")

    emotion_weights = {category: 1.0 for category in data['Kategorie'].unique()}
    social_network = {category: random.uniform(0.1, 1.0) for category in data['Kategorie'].unique()}

    for epoch in range(epochs):
        logging.info(f"\n--- Epoch {epoch + 1} ---")
        simulated_answers = generate_simulated_answers(data, personality_distributions)
        for node in category_nodes:
            node.activation_sum = 0.0
            node.activation_count = 0
        for node in category_nodes:
            propagate_signal(node, random.uniform(0.1, 0.9), emotion_weights, emotional_state, context_factors)
            node.activation_history.append(node.activation)  # store the activation
        for idx, q_node in enumerate(question_nodes):
            # Reset all activations before each question is presented
            for node in category_nodes + question_nodes:
                node.activation = 0.0
            answer = simulated_answers[idx]
            propagate_signal(q_node, answer, emotion_weights, emotional_state, context_factors)
            q_node.activation_history.append(q_node.activation)  # store the activation
            hebbian_learning(q_node, learning_rate)
            for node in category_nodes:
                node.activation_sum += node.activation
                if node.activation > 0:
                    node.activation_count += 1
        for node in category_nodes:
            for conn in node.connections:
                weights_history[f"{node.label} → {conn.target_node.label}"].append(conn.weight)
                logging.debug(f"Weight updated: {node.label} → {conn.target_node.label}, weight: {conn.weight}")
        # Apply causal reinforcement (uses the last question node of this epoch)
        contextual_causal_analysis(q_node, context_factors, learning_rate)
        for node in category_nodes:
            if node.activation_count > 0:
                mean_activation = node.activation_sum / node.activation_count
                activation_history[node.label].append(mean_activation)
                logging.info(f"Mean activation for node {node.label}: {mean_activation:.4f}")
            else:
                activation_history[node.label].append(0.0)
                logging.info(f"Node {node.label} was not activated in this epoch.")
        if (epoch + 1) % reward_interval == 0:
            target_category = random.choice(data['Kategorie'].unique())
            reward_connections(category_nodes, target_category=target_category)
        decay_weights(category_nodes, decay_rate=decay_rate)
        social_influence(category_nodes, social_network)
    logging.info("Simulation finished. Analyzing results...")
    return activation_history, weights_history
```
### Simulation with multi-level memory
```python
def simulate_multilevel_memory(data, category_nodes, personality_distributions, epochs=1):
    # MemoryNode expects a label, so pass the label of each category node
    short_term_memory = [MemoryNode(c.label, "short_term") for c in category_nodes]
    mid_term_memory = []
    long_term_memory = []
    memory_nodes = short_term_memory + mid_term_memory + long_term_memory
    context_factors = {question: random.uniform(0.9, 1.1) for question in data['Frage'].unique()}
    emotional_state = 1.0
    for epoch in range(epochs):
        logging.info(f"\n--- Epoch {epoch + 1} ---")
        for node in short_term_memory:
            input_signal = random.uniform(0.1, 1.0)
            propagate_signal_with_memory(node, input_signal, category_nodes, memory_nodes, context_factors, emotional_state)
        for memory_node in memory_nodes:
            memory_node.decay(decay_rate=0.01, context_factors=context_factors, emotional_state=emotional_state)
        for memory_node in memory_nodes:
            memory_node.promote()
        short_term_memory, mid_term_memory, long_term_memory = update_memory_stages(memory_nodes)
        logging.info(f"Epoch {epoch + 1}: short-term {len(short_term_memory)}, mid-term {len(mid_term_memory)}, long-term {len(long_term_memory)}")
    return short_term_memory, mid_term_memory, long_term_memory


def update_memory_stages(memory_nodes):
    # Regroup the nodes by their current memory type
    short_term_memory = [node for node in memory_nodes if node.memory_type == "short_term"]
    mid_term_memory = [node for node in memory_nodes if node.memory_type == "mid_term"]
    long_term_memory = [node for node in memory_nodes if node.memory_type == "long_term"]
    return short_term_memory, mid_term_memory, long_term_memory
```
### Plot functions
```python
import matplotlib.pyplot as plt


def plot_activation_history(activation_history, filename="activation_history.png"):
    if not activation_history:
        logging.warning("No activation history to plot")
        return
    plt.figure(figsize=(12, 8))
    for label, activations in activation_history.items():
        if len(activations) > 0:
            plt.plot(range(1, len(activations) + 1), activations, label=label)
    plt.title("Activations over the course of learning")
    plt.xlabel("Epoch")
    plt.ylabel("Activation")
    plt.legend()
    plt.grid(True)
    plt.savefig(os.path.join(output_dir, filename), dpi=300, bbox_inches="tight")
    plt.close()
    logging.info(f"Plot saved to: {os.path.join(output_dir, filename)}")


def plot_dynamics(activation_history, weights_history, filename="dynamics.png"):
    if not weights_history:
        logging.error("weights_history is empty.")
        return
    plt.figure(figsize=(16, 12))
    plt.subplot(2, 2, 1)
    for label, activations in activation_history.items():
        if len(activations) > 0:
            plt.plot(range(1, len(activations) + 1), activations, label=label)
    plt.title("Activations over the course of learning")
    plt.xlabel("Epoch")
    plt.ylabel("Activation")
    plt.legend()
    plt.grid(True)
    plt.subplot(2, 2, 2)
    for label, weights in weights_history.items():
        if len(weights) > 0:
            plt.plot(range(1, len(weights) + 1), weights, label=label, alpha=0.7)
    plt.title("Connection weights over the course of learning")
    plt.xlabel("Epoch")
    plt.ylabel("Weight")
    plt.legend(bbox_to_anchor=(1.05, 1), loc='upper left')
    plt.grid(True)
    plt.savefig(os.path.join(output_dir, filename), dpi=300, bbox_inches="tight")
    plt.close()
    logging.info(f"Plot saved to: {os.path.join(output_dir, filename)}")


def plot_memory_distribution(short_term_memory, mid_term_memory, long_term_memory, filename="memory_distribution.png"):
    counts = [len(short_term_memory), len(mid_term_memory), len(long_term_memory)]
    labels = ["Short-term", "Mid-term", "Long-term"]
    plt.figure(figsize=(8, 6))
    plt.bar(labels, counts, color=["red", "blue", "green"])
    plt.title("Distribution of memory nodes")
    plt.ylabel("Number of nodes")
    plt.savefig(os.path.join(output_dir, filename), dpi=300, bbox_inches="tight")
    plt.close()
    logging.info(f"Plot saved to: {os.path.join(output_dir, filename)}")


def plot_activation_heatmap(activation_history, filename="activation_heatmap.png"):
    if not activation_history:
        logging.warning("No activation history to plot")
        return
    min_length = min(len(activations) for activations in activation_history.values())
    truncated_activations = {key: values[:min_length] for key, values in activation_history.items()}
    # Transpose so that rows are epochs and columns are categories, matching the axis labels
    heatmap_data = np.array(list(truncated_activations.values())).T
    if heatmap_data.size == 0:
        logging.error("Heatmap data is empty. Check the activation history.")
        return
    plt.figure(figsize=(12, 8))
    sns.heatmap(heatmap_data, cmap="YlGnBu", xticklabels=list(truncated_activations.keys()), yticklabels=False)
    plt.title("Heatmap of activation values")
    plt.xlabel("Category")
    plt.ylabel("Epoch")
    plt.savefig(os.path.join(output_dir, filename), dpi=300, bbox_inches="tight")
    plt.close()
    logging.info(f"Plot saved to: {os.path.join(output_dir, filename)}")


def plot_network_topology(category_nodes, new_brains, filename="network_topology.png"):
    G = nx.DiGraph()
    for node in category_nodes:
        G.add_node(node.label)
        for conn in node.connections:
            G.add_edge(node.label, conn.target_node.label, weight=conn.weight)
    # Additional modules are highlighted in red
    for brain in new_brains:
        G.add_node(brain.label, color='red')
        for conn in brain.connections:
            G.add_edge(brain.label, conn.target_node.label, weight=conn.weight)
    pos = nx.spring_layout(G)
    edge_labels = {(u, v): d['weight'] for u, v, d in G.edges(data=True)}
    node_colors = [G.nodes[node].get('color', 'skyblue') for node in G.nodes()]
    plt.figure(figsize=(12, 8))
    nx.draw(G, pos, with_labels=True, node_size=3000, node_color=node_colors, font_size=10, font_weight="bold", edge_color="gray")
    nx.draw_networkx_edge_labels(G, pos, edge_labels=edge_labels)
    plt.title("Network topology")
    plt.savefig(os.path.join(output_dir, filename), dpi=300, bbox_inches="tight")
    plt.close()
    logging.info(f"Plot saved to: {os.path.join(output_dir, filename)}")
```
### Saving and loading the model
```python
def save_model(category_nodes, filename="model.json"):
    model_data = {
        "nodes": [node.save_state() for node in category_nodes]
    }
    with open(filename, "w", encoding="utf-8") as file:
        json.dump(model_data, file, indent=4)
    logging.info(f"Model saved to {filename}")


def save_model_with_questions_and_answers(category_nodes, questions, filename="model_with_qa.json"):
    global model_saved
    logging.info("Starting to save the model...")
    current_model_data = {
        "nodes": [node.save_state() for node in category_nodes],
        "questions": questions
    }
    # Skip saving if the file on disk already holds identical data
    if os.path.exists(filename):
        try:
            with open(filename, "r", encoding="utf-8") as file:
                existing_model_data = json.load(file)
            if existing_model_data == current_model_data:
                logging.info("No changes detected, skipping save.")
                return
        except Exception as e:
            logging.warning(f"Error while checking the existing model: {e}")
    # Save the updated model
    try:
        with open(filename, "w", encoding="utf-8") as file:
            json.dump(current_model_data, file, indent=4)
        logging.info(f"Model successfully saved to {filename}.")
        model_saved = True  # set only after a successful save
    except Exception as e:
        logging.error(f"Error while saving the model: {e}")


def load_model_with_questions_and_answers(filename="model_with_qa.json"):
    global initialized
    if initialized:
        logging.info("Model already initialized.")
        return None, None
    if not os.path.exists(filename):
        logging.warning(f"File {filename} not found. The network will be initialized.")
        return None, None
    try:
        with open(filename, "r", encoding="utf-8") as file:
            model_data = json.load(file)
        # First create all nodes, then wire up the connections between them
        nodes_dict = {node_data["label"]: Node(node_data["label"]) for node_data in model_data["nodes"]}
        for node_data in model_data["nodes"]:
            node = nodes_dict[node_data["label"]]
            node.activation = node_data.get("activation", 0.0)
            for conn_state in node_data["connections"]:
                target_node = nodes_dict.get(conn_state["target"])
                if target_node:
                    node.add_connection(target_node, conn_state["weight"])
        questions = model_data.get("questions", [])
        logging.info(f"Model loaded with {len(nodes_dict)} nodes and {len(questions)} questions")
        initialized = True
        return list(nodes_dict.values()), questions
    except json.JSONDecodeError as e:
        logging.error(f"Error while parsing the JSON file: {e}")
        return None, None
```
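The change detection in `save_model_with_questions_and_answers` relies on the saved JSON comparing equal to the in-memory data after a round trip. The sketch below checks this for a small hand-written model dictionary in the same shape as `save_state()` produces; the node labels, the question and the temporary file location are assumptions for illustration.

```python
import json
import os
import tempfile

# Hypothetical model data in the shape produced by Node.save_state()
model_data = {
    "nodes": [
        {"label": "A", "activation": 0.0, "activation_history": [],
         "connections": [{"target": "B", "weight": 0.5}]},
        {"label": "B", "activation": 0.0, "activation_history": [],
         "connections": []},
    ],
    "questions": [{"question": "Was ist A?", "category": "A"}],
}

with tempfile.TemporaryDirectory() as tmp_dir:
    path = os.path.join(tmp_dir, "model_with_qa.json")
    with open(path, "w", encoding="utf-8") as file:
        json.dump(model_data, file, indent=4)
    with open(path, "r", encoding="utf-8") as file:
        restored = json.load(file)

# Equal data means a second save call would be skipped
unchanged = restored == model_data
print(unchanged)
```

The comparison works here because the data contains only dicts, lists, strings and floats; tuples, for example, would come back as lists and compare unequal.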
### Updating questions
```python
def update_questions_with_answers(filename="model_with_qa.json"):
    with open(filename, "r", encoding="utf-8") as file:
        model_data = json.load(file)
    # Interactively ask for every answer that is still missing
    for question in model_data["questions"]:
        if "answer" not in question:
            question["answer"] = input(f"Enter the answer for: '{question['question']}': ")
    with open(filename, "w", encoding="utf-8") as file:
        json.dump(model_data, file, indent=4)
    logging.info(f"Questions were updated with answers and saved to {filename}")
```
### Finding the best answer
```python
def find_best_answer(category_nodes, questions, query):
    # find_similar_question returns a placeholder entry instead of None when
    # nothing matches, so the fallback answer text below covers that case
    matched_question = find_similar_question(questions, query)
    if matched_question:
        logging.info(f"Matched question: {matched_question['question']} -> category: {matched_question['category']}")
        answer = matched_question.get("answer", "No answer available")
        logging.info(f"Answer: {answer}")
        return answer
    else:
        logging.warning("No matching question found.")
        return None
```
### Creating the dashboard
```python
import tkinter as tk
from tkinter import ttk
from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg


def create_dashboard(category_nodes, activation_history, short_term_memory, mid_term_memory, long_term_memory):
    root = tk.Tk()
    root.title("Psyco Dashboard")

    # Activation history
    activation_frame = ttk.Frame(root, padding="10")
    activation_frame.pack(fill=tk.BOTH, expand=True)
    activation_label = ttk.Label(activation_frame, text="Activation history")
    activation_label.pack()
    if activation_history:
        for label, activations in activation_history.items():
            fig, ax = plt.subplots()
            ax.plot(range(1, len(activations) + 1), activations)
            ax.set_title(label)
            canvas = FigureCanvasTkAgg(fig, master=activation_frame)
            canvas.draw()
            canvas.get_tk_widget().pack()
    else:
        no_data_label = ttk.Label(activation_frame, text="No activation history available.")
        no_data_label.pack()

    # Memory distribution
    memory_frame = ttk.Frame(root, padding="10")
    memory_frame.pack(fill=tk.BOTH, expand=True)
    memory_label = ttk.Label(memory_frame, text="Memory distribution")
    memory_label.pack()
    memory_counts = [len(short_term_memory), len(mid_term_memory), len(long_term_memory)]
    labels = ["Short-term", "Mid-term", "Long-term"]
    fig, ax = plt.subplots()
    ax.bar(labels, memory_counts, color=["red", "blue", "green"])
    ax.set_title("Distribution of memory nodes")
    ax.set_ylabel("Number of nodes")
    canvas = FigureCanvasTkAgg(fig, master=memory_frame)
    canvas.draw()
    canvas.get_tk_widget().pack()

    # Network topology
    topology_frame = ttk.Frame(root, padding="10")
    topology_frame.pack(fill=tk.BOTH, expand=True)
    topology_label = ttk.Label(topology_frame, text="Network topology")
    topology_label.pack()
    G = nx.DiGraph()
    for node in category_nodes:
        G.add_node(node.label)
        for conn in node.connections:
            G.add_edge(node.label, conn.target_node.label, weight=conn.weight)
    pos = nx.spring_layout(G)
    edge_labels = {(u, v): d['weight'] for u, v, d in G.edges(data=True)}
    node_colors = ['skyblue' for _ in G.nodes()]
    fig, ax = plt.subplots()
    nx.draw(G, pos, with_labels=True, node_size=3000, node_color=node_colors, font_size=10, font_weight="bold", edge_color="gray", ax=ax)
    nx.draw_networkx_edge_labels(G, pos, edge_labels=edge_labels, ax=ax)
    ax.set_title("Network topology")
    canvas = FigureCanvasTkAgg(fig, master=topology_frame)
    canvas.draw()
    canvas.get_tk_widget().pack()

    # Heatmap of activation values
    heatmap_frame = ttk.Frame(root, padding="10")
    heatmap_frame.pack(fill=tk.BOTH, expand=True)
    heatmap_label = ttk.Label(heatmap_frame, text="Heatmap of activation values")
    heatmap_label.pack()
    if activation_history:
        min_length = min(len(activations) for activations in activation_history.values())
        truncated_activations = {key: values[:min_length] for key, values in activation_history.items()}
        # Transpose so that rows are epochs and columns are categories, matching the axis labels
        heatmap_data = np.array(list(truncated_activations.values())).T
        if heatmap_data.size > 0:
            fig, ax = plt.subplots()
            sns.heatmap(heatmap_data, cmap="YlGnBu", xticklabels=list(truncated_activations.keys()), yticklabels=False, ax=ax)
            ax.set_title("Heatmap of activation values")
            ax.set_xlabel("Category")
            ax.set_ylabel("Epoch")
            canvas = FigureCanvasTkAgg(fig, master=heatmap_frame)
            canvas.draw()
            canvas.get_tk_widget().pack()
        else:
            no_data_label = ttk.Label(heatmap_frame, text="Heatmap data is empty. Check the activation history.")
            no_data_label.pack()
    else:
        no_data_label = ttk.Label(heatmap_frame, text="No activation history available.")
        no_data_label.pack()

    root.mainloop()
```
### Processing the CSV file
```python
def process_csv_in_chunks(filename, chunk_size=10000):
    logging.info(f"Starting to process file: {filename}")
    try:
        # Check that the file exists
        if not os.path.exists(filename):
            logging.error(f"File {filename} not found.")
            return None
        required_columns = {'Frage', 'Kategorie', 'Antwort'}
        all_chunks = []
        for chunk in pd.read_csv(filename, chunksize=chunk_size, encoding="utf-8", on_bad_lines='skip'):
            logging.info(f"Read chunk with {len(chunk)} rows.")
            if not required_columns.issubset(chunk.columns):
                logging.error("CSV file does not contain the expected columns: 'Frage', 'Kategorie', 'Antwort'")
                return None
            all_chunks.append(chunk)
        # Combine all chunks into a single DataFrame
        data = pd.concat(all_chunks, ignore_index=True)
        logging.info(f"All chunks processed successfully. Total rows: {len(data)}")
        return data
    except pd.errors.EmptyDataError:
        logging.error("CSV file is empty.")
    except pd.errors.ParserError as e:
        logging.error(f"Parsing error in CSV file: {e}")
    except Exception as e:
        logging.error(f"Unexpected error while processing the file: {e}")
    # Reached only after one of the errors above
    return None
```
### Processing single entries
```python
def process_single_entry(question, category, answer):
    global category_nodes, questions
    # Make sure the global variables are initialized
    if category_nodes is None:
        category_nodes = []
        logging.warning("Category node list was None and has now been initialized.")
    if questions is None:
        questions = []
        logging.warning("Question list was None and has now been initialized.")
    # Check whether the category already exists
    if not any(node.label == category for node in category_nodes):
        category_nodes.append(Node(category))
        logging.info(f"Added new category '{category}' to the network.")
    # Add question, category, and answer to the list
    questions.append({"question": question, "category": category, "answer": answer})
    logging.info(f"Added new question: '{question}' -> category: '{category}'")
```
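### Processing the CSV file with Dask
```python
def process_csv_with_dask(filename, blocksize="64MB"):
    try:
        # Dask's blocksize is a partition size in bytes (or a string such as "64MB"), not a row count
        ddf = dd.read_csv(filename, blocksize=blocksize)
        ddf = ddf.astype({'Kategorie': 'category'})
        # Select the columns by name so the argument order does not depend on the CSV layout
        columns = ['Frage', 'Kategorie', 'Antwort']
        for question, category, answer in ddf[columns].itertuples(index=False, name=None):
            process_single_entry(question, category, answer)
        logging.info("All partitions processed successfully with Dask.")
    except Exception as e:
        logging.error(f"Error while processing the file with Dask: {e}")
```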
### Saving to SQLite
```python
def save_to_sqlite(filename, db_name="dataset.db"):
    conn = sqlite3.connect(db_name)
    try:
        chunk_iter = pd.read_csv(filename, chunksize=10000)
        for chunk in chunk_iter:
            # "append" adds rows to qa_data, so calling this function twice stores the data twice
            chunk.to_sql("qa_data", conn, if_exists="append", index=False)
            logging.info(f"Saved chunk with {len(chunk)} rows.")
    finally:
        # Close the connection even if reading or writing fails
        conn.close()
    logging.info("CSV data was saved to SQLite successfully.")
```
### Loading from SQLite
```python
def load_from_sqlite(db_name="dataset.db"):
    conn = sqlite3.connect(db_name)
    try:
        query = "SELECT Frage, Kategorie, Antwort FROM qa_data"
        data = pd.read_sql_query(query, conn)
    finally:
        # Close the connection even if the query fails
        conn.close()
    return data
```
### Saving a partial model
```python
def save_partial_model(filename="partial_model.json"):
    model_data = {
        "nodes": [node.save_state() for node in category_nodes],
        "questions": questions
    }
    # Write UTF-8 explicitly so umlauts in questions and answers stay readable
    with open(filename, "w", encoding="utf-8") as file:
        json.dump(model_data, file, indent=4, ensure_ascii=False)
    logging.info("Partial model saved.")
```
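`save_partial_model` writes directly to the target file, so an interruption in the middle of the write could leave a truncated JSON file behind. One possible safeguard, sketched here under the assumption that this matters for the project, is to write to a temporary file first and then swap it into place. The names `save_json_atomically` and `load_json` are hypothetical and not part of the original code.

```python
import json
import os
import tempfile


def save_json_atomically(model_data, filename="partial_model.json"):
    """Write model_data to a temporary file, then move it over filename."""
    target_dir = os.path.dirname(os.path.abspath(filename))
    # The temporary file lives in the same directory so the final move stays on one filesystem.
    fd, tmp_path = tempfile.mkstemp(dir=target_dir, suffix=".tmp")
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as tmp_file:
            json.dump(model_data, tmp_file, indent=4, ensure_ascii=False)
        # os.replace swaps the file in one step, so readers never see a half-written file.
        os.replace(tmp_path, filename)
    except BaseException:
        # Clean up the temporary file if anything went wrong before the swap.
        if os.path.exists(tmp_path):
            os.remove(tmp_path)
        raise


def load_json(filename="partial_model.json"):
    """Read the JSON file back into Python objects."""
    with open(filename, "r", encoding="utf-8") as file:
        return json.load(file)
```

In this sketch the dictionary built in `save_partial_model` would be passed as `model_data`.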
### Lazy-loading the CSV file
```python
def lazy_load_csv(filename, chunk_size=10000):
    # Generator: only one chunk is held in memory at a time
    for chunk in pd.read_csv(filename, chunksize=chunk_size):
        for _, row in chunk.iterrows():
            yield row['Frage'], row['Kategorie'], row['Antwort']
```
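The same lazy-loading idea can be illustrated without pandas. The following is a minimal sketch that uses only the standard library's `csv` module; it is an alternative for illustration, not the project's implementation, and the function name `lazy_load_rows` is hypothetical. It assumes the same column names `Frage`, `Kategorie`, and `Antwort`.

```python
import csv


def lazy_load_rows(filename, required=("Frage", "Kategorie", "Antwort")):
    """Yield (question, category, answer) tuples one row at a time."""
    with open(filename, newline="", encoding="utf-8") as handle:
        reader = csv.DictReader(handle)
        # Fail early (on the first iteration) if an expected column is missing.
        missing = [column for column in required if column not in (reader.fieldnames or [])]
        if missing:
            raise ValueError(f"Missing columns: {missing}")
        for row in reader:
            yield row["Frage"], row["Kategorie"], row["Antwort"]
```

Because this is a generator, nothing is read until the caller starts iterating, for example with `for question, category, answer in lazy_load_rows("data.csv"):`.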
### Main function
```python
def main():
    start_time = time.time()
    category_nodes, questions = load_model_with_questions_and_answers("model_with_qa.json")
    if category_nodes is None:
        csv_file = "data.csv"
        data = process_csv_in_chunks(csv_file)
        if data is None:
            logging.error("Error loading the CSV file.")
            return
        if len(data) > 1000:
            logging.info("File has more than 1000 rows. Splitting into smaller files...")
            split_csv(csv_file)
            # Process each split file. The network is re-initialized for every part,
            # so the model saved last reflects only the part processed last.
            data_dir = "data"
            for filename in os.listdir(data_dir):
                if filename.endswith(".csv"):
                    file_path = os.path.join(data_dir, filename)
                    logging.info(f"Processing file: {file_path}")
                    data = process_csv_in_chunks(file_path)
                    if data is None:
                        logging.error("Error loading the CSV file.")
                        return
                    categories = data['Kategorie'].unique()
                    category_nodes = initialize_quiz_network(categories)
                    questions = [{"question": row['Frage'], "category": row['Kategorie'], "answer": row['Antwort']} for _, row in data.iterrows()]
                    personality_distributions = {category: random.uniform(0.5, 0.8) for category in [node.label for node in category_nodes]}
                    activation_history, weights_history = simulate_learning(data, category_nodes, personality_distributions)
                    save_model_with_questions_and_answers(category_nodes, questions)
        else:
            logging.info("File has at most 1000 rows. No splitting required.")
            categories = data['Kategorie'].unique()
            category_nodes = initialize_quiz_network(categories)
            questions = [{"question": row['Frage'], "category": row['Kategorie'], "answer": row['Antwort']} for _, row in data.iterrows()]
            personality_distributions = {category: random.uniform(0.5, 0.8) for category in [node.label for node in category_nodes]}
            activation_history, weights_history = simulate_learning(data, category_nodes, personality_distributions)
            save_model_with_questions_and_answers(category_nodes, questions)
    end_time = time.time()
    logging.info(f"Simulation finished. Total duration: {end_time - start_time:.2f} seconds")
```
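In the split branch of `main`, `category_nodes` and `questions` are rebuilt for every part file, so the state from earlier parts is replaced. If the intent is a single network that covers all parts (an assumption; the source does not say), the categories and questions could be accumulated across parts before the network is initialized. The sketch below shows only that accumulation step on plain tuples; `merge_parts` is a hypothetical helper and not part of the original code.

```python
def merge_parts(parts):
    """Combine several parts into one category list and one question list.

    parts: iterable of lists of (question, category, answer) tuples.
    Returns (categories, questions); categories keep first-seen order without duplicates.
    """
    categories = []
    seen = set()
    questions = []
    for part in parts:
        for question, category, answer in part:
            if category not in seen:
                seen.add(category)
                categories.append(category)
            questions.append({"question": question, "category": category, "answer": answer})
    return categories, questions
```

The resulting `categories` list could then be passed once to `initialize_quiz_network`, instead of once per part.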
### Starting the simulation from the GUI
```python
def run_simulation_from_gui(learning_rate, decay_rate, reward_interval, epochs):
    global model_saved
    model_saved = False  # Force saving after training
    start_time = time.time()
    csv_file = "data.csv"
    category_nodes, questions = load_model_with_questions_and_answers("model_with_qa.json")
    if category_nodes is None:
        # No saved model: build the network from the CSV data
        data = process_csv_in_chunks(csv_file)
        if not isinstance(data, pd.DataFrame):
            logging.error("Error loading the CSV file. The expected DataFrame was not returned.")
            return
        if len(data) > 1000:
            logging.info("File has more than 1000 rows. Splitting into smaller files...")
            split_csv(csv_file)
            # Process each split file
            data_dir = "data"
            for filename in os.listdir(data_dir):
                if filename.endswith(".csv"):
                    file_path = os.path.join(data_dir, filename)
                    logging.info(f"Processing file: {file_path}")
                    data = process_csv_in_chunks(file_path)
                    if not isinstance(data, pd.DataFrame):
                        logging.error("Error loading the CSV file. The expected DataFrame was not returned.")
                        return
                    categories = data['Kategorie'].unique()
                    category_nodes = initialize_quiz_network(categories)
                    questions = [{"question": row['Frage'], "category": row['Kategorie'], "answer": row['Antwort']} for _, row in data.iterrows()]
                    personality_distributions = {category: random.uniform(0.5, 0.8) for category in [node.label for node in category_nodes]}
                    activation_history, weights_history = simulate_learning(
                        data, category_nodes, personality_distributions,
                        epochs=int(epochs),
                        learning_rate=float(learning_rate),
                        reward_interval=int(reward_interval),
                        decay_rate=float(decay_rate)
                    )
                    save_model_with_questions_and_answers(category_nodes, questions)
        else:
            logging.info("File has at most 1000 rows. No splitting required.")
            categories = data['Kategorie'].unique()
            category_nodes = initialize_quiz_network(categories)
            questions = [{"question": row['Frage'], "category": row['Kategorie'], "answer": row['Antwort']} for _, row in data.iterrows()]
            personality_distributions = {category: random.uniform(0.5, 0.8) for category in [node.label for node in category_nodes]}
            activation_history, weights_history = simulate_learning(
                data, category_nodes, personality_distributions,
                epochs=int(epochs),
                learning_rate=float(learning_rate),
                reward_interval=int(reward_interval),
                decay_rate=float(decay_rate)
            )
            save_model_with_questions_and_answers(category_nodes, questions)
    else:
        # A saved model exists: continue training it on the CSV data
        data = process_csv_in_chunks(csv_file)
        if not isinstance(data, pd.DataFrame):
            logging.error("Error loading the CSV file. The expected DataFrame was not returned.")
            return
        logging.info(f"Number of rows in the loaded CSV: {len(data)}")
        personality_distributions = {category: random.uniform(0.5, 0.8) for category in [node.label for node in category_nodes]}
        activation_history, weights_history = simulate_learning(
            data, category_nodes, personality_distributions,
            epochs=int(epochs),
            learning_rate=float(learning_rate),
            reward_interval=int(reward_interval),
            decay_rate=float(decay_rate)
        )
        save_model_with_questions_and_answers(category_nodes, questions)
    end_time = time.time()
    logging.info(f"Simulation finished. Total duration: {end_time - start_time:.2f} seconds")
    # This function runs in a worker thread (see start_gui); Tkinter calls made
    # outside the main thread are not guaranteed to be safe
    messagebox.showinfo("Result", f"Simulation finished! Duration: {end_time - start_time:.2f} seconds")
```
### Initializing the network asynchronously
```python
def async_initialize_network():
    global category_nodes, questions, model_saved
    logging.info("Starting network initialization...")
    category_nodes, questions = load_model_with_questions_and_answers("model_with_qa.json")
    if category_nodes is None:
        category_nodes = []
        logging.warning("No saved categories found. A new network will be created.")
        model_saved = False  # Reset the save flag
    if questions is None:
        questions = []
        logging.warning("No saved questions found. A new question list will be created.")
        model_saved = False  # Reset the save flag
    if not category_nodes:
        csv_file = "data.csv"
        data = process_csv_in_chunks(csv_file)
        if isinstance(data, pd.DataFrame):
            if len(data) > 1000:
                logging.info("File has more than 1000 rows. Splitting into smaller files...")
                split_csv(csv_file)
                # Process each split file
                data_dir = "data"
                for filename in os.listdir(data_dir):
                    if filename.endswith(".csv"):
                        file_path = os.path.join(data_dir, filename)
                        logging.info(f"Processing file: {file_path}")
                        data = process_csv_in_chunks(file_path)
                        if isinstance(data, pd.DataFrame):
                            categories = data['Kategorie'].unique()
                            category_nodes = initialize_quiz_network(categories)
                            questions = [{"question": row['Frage'], "category": row['Kategorie'], "answer": row['Antwort']} for _, row in data.iterrows()]
                            logging.info("Network created successfully from CSV data.")
                            model_saved = False  # Reset the save flag
            else:
                logging.info("File has at most 1000 rows. No splitting required.")
                categories = data['Kategorie'].unique()
                category_nodes = initialize_quiz_network(categories)
                questions = [{"question": row['Frage'], "category": row['Kategorie'], "answer": row['Antwort']} for _, row in data.iterrows()]
                logging.info("Network created successfully from CSV data.")
                model_saved = False  # Reset the save flag
        else:
            logging.error("Error loading the CSV data. The network could not be initialized.")
            return
    save_model_with_questions_and_answers(category_nodes, questions)
    logging.info("Network initialized successfully.")
```
### Starting the GUI
```python
def start_gui():
    def start_simulation():
        try:
            # Run the simulation in a daemon thread so the GUI stays responsive
            # Arguments: learning_rate, decay_rate, reward_interval, epochs
            threading.Thread(target=run_simulation_from_gui, args=(0.8, 0.002, 5, 10), daemon=True).start()
            messagebox.showinfo("Info", "Simulation started!")
            logging.info("Simulation started")
        except Exception as e:
            logging.error(f"Error starting the simulation: {e}")
            messagebox.showerror("Error", f"Error: {e}")
    root = tk.Tk()
    root.title("DRLCogNet GUI")
    root.geometry("400x300")
    header_label = tk.Label(root, text="Simulation settings", font=("Helvetica", 16))
    header_label.pack(pady=10)
    start_button = tk.Button(root, text="Start simulation", command=start_simulation)
    start_button.pack(pady=20)
    root.mainloop()
```
### Main program
```python
if __name__ == "__main__":
    # Start the initialization in a background thread
    threading.Thread(target=async_initialize_network, daemon=True).start()
    start_gui()
```
## Questions about the database (SQLite)
### Is the database created in memory?
No. `save_to_sqlite` calls `sqlite3.connect(db_name)` with a file name (by default `dataset.db`), so SQLite creates a database file on disk, relative to the current working directory. An in-memory database would only be created if the special name `:memory:` were passed instead.
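The difference between a file-backed and an in-memory SQLite database can be checked directly with the standard library. The following is a minimal sketch; the helper names `database_file` and `compare_storage` are hypothetical, and the database is created in a temporary directory purely for illustration.

```python
import os
import sqlite3
import tempfile


def database_file(conn):
    """Return the file SQLite reports for the main database (empty for in-memory)."""
    # PRAGMA database_list yields one row per attached database: (seq, name, file).
    return conn.execute("PRAGMA database_list").fetchone()[2]


def compare_storage():
    """Open one in-memory and one file-backed database and report where each lives."""
    memory_conn = sqlite3.connect(":memory:")
    disk_path = os.path.join(tempfile.mkdtemp(), "dataset.db")
    disk_conn = sqlite3.connect(disk_path)
    try:
        # Writing a table makes sure the database file has actually been written to disk.
        disk_conn.execute("CREATE TABLE qa_data (Frage TEXT, Kategorie TEXT, Antwort TEXT)")
        disk_conn.commit()
        return database_file(memory_conn), database_file(disk_conn), os.path.exists(disk_path)
    finally:
        memory_conn.close()
        disk_conn.close()
```

`compare_storage()` returns the file reported for each connection plus a flag that tells whether the file-backed database exists on disk.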
### How is the database created?
The database is created when the connection to it is opened: if the file `dataset.db` does not exist, SQLite creates it. The data from the CSV file is then read in chunks and written to the table `qa_data`.
### How is the data loaded into the database?
The data is read from the CSV file in chunks of 10,000 rows, and each chunk is appended to the table `qa_data`. The pandas method `to_sql` with `if_exists="append"` writes the rows to the database.
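The chunked write can also be sketched without pandas, using `csv.DictReader` and `executemany` from the standard library. This is a swapped-in technique for illustration, not the code `save_to_sqlite` uses; the function name `csv_to_sqlite_in_batches` and the explicit `CREATE TABLE` statement with `TEXT` columns are assumptions.

```python
import csv
import sqlite3
from itertools import islice


def csv_to_sqlite_in_batches(csv_path, conn, batch_size=10000):
    """Insert the CSV rows into qa_data batch by batch and return the row count."""
    conn.execute(
        "CREATE TABLE IF NOT EXISTS qa_data (Frage TEXT, Kategorie TEXT, Antwort TEXT)"
    )
    total = 0
    with open(csv_path, newline="", encoding="utf-8") as handle:
        reader = csv.DictReader(handle)
        while True:
            # islice pulls at most batch_size rows, so memory use stays bounded.
            batch = [
                (row["Frage"], row["Kategorie"], row["Antwort"])
                for row in islice(reader, batch_size)
            ]
            if not batch:
                break
            conn.executemany("INSERT INTO qa_data VALUES (?, ?, ?)", batch)
            conn.commit()
            total += len(batch)
    return total
```

As with `if_exists="append"`, running this twice on the same file inserts the rows twice.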
### How is the data loaded from the database?
The data is loaded by opening a connection to the SQLite database and running an SQL query that reads the columns `Frage`, `Kategorie`, and `Antwort` from the table `qa_data`. The pandas function `read_sql_query` returns the result as a pandas DataFrame.
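When only part of the table is needed, the query can filter in SQL instead of loading every row. The sketch below uses the standard library's `sqlite3` with a parameter placeholder; the function `load_category` is hypothetical and returns plain tuples rather than a DataFrame.

```python
import sqlite3


def load_category(conn, category):
    """Return all (Frage, Kategorie, Antwort) rows for one category."""
    # The ? placeholder lets SQLite bind the value safely instead of formatting it into the SQL string.
    cursor = conn.execute(
        "SELECT Frage, Kategorie, Antwort FROM qa_data WHERE Kategorie = ?",
        (category,),
    )
    return cursor.fetchall()
```

A call such as `load_category(conn, "Mathe")` would return only the rows whose `Kategorie` is `Mathe`; the category value here is just an example.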
### Example code for using the database
```python
# Save data to the database
save_to_sqlite("data.csv")
# Load data from the database
data = load_from_sqlite()
```
## Conclusion
This documentation gives an overview of the code and of how the SQLite database is used to store and load data. The code is modular: it processes data from a CSV file and uses it to simulate learning in a neural network. The SQLite database is stored as a file on disk (by default `dataset.db`), so the data can be saved once and reloaded later without parsing the CSV file again.