# Code Documentation

## Introduction

This document describes code that processes data from a CSV file and simulates how a neural network learns from it. The code covers network initialization, CSV processing, simulation of the learning process, and saving and loading the model.

## Dependencies

- `pandas`: For processing CSV files.
- `numpy`: For numerical operations.
- `random`: For random operations.
- `tqdm`: For progress bars.
- `tkinter`: For the graphical user interface.
- `seaborn`: For visualization.
- `matplotlib`: For the plots and dashboard figures (the plot functions call it as `plt`).
- `networkx`: For building and analyzing graphs.
- `json`: For saving and loading models.
- `os`: For file operations.
- `time`: For timing measurements.
- `torch`: For neural networks.
- `threading`: For managing threads.
- `logging`: For logging.
- `sqlite3`: For working with SQLite databases.
- `dask.dataframe`: For parallel data processing.
- `difflib`: For fuzzy matching of question texts.

The code blocks use the conventional aliases `pd`, `np`, `nx`, `sns`, `plt`, `tk`/`ttk`, and `nn` (for `torch.nn`).

## Logger Configuration

```python
import logging

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
```

## Global Variables

- `initialized`: Flag that records whether the network has been initialized.
- `category_nodes`: List of the nodes in the network.
- `questions`: List of the questions.
- `model_saved`: Guard flag that records whether the model has been saved.

## Checking That the Output Folder Exists

```python
import os

output_dir = "plots"
# Create the plot folder if it is missing; exist_ok avoids an error if it is already there
os.makedirs(output_dir, exist_ok=True)
```

## Functions

### Function for Splitting the CSV File

```python
import logging
import os

import pandas as pd


def split_csv(filename, chunk_size=1000, output_dir="data"):
    """Split a large CSV file into numbered part files of chunk_size rows each."""
    os.makedirs(output_dir, exist_ok=True)

    # With chunksize set, read_csv returns an iterator of DataFrames
    chunk_iter = pd.read_csv(filename, chunksize=chunk_size)
    for i, chunk in enumerate(chunk_iter):
        chunk.to_csv(os.path.join(output_dir, f"data_part_{i}.csv"), index=False)
        logging.info(f"Chunk {i} saved with {len(chunk)} rows.")
```

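To make the chunking behavior concrete, here is a minimal sketch that does the same split with the standard library only. It is a stand-in for illustration, not the document's implementation: `split_csv_stdlib` and the sample file name are hypothetical, and the sketch assumes the file has a header row.

```python
import csv
import os
import tempfile
from itertools import islice


def split_csv_stdlib(filename, chunk_size, output_dir):
    """Write consecutive chunks of at most chunk_size data rows, each with the header."""
    os.makedirs(output_dir, exist_ok=True)
    written = []
    with open(filename, newline="", encoding="utf-8") as src:
        reader = csv.reader(src)
        header = next(reader)  # assumes a header row is present
        index = 0
        while True:
            rows = list(islice(reader, chunk_size))
            if not rows:
                break
            part = os.path.join(output_dir, f"data_part_{index}.csv")
            with open(part, "w", newline="", encoding="utf-8") as dst:
                writer = csv.writer(dst)
                writer.writerow(header)
                writer.writerows(rows)
            written.append((part, len(rows)))
            index += 1
    return written


# Demo: five data rows split into chunks of two
with tempfile.TemporaryDirectory() as tmp:
    source = os.path.join(tmp, "questions.csv")
    with open(source, "w", newline="", encoding="utf-8") as f:
        demo_writer = csv.writer(f)
        demo_writer.writerow(["Frage", "Kategorie"])
        demo_writer.writerows([[f"Frage {i}", "Test"] for i in range(5)])
    parts = split_csv_stdlib(source, chunk_size=2, output_dir=os.path.join(tmp, "data"))
    chunk_sizes = [n for _, n in parts]
    print(chunk_sizes)
```

The last chunk is simply shorter than `chunk_size` when the row count is not a multiple of it, which matches how `pd.read_csv(..., chunksize=...)` behaves.
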
### Improvement 1: Strengthening Connections for Frequently Asked Questions

```python
import logging

import numpy as np


def strengthen_question_connection(category_nodes, question, category):
    """Raise the weight of the category -> question connection by 0.1, capped at 1.0."""
    category_node = next((node for node in category_nodes if node.label == category), None)
    if category_node:
        for conn in category_node.connections:
            if conn.target_node.label == question:
                old_weight = conn.weight
                conn.weight += 0.1  # strengthen the connection
                conn.weight = np.clip(conn.weight, 0, 1.0)
                logging.info(f"Strengthened connection for question '{question}' in category '{category}': {old_weight:.4f} -> {conn.weight:.4f}")
```

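Because the weight is clipped to the range 0 to 1, repeated strengthening saturates quickly. The following sketch traces that on a bare number; `strengthen` is a hypothetical helper that mirrors only the increment-and-clip step above.

```python
def strengthen(weight, step=0.1, upper=1.0):
    """Add a fixed step and clamp the result to [0, upper]."""
    return min(max(weight + step, 0.0), upper)


weight = 0.75
trace = [weight]
for _ in range(4):
    weight = strengthen(weight)
    trace.append(round(weight, 2))
print(trace)
```

Once a connection reaches the upper bound, further calls have no effect, so the frequency of a question stops being distinguishable after a few repetitions.
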
### Improvement 2: Extended Hebbian Learning Rule for Better Question Assignment

```python
import logging

import numpy as np


def enhanced_hebbian_learning(node, target_node, learning_rate=0.2, decay_factor=0.01):
    """Hebbian update with weight decay for the connection node -> target_node."""
    for conn in node.connections:
        if conn.target_node == target_node:
            old_weight = conn.weight
            # Hebbian term: grows with the product of both activations
            conn.weight += learning_rate * node.activation * target_node.activation
            # Proportional decay, then clamp to [0, 1]
            conn.weight = np.clip(conn.weight - decay_factor * conn.weight, 0, 1.0)
            logging.info(f"Hebbian learning applied: weight {old_weight:.4f} -> {conn.weight:.4f}")
            break
```

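The update above amounts to w' = clip((w + η · a_pre · a_post) · (1 − d), 0, 1). As a quick check of the arithmetic, the sketch below applies that formula to plain numbers; `enhanced_hebbian_step` is a hypothetical helper, not part of the documented code.

```python
import math


def enhanced_hebbian_step(weight, pre, post, learning_rate=0.2, decay_factor=0.01):
    """One update: Hebbian growth, proportional decay, clamp to [0, 1]."""
    grown = weight + learning_rate * pre * post
    decayed = grown - decay_factor * grown
    return min(max(decayed, 0.0), 1.0)


both_active = enhanced_hebbian_step(0.5, pre=1.0, post=1.0)    # (0.5 + 0.2) * 0.99
target_silent = enhanced_hebbian_step(0.5, pre=1.0, post=0.0)  # 0.5 * 0.99
print(f"{both_active:.3f} {target_silent:.3f}")
```

When the target node is inactive, the Hebbian term vanishes and only the decay remains, so unused connections slowly weaken.
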
### Improvement 3: Simulating Question Processing in the Network

```python
import logging


def simulate_question_answering(category_nodes, question, questions):
    """Activate the category node of a known question and return its activation."""
    category = next((q['category'] for q in questions if q['question'] == question), None)
    if not category:
        logging.warning(f"Question '{question}' not found!")
        return None

    category_node = next((node for node in category_nodes if node.label == category), None)
    if category_node:
        propagate_signal(category_node, input_signal=0.9, emotion_weights={}, emotional_state=1.0)
        activation = category_node.activation
        if activation is None or activation <= 0:
            logging.warning(f"Category '{category}' has an invalid activation: {activation}")
            return 0.0  # return 0 if the activation failed
        logging.info(f"Processing question: '{question}' → category: '{category}' with activation {activation:.4f}")
        return activation
    else:
        # The category is only reported as missing; this function does not add it
        logging.warning(f"Category '{category}' not found in the network.")
        return 0.0
```

### Improvement 4: Finding the Questions That Match a User Query

```python
def find_question_by_keyword(questions, keyword):
    """Return all questions containing the keyword (case-insensitive), or None."""
    matching_questions = [q for q in questions if keyword.lower() in q['question'].lower()]
    return matching_questions if matching_questions else None
```

### Improvement 5: Searching for the Most Similar Question with Simple Similarity Metrics

```python
from difflib import get_close_matches


def find_similar_question(questions, query):
    """Return the question whose text is closest to the query, or a fallback entry."""
    question_texts = [q['question'] for q in questions]
    # n=1: best match only; cutoff=0.6: minimum similarity ratio
    closest_matches = get_close_matches(query, question_texts, n=1, cutoff=0.6)

    if closest_matches:
        matched_question = next((q for q in questions if q['question'] == closest_matches[0]), None)
        return matched_question
    else:
        # Fallback entry; the German values are kept because callers may compare against them
        return {"question": "Keine passende Frage gefunden", "category": "Unbekannt"}
```

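The effect of the `cutoff` value is easiest to see on a small example. The sketch below calls `difflib.get_close_matches` directly; the sample questions and the helper name `closest_question` are made up for illustration.

```python
from difflib import get_close_matches

sample_questions = [
    {"question": "What is Python?", "category": "Programming"},
    {"question": "How does a neuron work?", "category": "Biology"},
]


def closest_question(questions, query, cutoff=0.6):
    """Return the entry whose text is most similar to the query, or None."""
    texts = [q["question"] for q in questions]
    matches = get_close_matches(query, texts, n=1, cutoff=cutoff)
    if not matches:
        return None
    return next(q for q in questions if q["question"] == matches[0])


typo_match = closest_question(sample_questions, "What is Pyton")  # small typo still matches
no_match = closest_question(sample_questions, "xyz")              # below the cutoff
print(typo_match, no_match)
```

A query with a small typo clears the 0.6 threshold, while an unrelated string falls below it and yields no match.
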
### Improvement 6: Test Function for Checking the Model

```python
import logging


def test_model(category_nodes, questions, query):
    """Look up a query by keyword and by similarity, logging both results."""
    matched_question = find_question_by_keyword(questions, query)
    if matched_question:
        logging.info(f"Found question: {matched_question[0]['question']} -> category: {matched_question[0]['category']}")
        simulate_question_answering(category_nodes, matched_question[0]['question'], questions)
    else:
        logging.warning("No matching question found.")

    similarity_question = find_similar_question(questions, query)
    logging.info(f"Most similar question: {similarity_question['question']} -> category: {similarity_question['category']}")
```

### NetworkX Functions for Causal Graphs

```python
import logging
import random

import networkx as nx


def build_causal_graph(category_nodes):
    """Build a directed graph whose edges carry the connection weights."""
    G = nx.DiGraph()
    for node in category_nodes:
        G.add_node(node.label)
        for conn in node.connections:
            G.add_edge(node.label, conn.target_node.label, weight=conn.weight)
    return G


def analyze_causality_multiple(G, num_pairs=3):
    """Log all simple paths between num_pairs randomly chosen node pairs."""
    if len(G.nodes) < 2:
        logging.warning("Graph does not contain enough nodes for an analysis.")
        return

    for _ in range(num_pairs):
        # random.sample needs a sequence, so the node view is converted to a list
        start_node, target_node = random.sample(list(G.nodes), 2)
        logging.info(f"Analyzing causal paths from '{start_node}' to '{target_node}'")

        # all_simple_paths yields nothing when no path exists
        paths = list(nx.all_simple_paths(G, source=start_node, target=target_node))
        if paths:
            for path in paths:
                logging.info(f"Causal path: {' -> '.join(map(str, path))}")
        else:
            logging.info(f"No path found from '{start_node}' to '{target_node}'")


def analyze_node_influence(G):
    """Rank nodes by PageRank and log the scores in descending order."""
    influence_scores = nx.pagerank(G, alpha=0.85)
    sorted_influences = sorted(influence_scores.items(), key=lambda x: x[1], reverse=True)
    for node, score in sorted_influences:
        logging.info(f"Node: {node}, influence: {score:.4f}")
```

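For readers unfamiliar with the term, a simple path is one that visits no node twice. The sketch below enumerates such paths with a plain depth-first search over an adjacency dictionary. It is a standard-library illustration of the idea, not NetworkX's implementation, and `simple_paths` and the sample graph are hypothetical.

```python
def simple_paths(graph, source, target, path=None):
    """Yield every path from source to target that visits no node twice."""
    path = (path or []) + [source]
    if source == target:
        yield path
        return
    for neighbor in graph.get(source, []):
        if neighbor not in path:  # skip nodes already on the current path
            yield from simple_paths(graph, neighbor, target, path)


# Directed sample graph: A -> B, A -> C, B -> C
graph = {"A": ["B", "C"], "B": ["C"], "C": []}
forward = sorted(simple_paths(graph, "A", "C"))
backward = list(simple_paths(graph, "C", "A"))
print(forward, backward)
```

In a fully connected network the number of simple paths grows very quickly with the node count, so the analysis is best limited to small graphs or a bounded path length.
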
### Function for Interventions Based on Pearl's Do-Operator

```python
import logging


def do_intervention(node, new_value):
    """Force a node's activation to new_value and pass the effect to its direct targets."""
    logging.info(f"Intervention: setting {node.label} to {new_value}")
    node.activation = new_value
    for conn in node.connections:
        conn.target_node.activation += node.activation * conn.weight
```

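A small worked example shows what the intervention does to a direct target. The sketch uses a hypothetical `DemoNode` dataclass and stores connections as `(target, weight)` tuples instead of the document's `Connection` objects.

```python
import math
from dataclasses import dataclass, field


@dataclass
class DemoNode:
    label: str
    activation: float = 0.0
    connections: list = field(default_factory=list)  # (target, weight) tuples


def do_intervention_demo(node, new_value):
    """Set the activation and add its weighted effect to each direct target."""
    node.activation = new_value
    for target, weight in node.connections:
        target.activation += node.activation * weight


cause = DemoNode("cause")
effect = DemoNode("effect", activation=0.1)
cause.connections.append((effect, 0.5))

do_intervention_demo(cause, 0.8)
after_first = effect.activation   # 0.1 + 0.8 * 0.5
do_intervention_demo(cause, 0.8)
after_second = effect.activation  # the effect is added again
print(after_first, after_second)
```

Note that the effect is additive: repeating the same intervention raises the target's activation again, and nothing clamps it to the range 0 to 1.
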
### Reinforcing Context-Dependent Learning

```python
import logging

import numpy as np


def contextual_causal_analysis(node, context_factors, learning_rate=0.1):
    """Strengthen all outgoing weights of a highly active node in a favorable context."""
    context_factor = context_factors.get(node.label, 1.0)
    if node.activation > 0.8 and context_factor > 1.0:
        logging.info(f"Causal relationship strengthened for {node.label} because of the context.")
        for conn in node.connections:
            conn.weight += learning_rate * context_factor
            conn.weight = np.clip(conn.weight, 0, 1.0)
            logging.info(f"Weight updated: {node.label} → {conn.target_node.label}, weight: {conn.weight:.4f}")
```

### PyTorch Model for Causal Inference

```python
import torch
import torch.nn as nn


class CausalInferenceNN(nn.Module):
    """Small feed-forward network: 10 input features -> 20 hidden units -> 1 output."""

    def __init__(self):
        super().__init__()
        self.fc1 = nn.Linear(10, 20)
        self.fc2 = nn.Linear(20, 1)

    def forward(self, x):
        x = torch.relu(self.fc1(x))
        return self.fc2(x)
```

### Debugging Function

```python
import logging
import time


def debug_connections(category_nodes):
    """Log every node with its outgoing connections and the time this took."""
    start_time = time.time()
    for node in category_nodes:
        logging.info(f"Node: {node.label}")
        for conn in node.connections:
            logging.info(f"  Connection to: {conn.target_node.label}, weight: {conn.weight}")
    end_time = time.time()
    logging.info(f"debug_connections execution time: {end_time - start_time:.4f} seconds")
```

### Helper Functions

```python
import logging
import random

import numpy as np


def sigmoid(x):
    """Logistic function; maps any real input into (0, 1)."""
    return 1 / (1 + np.exp(-x))


def add_activation_noise(activation, noise_level=0.1):
    """Add Gaussian noise and clamp the result to [0, 1]."""
    noise = np.random.normal(0, noise_level)
    return np.clip(activation + noise, 0.0, 1.0)


def decay_weights(category_nodes, decay_rate=0.002, forgetting_curve=0.95):
    """Shrink every weight by the combined decay and forgetting factor."""
    for node in category_nodes:
        for conn in node.connections:
            conn.weight *= (1 - decay_rate) * forgetting_curve


def reward_connections(category_nodes, target_category, reward_factor=0.1):
    """Raise all outgoing weights of the rewarded category, capped at 1.0."""
    for node in category_nodes:
        if node.label == target_category:
            for conn in node.connections:
                conn.weight += reward_factor
                conn.weight = np.clip(conn.weight, 0, 1.0)


def apply_emotion_weight(activation, category_label, emotion_weights, emotional_state=1.0):
    """Scale an activation by the category's emotion weight and the emotional state."""
    emotion_factor = emotion_weights.get(category_label, 1.0) * emotional_state
    return activation * emotion_factor


def generate_simulated_answers(data, personality_distributions):
    """Draw one answer in [0, 1] per row, centered on the category's mean."""
    simulated_answers = []
    for _, row in data.iterrows():
        category = row['Kategorie']  # CSV column name (German for "category")
        mean = personality_distributions.get(category, 0.5)
        simulated_answer = np.clip(np.random.normal(mean, 0.2), 0.0, 1.0)
        simulated_answers.append(simulated_answer)
    return simulated_answers


def social_influence(category_nodes, social_network, influence_factor=0.1):
    """Raise each weight by the social score of its target category."""
    for node in category_nodes:
        for conn in node.connections:
            social_impact = social_network.get(conn.target_node.label, 0) * influence_factor
            conn.weight += social_impact
            conn.weight = np.clip(conn.weight, 0, 1.0)


def update_emotional_state(emotional_state, emotional_change_rate=0.02):
    """Random-walk the emotional state and keep it within [0.7, 1.5]."""
    emotional_state += np.random.normal(0, emotional_change_rate)
    return np.clip(emotional_state, 0.7, 1.5)


def apply_contextual_factors(activation, node, context_factors):
    """Scale an activation by the node's context factor plus up to 10 % jitter."""
    context_factor = context_factors.get(node.label, 1.0)
    return activation * context_factor * random.uniform(0.9, 1.1)


def long_term_memory(category_nodes, long_term_factor=0.01):
    """Consolidate weights by a small proportional increase, capped at 1.0."""
    for node in category_nodes:
        for conn in node.connections:
            conn.weight += long_term_factor * conn.weight
            conn.weight = np.clip(conn.weight, 0, 1.0)


def hebbian_learning(node, learning_rate=0.3, weight_limit=1.0, reg_factor=0.005):
    """Hebbian update of all outgoing connections with clipping and regularization."""
    for connection in node.connections:
        old_weight = connection.weight
        connection.weight += learning_rate * node.activation * connection.target_node.activation
        connection.weight = np.clip(connection.weight, -weight_limit, weight_limit)
        connection.weight -= reg_factor * connection.weight
        node.activation_history.append(node.activation)  # record the activation
        connection.target_node.activation_history.append(connection.target_node.activation)
        logging.info(f"Hebbian learning: weight changed from {old_weight:.4f} to {connection.weight:.4f}")
```

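With its defaults, `decay_weights` multiplies every weight by (1 − 0.002) · 0.95 on each call. The sketch below works out how fast that erodes a weight that is never reinforced; `remaining_fraction` is a hypothetical helper that reproduces only this factor.

```python
import math


def remaining_fraction(calls, decay_rate=0.002, forgetting_curve=0.95):
    """Fraction of a weight left after the given number of decay_weights calls."""
    return ((1 - decay_rate) * forgetting_curve) ** calls


per_call = remaining_fraction(1)
# Smallest number of calls after which less than half of the weight remains
half_life = math.ceil(math.log(0.5) / math.log(per_call))
print(f"factor per call: {per_call:.4f}")
print(f"calls until a weight has halved: {half_life}")
```

The forgetting factor dominates: most of the per-call loss comes from `forgetting_curve`, while `decay_rate` contributes only a small share at its default value.
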
### Classes for the Network Structure

```python
import random

import numpy as np


class Connection:
    """Directed, weighted link to a target node."""

    def __init__(self, target_node, weight=None):
        self.target_node = target_node
        # Random initial weight unless one is given
        self.weight = weight if weight is not None else random.uniform(0.1, 1.0)


class Node:
    """Network node with an activation value and outgoing connections."""

    def __init__(self, label):
        self.label = label
        self.connections = []
        self.activation = 0.0
        self.activation_history = []

    def add_connection(self, target_node, weight=None):
        self.connections.append(Connection(target_node, weight))

    def save_state(self):
        """Return a JSON-serializable snapshot; targets are stored by label."""
        return {
            "label": self.label,
            "activation": self.activation,
            "activation_history": self.activation_history,
            "connections": [{"target": conn.target_node.label, "weight": conn.weight} for conn in self.connections]
        }

    @staticmethod
    def load_state(state, nodes_dict):
        """Rebuild a node from a snapshot; nodes_dict maps labels to target nodes."""
        node = Node(state["label"])
        node.activation = state["activation"]
        node.activation_history = state["activation_history"]
        for conn_state in state["connections"]:
            target_node = nodes_dict[conn_state["target"]]
            connection = Connection(target_node, conn_state["weight"])
            node.connections.append(connection)
        return node


class MemoryNode(Node):
    """Node that belongs to a short-, mid-, or long-term memory stage."""

    def __init__(self, label, memory_type="short_term"):
        super().__init__(label)
        self.memory_type = memory_type
        self.retention_time = {"short_term": 5, "mid_term": 20, "long_term": 100}[memory_type]
        self.time_in_memory = 0

    def decay(self, decay_rate, context_factors, emotional_state):
        """Weaken outgoing weights; short-term memory decays fastest."""
        context_factor = context_factors.get(self.label, 1.0)
        emotional_factor = emotional_state
        for conn in self.connections:
            if self.memory_type == "short_term":
                conn.weight *= (1 - decay_rate * 2 * context_factor * emotional_factor)
            elif self.memory_type == "mid_term":
                conn.weight *= (1 - decay_rate * context_factor * emotional_factor)
            elif self.memory_type == "long_term":
                conn.weight *= (1 - decay_rate * 0.5 * context_factor * emotional_factor)

    def promote(self, activation_threshold=0.7):
        """Move to the next stage when the recent mean activation exceeds the threshold."""
        if len(self.activation_history) == 0:
            return
        if self.memory_type == "short_term" and np.mean(self.activation_history[-5:]) > activation_threshold:
            self.memory_type = "mid_term"
            self.retention_time = 20
        elif self.memory_type == "mid_term" and np.mean(self.activation_history[-20:]) > activation_threshold:
            self.memory_type = "long_term"
            self.retention_time = 100


class CortexCreativus(Node):
    def __init__(self, label):
        super().__init__(label)

    def generate_new_ideas(self, category_nodes):
        """Produce one idea string per node with activation above 0.5."""
        new_ideas = []
        for node in category_nodes:
            if node.activation > 0.5:
                new_idea = f"New idea based on {node.label} with activation {node.activation}"
                new_ideas.append(new_idea)
        return new_ideas


class SimulatrixNeuralis(Node):
    def __init__(self, label):
        super().__init__(label)

    def simulate_scenarios(self, category_nodes):
        """Produce one scenario string per node with activation above 0.5."""
        scenarios = []
        for node in category_nodes:
            if node.activation > 0.5:
                scenario = f"Simulated scenario based on {node.label} with activation {node.activation}"
                scenarios.append(scenario)
        return scenarios


class CortexCriticus(Node):
    def __init__(self, label):
        super().__init__(label)

    def evaluate_ideas(self, ideas):
        """Attach a random score between 0 and 1 to every idea."""
        evaluated_ideas = []
        for idea in ideas:
            evaluation_score = random.uniform(0, 1)
            evaluation = f"Evaluated idea: {idea} - Score: {evaluation_score}"
            evaluated_ideas.append(evaluation)
        return evaluated_ideas


class LimbusAffectus(Node):
    def __init__(self, label):
        super().__init__(label)

    def apply_emotion_weight(self, ideas, emotional_state):
        """Annotate every idea with the current emotional state."""
        weighted_ideas = []
        for idea in ideas:
            weighted_idea = f"Emotionally weighted idea: {idea} - Weight: {emotional_state}"
            weighted_ideas.append(weighted_idea)
        return weighted_ideas


class MetaCognitio(Node):
    def __init__(self, label):
        super().__init__(label)

    def optimize_system(self, category_nodes):
        """Perturb every activation by a random factor between 0.9 and 1.1."""
        for node in category_nodes:
            node.activation *= random.uniform(0.9, 1.1)


class CortexSocialis(Node):
    def __init__(self, label):
        super().__init__(label)

    def simulate_social_interactions(self, category_nodes):
        """Produce one interaction string per node with activation above 0.5."""
        interactions = []
        for node in category_nodes:
            if node.activation > 0.5:
                interaction = f"Simulated social interaction based on {node.label} with activation {node.activation}"
                interactions.append(interaction)
        return interactions


def connect_new_brains_to_network(category_nodes, new_brains):
    """Connect every brain module to every category node in both directions."""
    for brain in new_brains:
        for node in category_nodes:
            brain.add_connection(node)
            node.add_connection(brain)
```

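The promotion rule in `MemoryNode.promote` can be read as a pure function of the current stage and the activation history. The sketch below restates it that way for illustration; `promoted_stage` is a hypothetical helper and uses `statistics.mean` in place of `np.mean`.

```python
from statistics import mean

NEXT_STAGE = {"short_term": "mid_term", "mid_term": "long_term"}
WINDOW = {"short_term": 5, "mid_term": 20}  # number of recent activations considered


def promoted_stage(memory_type, activation_history, threshold=0.7):
    """Return the stage a node would be in after one promote() call."""
    if not activation_history or memory_type not in NEXT_STAGE:
        return memory_type
    recent = activation_history[-WINDOW[memory_type]:]
    if mean(recent) > threshold:
        return NEXT_STAGE[memory_type]
    return memory_type


strong = promoted_stage("short_term", [0.2, 0.9, 0.9, 0.9, 0.9, 0.9])  # last five average 0.9
weak = promoted_stage("short_term", [0.9, 0.5, 0.5, 0.5, 0.5, 0.5])    # last five average 0.5
final = promoted_stage("long_term", [1.0])                             # no stage above long_term
print(strong, weak, final)
```

Only the most recent window counts, so an early burst of high activation does not promote a node once it has dropped out of the window.
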
### Network Initialization

```python
import logging


def initialize_quiz_network(categories):
    """Create one node per category and connect every node to every other node."""
    try:
        category_nodes = [Node(c) for c in categories]
        for node in category_nodes:
            for target_node in category_nodes:
                if node is not target_node:
                    node.add_connection(target_node)
                    logging.debug(f"Connection added: {node.label} → {target_node.label}")
        debug_connections(category_nodes)
        for node in category_nodes:
            logging.info(f"Node created: {node.label}")
            for conn in node.connections:
                logging.info(f"  → Connection to {conn.target_node.label} with weight {conn.weight:.4f}")
        return category_nodes
    except Exception as e:
        logging.error(f"Error during network initialization: {e}")
        return []
```

### Signal Propagation

```python
import logging
import random


def propagate_signal(node, input_signal, emotion_weights, emotional_state=1.0, context_factors=None):
    """Activate a node from an input signal and pass the result to its direct targets."""
    # Jitter the input by up to 20 %, squash it with the sigmoid, then add noise
    node.activation = add_activation_noise(sigmoid(input_signal * random.uniform(0.8, 1.2)))
    node.activation_history.append(node.activation)  # record the activation
    node.activation = apply_emotion_weight(node.activation, node.label, emotion_weights, emotional_state)
    if context_factors:
        node.activation = apply_contextual_factors(node.activation, node, context_factors)
    logging.info(f"Signal propagation for {node.label}: input signal {input_signal:.4f}")
    for connection in node.connections:
        logging.info(f"  → Signal to {connection.target_node.label} with weight {connection.weight:.4f}")
        connection.target_node.activation += node.activation * connection.weight


def propagate_signal_with_memory(node, input_signal, category_nodes, memory_nodes, context_factors, emotional_state):
    """Activate a node, pass the signal on, and age and promote all memory nodes."""
    node.activation = add_activation_noise(sigmoid(input_signal))
    node.activation_history.append(node.activation)
    for connection in node.connections:
        connection.target_node.activation += node.activation * connection.weight
    for memory_node in memory_nodes:
        memory_node.time_in_memory += 1
        memory_node.promote()
```

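Because the sigmoid is applied to non-negative inputs, activations start above 0.5 before noise and emotion weighting are added. The sketch below computes the range for inputs between 0.1 and 0.9 with the jitter of 0.8 to 1.2 from `propagate_signal`; it uses `math.exp` instead of NumPy, and the variable names are illustrative.

```python
import math


def sigmoid(x):
    """Logistic function; maps any real input into (0, 1)."""
    return 1 / (1 + math.exp(-x))


lowest = sigmoid(0.1 * 0.8)   # smallest input with the smallest jitter factor
highest = sigmoid(0.9 * 1.2)  # largest input with the largest jitter factor
print(f"pre-noise activation range: {lowest:.3f} to {highest:.3f}")
```

The usable range before noise is therefore narrow, which is worth keeping in mind when reading thresholds such as the 0.7 used for memory promotion or the 0.8 used in `contextual_causal_analysis`.
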
### Simulation with Adjustments

```python
import logging
import random


def simulate_learning(data, category_nodes, personality_distributions, epochs=1, learning_rate=0.8,
                      reward_interval=5, decay_rate=0.002, emotional_state=1.0, context_factors=None):
    """Run the learning simulation and return the activation and weight histories."""
    if context_factors is None:
        context_factors = {}

    weights_history = {f"{node.label} → {conn.target_node.label}": [] for node in category_nodes for conn in node.connections}
    activation_history = {node.label: [] for node in category_nodes}
    question_nodes = []

    # One node per question ('Frage'), connected to its category ('Kategorie') node
    for idx, row in data.iterrows():
        q_node = Node(row['Frage'])
        question_nodes.append(q_node)
        category_label = row['Kategorie'].strip()
        category_node = next((c for c in category_nodes if c.label == category_label), None)
        if category_node:
            q_node.add_connection(category_node)
            logging.debug(f"Connection added: {q_node.label} → {category_node.label}")
        else:
            logging.warning(f"Category '{category_label}' not found for question '{row['Frage']}'.")

    emotion_weights = {category: 1.0 for category in data['Kategorie'].unique()}
    social_network = {category: random.uniform(0.1, 1.0) for category in data['Kategorie'].unique()}

    for epoch in range(epochs):
        logging.info(f"\n--- Epoch {epoch + 1} ---")
        simulated_answers = generate_simulated_answers(data, personality_distributions)

        # Reset the per-epoch accumulators
        for node in category_nodes:
            node.activation_sum = 0.0
            node.activation_count = 0

        for node in category_nodes:
            propagate_signal(node, random.uniform(0.1, 0.9), emotion_weights, emotional_state, context_factors)
            node.activation_history.append(node.activation)  # record the activation

        for idx, q_node in enumerate(question_nodes):
            # Start every question from a clean state
            for node in category_nodes + question_nodes:
                node.activation = 0.0
            answer = simulated_answers[idx]
            propagate_signal(q_node, answer, emotion_weights, emotional_state, context_factors)
            q_node.activation_history.append(q_node.activation)  # record the activation
            hebbian_learning(q_node, learning_rate)

            for node in category_nodes:
                node.activation_sum += node.activation
                if node.activation > 0:
                    node.activation_count += 1

            for node in category_nodes:
                for conn in node.connections:
                    weights_history[f"{node.label} → {conn.target_node.label}"].append(conn.weight)
                    logging.debug(f"Weight updated: {node.label} → {conn.target_node.label}, weight: {conn.weight}")

            # Apply causal reinforcement
            contextual_causal_analysis(q_node, context_factors, learning_rate)

        for node in category_nodes:
            if node.activation_count > 0:
                mean_activation = node.activation_sum / node.activation_count
                activation_history[node.label].append(mean_activation)
                logging.info(f"Mean activation for node {node.label}: {mean_activation:.4f}")
            else:
                activation_history[node.label].append(0.0)
                logging.info(f"Node {node.label} was not activated in this epoch.")

        if (epoch + 1) % reward_interval == 0:
            target_category = random.choice(data['Kategorie'].unique())
            reward_connections(category_nodes, target_category=target_category)

        decay_weights(category_nodes, decay_rate=decay_rate)
        social_influence(category_nodes, social_network)

    logging.info("Simulation finished. Analyzing results...")
    return activation_history, weights_history
```

### Simulation with Multilevel Memory

```python
import logging
import random


def simulate_multilevel_memory(data, category_nodes, personality_distributions, epochs=1):
    """Simulate promotion of category memories through short-, mid-, and long-term stages."""
    # One memory node per category; the label is taken from the category node
    short_term_memory = [MemoryNode(c.label, "short_term") for c in category_nodes]
    mid_term_memory = []
    long_term_memory = []
    memory_nodes = short_term_memory + mid_term_memory + long_term_memory
    context_factors = {question: random.uniform(0.9, 1.1) for question in data['Frage'].unique()}
    emotional_state = 1.0
    for epoch in range(epochs):
        logging.info(f"\n--- Epoch {epoch + 1} ---")
        for node in short_term_memory:
            input_signal = random.uniform(0.1, 1.0)
            propagate_signal_with_memory(node, input_signal, category_nodes, memory_nodes, context_factors, emotional_state)
        for memory_node in memory_nodes:
            memory_node.decay(decay_rate=0.01, context_factors=context_factors, emotional_state=emotional_state)
        for memory_node in memory_nodes:
            memory_node.promote()
        short_term_memory, mid_term_memory, long_term_memory = update_memory_stages(memory_nodes)
        logging.info(f"Epoch {epoch + 1}: short-term {len(short_term_memory)}, mid-term {len(mid_term_memory)}, long-term {len(long_term_memory)}")
    return short_term_memory, mid_term_memory, long_term_memory


def update_memory_stages(memory_nodes):
    """Partition the memory nodes by their current stage."""
    short_term_memory = [node for node in memory_nodes if node.memory_type == "short_term"]
    mid_term_memory = [node for node in memory_nodes if node.memory_type == "mid_term"]
    long_term_memory = [node for node in memory_nodes if node.memory_type == "long_term"]
    return short_term_memory, mid_term_memory, long_term_memory
```

### Plot Functions

```python
import logging
import os

import matplotlib.pyplot as plt
import networkx as nx
import numpy as np
import seaborn as sns

# All functions write into output_dir, the module-level plot folder defined earlier.


def plot_activation_history(activation_history, filename="activation_history.png"):
    """Line plot of the mean activation per epoch for every category."""
    if not activation_history:
        logging.warning("No activation history to plot")
        return
    plt.figure(figsize=(12, 8))
    for label, activations in activation_history.items():
        if len(activations) > 0:
            plt.plot(range(1, len(activations) + 1), activations, label=label)
    plt.title("Activation development during learning")
    plt.xlabel("Epoch")
    plt.ylabel("Activation")
    plt.legend()
    plt.grid(True)
    plt.savefig(os.path.join(output_dir, filename), dpi=300, bbox_inches="tight")
    plt.close()
    logging.info(f"Plot saved to: {os.path.join(output_dir, filename)}")


def plot_dynamics(activation_history, weights_history, filename="dynamics.png"):
    """Plot activations and connection weights side by side."""
    if not weights_history:
        logging.error("weights_history is empty.")
        return

    plt.figure(figsize=(16, 12))
    plt.subplot(2, 2, 1)
    for label, activations in activation_history.items():
        if len(activations) > 0:
            plt.plot(range(1, len(activations) + 1), activations, label=label)
    plt.title("Activation development during learning")
    plt.xlabel("Epoch")
    plt.ylabel("Activation")
    plt.legend()
    plt.grid(True)

    plt.subplot(2, 2, 2)
    for label, weights in weights_history.items():
        if len(weights) > 0:
            plt.plot(range(1, len(weights) + 1), weights, label=label, alpha=0.7)
    plt.title("Connection weight development during learning")
    plt.xlabel("Epoch")
    plt.ylabel("Weight")
    plt.legend(bbox_to_anchor=(1.05, 1), loc='upper left')
    plt.grid(True)

    plt.savefig(os.path.join(output_dir, filename), dpi=300, bbox_inches="tight")
    plt.close()
    logging.info(f"Plot saved to: {os.path.join(output_dir, filename)}")


def plot_memory_distribution(short_term_memory, mid_term_memory, long_term_memory, filename="memory_distribution.png"):
    """Bar chart of how many nodes are in each memory stage."""
    counts = [len(short_term_memory), len(mid_term_memory), len(long_term_memory)]
    labels = ["Short-term", "Mid-term", "Long-term"]
    plt.figure(figsize=(8, 6))
    plt.bar(labels, counts, color=["red", "blue", "green"])
    plt.title("Distribution of memory nodes")
    plt.ylabel("Number of nodes")
    plt.savefig(os.path.join(output_dir, filename), dpi=300, bbox_inches="tight")
    plt.close()
    logging.info(f"Plot saved to: {os.path.join(output_dir, filename)}")


def plot_activation_heatmap(activation_history, filename="activation_heatmap.png"):
    """Heatmap with one row per epoch and one column per category."""
    if not activation_history:
        logging.warning("No activation history to plot")
        return

    # Truncate all series to the shortest one so they form a rectangular array
    min_length = min(len(activations) for activations in activation_history.values())
    truncated_activations = {key: values[:min_length] for key, values in activation_history.items()}

    # Transpose so rows are epochs and columns are categories, matching the axis labels
    heatmap_data = np.array(list(truncated_activations.values())).T
    if heatmap_data.size == 0:
        logging.error("Heatmap data is empty. Check the activation history.")
        return

    plt.figure(figsize=(12, 8))
    sns.heatmap(heatmap_data, cmap="YlGnBu", xticklabels=list(truncated_activations.keys()), yticklabels=False)
    plt.title("Heatmap of activation values")
    plt.xlabel("Category")
    plt.ylabel("Epoch")
    plt.savefig(os.path.join(output_dir, filename), dpi=300, bbox_inches="tight")
    plt.close()
    logging.info(f"Plot saved to: {os.path.join(output_dir, filename)}")


def plot_network_topology(category_nodes, new_brains, filename="network_topology.png"):
    """Draw the network; brain modules are shown in red, category nodes in blue."""
    G = nx.DiGraph()
    for node in category_nodes:
        G.add_node(node.label)
        for conn in node.connections:
            G.add_edge(node.label, conn.target_node.label, weight=conn.weight)
    for brain in new_brains:
        G.add_node(brain.label, color='red')
        for conn in brain.connections:
            G.add_edge(brain.label, conn.target_node.label, weight=conn.weight)

    pos = nx.spring_layout(G)
    edge_labels = {(u, v): d['weight'] for u, v, d in G.edges(data=True)}
    node_colors = [G.nodes[node].get('color', 'skyblue') for node in G.nodes()]

    nx.draw(G, pos, with_labels=True, node_size=3000, node_color=node_colors, font_size=10, font_weight="bold", edge_color="gray")
    nx.draw_networkx_edge_labels(G, pos, edge_labels=edge_labels)
    plt.title("Network topology")
    plt.savefig(os.path.join(output_dir, filename), dpi=300, bbox_inches="tight")
    plt.close()
    logging.info(f"Plot saved to: {os.path.join(output_dir, filename)}")
```

### Saving and Loading the Model

```python
import json
import logging
import os


def save_model(category_nodes, filename="model.json"):
    """Serialize all nodes to a JSON file."""
    model_data = {
        "nodes": [node.save_state() for node in category_nodes]
    }
    with open(filename, "w", encoding="utf-8") as file:
        json.dump(model_data, file, indent=4)
    logging.info(f"Model saved to {filename}")


def save_model_with_questions_and_answers(category_nodes, questions, filename="model_with_qa.json"):
    """Save nodes and questions, skipping the write if nothing changed."""
    global model_saved
    logging.info("Starting to save the model...")

    # Check whether anything has changed
    current_model_data = {
        "nodes": [node.save_state() for node in category_nodes],
        "questions": questions
    }

    if os.path.exists(filename):
        try:
            with open(filename, "r", encoding="utf-8") as file:
                existing_model_data = json.load(file)
            if existing_model_data == current_model_data:
                logging.info("No changes detected, skipping the save.")
                return
        except Exception as e:
            logging.warning(f"Error while checking the existing model: {e}")

    # Save the updated model
    try:
        with open(filename, "w", encoding="utf-8") as file:
            json.dump(current_model_data, file, indent=4)
        logging.info(f"Model saved successfully to {filename}.")
        model_saved = True  # set only after a successful save
    except Exception as e:
        logging.error(f"Error while saving the model: {e}")


def load_model_with_questions_and_answers(filename="model_with_qa.json"):
    """Load nodes and questions; return (None, None) if nothing was loaded."""
    global initialized
    if initialized:
        logging.info("Model already initialized.")
        return None, None

    if not os.path.exists(filename):
        logging.warning(f"File {filename} not found. The network will be initialized.")
        return None, None

    try:
        with open(filename, "r", encoding="utf-8") as file:
            model_data = json.load(file)

        # First pass: create all nodes so connections can refer to them by label
        nodes_dict = {node_data["label"]: Node(node_data["label"]) for node_data in model_data["nodes"]}

        # Second pass: restore the state and the connections
        for node_data in model_data["nodes"]:
            node = nodes_dict[node_data["label"]]
            node.activation = node_data.get("activation", 0.0)
            node.activation_history = node_data.get("activation_history", [])
            for conn_state in node_data["connections"]:
                target_node = nodes_dict.get(conn_state["target"])
                if target_node:
                    node.add_connection(target_node, conn_state["weight"])

        questions = model_data.get("questions", [])
        logging.info(f"Model loaded with {len(nodes_dict)} nodes and {len(questions)} questions")
        initialized = True
        return list(nodes_dict.values()), questions

    except json.JSONDecodeError as e:
        logging.error(f"Error while parsing the JSON file: {e}")
        return None, None
```

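The change check in `save_model_with_questions_and_answers` compares the parsed file content with the data about to be written. The sketch below isolates that pattern with the standard library; `save_if_changed` and the sample model are hypothetical and deliberately much smaller than the real structure.

```python
import json
import os
import tempfile


def save_if_changed(data, filename):
    """Write data as JSON unless the file already holds identical content."""
    if os.path.exists(filename):
        try:
            with open(filename, "r", encoding="utf-8") as f:
                if json.load(f) == data:
                    return False  # nothing changed, skip the write
        except (OSError, json.JSONDecodeError):
            pass  # unreadable or corrupt file: overwrite it
    with open(filename, "w", encoding="utf-8") as f:
        json.dump(data, f, indent=4)
    return True


with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "model_with_qa.json")
    model = {"nodes": [{"label": "Test", "activation": 0.5, "connections": []}], "questions": []}
    first = save_if_changed(model, path)    # file does not exist yet
    second = save_if_changed(model, path)   # identical content
    model["nodes"][0]["activation"] = 0.6
    third = save_if_changed(model, path)    # content differs
    print(first, second, third)
```

The comparison only works for data that survives a JSON round trip unchanged, for example dictionaries with string keys, lists, numbers, and strings.
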
### Fragen aktualisieren |
|
|
|
```python |
|
def update_questions_with_answers(filename="model_with_qa.json"): |
|
with open(filename, "r") as file: |
|
model_data = json.load(file) |
|
|
|
for question in model_data["questions"]: |
|
if "answer" not in question: |
|
question["answer"] = input(f"Gib die Antwort für: '{question['question']}': ") |
|
|
|
with open(filename, "w") as file: |
|
json.dump(model_data, file, indent=4) |
|
logging.info(f"Fragen wurden mit Antworten aktualisiert und gespeichert in {filename}") |
|
``` |
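
Because `input()` blocks, the function above is awkward to test or to call from a GUI. One possible variant, sketched here under that assumption, takes the source of answers as a callable. `fill_missing_answers` and `ask` are hypothetical names, not part of the original code.

```python
def fill_missing_answers(model_data, ask):
    """Fill in missing answers using ask(question_text); return how many were added."""
    added = 0
    for question in model_data.get("questions", []):
        if "answer" not in question:
            question["answer"] = ask(question["question"])
            added += 1
    return added


model = {
    "questions": [
        {"question": "What is 2 + 2?", "category": "Math"},
        {"question": "Capital of France?", "category": "Geography", "answer": "Paris"},
    ]
}
canned = {"What is 2 + 2?": "4"}
count = fill_missing_answers(model, lambda text: canned[text])
```

Passing `input` as `ask` would reproduce the interactive behavior, while a dictionary lookup like the one above keeps the logic testable.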

### Finding the best answer

```python
def find_best_answer(category_nodes, questions, query):
    # category_nodes is accepted for interface compatibility but is not used here.
    matched_question = find_similar_question(questions, query)
    if matched_question:
        logging.info(f"Matched question: {matched_question['question']} -> category: {matched_question['category']}")
        answer = matched_question.get("answer", "No answer available")
        logging.info(f"Answer: {answer}")
        return answer
    else:
        logging.warning("No matching question found.")
        return None
```
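
`find_similar_question` is defined elsewhere and is not shown in this section. As a rough illustration of what such a lookup could do, the sketch below scores each stored question against the query with `difflib.SequenceMatcher` from the standard library. The function name `find_similar_question_sketch` and the `cutoff` value are assumptions, not the document's implementation.

```python
import difflib


def find_similar_question_sketch(questions, query, cutoff=0.6):
    """Return the stored question dict most similar to query, or None."""
    best, best_score = None, cutoff
    for entry in questions:
        score = difflib.SequenceMatcher(
            None, query.lower(), entry["question"].lower()
        ).ratio()
        if score >= best_score:
            best, best_score = entry, score
    return best


stored = [
    {"question": "What is the capital of France?", "category": "Geography", "answer": "Paris"},
    {"question": "How many legs does a spider have?", "category": "Biology", "answer": "Eight"},
]
hit = find_similar_question_sketch(stored, "What is the capital of France")
miss = find_similar_question_sketch(stored, "zzzz")
```

Returning `None` below the cutoff matches how `find_best_answer` treats a falsy result as "no matching question".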

### Creating the dashboard

```python
def create_dashboard(category_nodes, activation_history, short_term_memory, mid_term_memory, long_term_memory):
    root = tk.Tk()
    root.title("Psyco Dashboard")

    # Activation history: one line plot per category
    activation_frame = ttk.Frame(root, padding="10")
    activation_frame.pack(fill=tk.BOTH, expand=True)
    activation_label = ttk.Label(activation_frame, text="Activation history")
    activation_label.pack()
    if activation_history:
        for label, activations in activation_history.items():
            fig, ax = plt.subplots()
            ax.plot(range(1, len(activations) + 1), activations)
            ax.set_title(label)
            canvas = FigureCanvasTkAgg(fig, master=activation_frame)
            canvas.draw()
            canvas.get_tk_widget().pack()
    else:
        no_data_label = ttk.Label(activation_frame, text="No activation history available.")
        no_data_label.pack()

    # Memory distribution
    memory_frame = ttk.Frame(root, padding="10")
    memory_frame.pack(fill=tk.BOTH, expand=True)
    memory_label = ttk.Label(memory_frame, text="Memory distribution")
    memory_label.pack()
    memory_counts = [len(short_term_memory), len(mid_term_memory), len(long_term_memory)]
    labels = ["Short-term", "Mid-term", "Long-term"]
    fig, ax = plt.subplots()
    ax.bar(labels, memory_counts, color=["red", "blue", "green"])
    ax.set_title("Distribution of memory nodes")
    ax.set_ylabel("Number of nodes")
    canvas = FigureCanvasTkAgg(fig, master=memory_frame)
    canvas.draw()
    canvas.get_tk_widget().pack()

    # Network topology
    topology_frame = ttk.Frame(root, padding="10")
    topology_frame.pack(fill=tk.BOTH, expand=True)
    topology_label = ttk.Label(topology_frame, text="Network topology")
    topology_label.pack()
    G = nx.DiGraph()
    for node in category_nodes:
        G.add_node(node.label)
        for conn in node.connections:
            G.add_edge(node.label, conn.target_node.label, weight=conn.weight)
    pos = nx.spring_layout(G)
    edge_labels = {(u, v): d['weight'] for u, v, d in G.edges(data=True)}
    node_colors = ['skyblue' for _ in G.nodes()]
    fig, ax = plt.subplots()
    nx.draw(G, pos, with_labels=True, node_size=3000, node_color=node_colors, font_size=10, font_weight="bold", edge_color="gray", ax=ax)
    nx.draw_networkx_edge_labels(G, pos, edge_labels=edge_labels, ax=ax)
    ax.set_title("Network topology")
    canvas = FigureCanvasTkAgg(fig, master=topology_frame)
    canvas.draw()
    canvas.get_tk_widget().pack()

    # Heatmap of activation values
    heatmap_frame = ttk.Frame(root, padding="10")
    heatmap_frame.pack(fill=tk.BOTH, expand=True)
    heatmap_label = ttk.Label(heatmap_frame, text="Heatmap of activation values")
    heatmap_label.pack()
    if activation_history:
        # Truncate every series to the shortest one so the matrix is rectangular.
        min_length = min(len(activations) for activations in activation_history.values())
        truncated_activations = {key: values[:min_length] for key, values in activation_history.items()}
        # Transpose so that rows are epochs and columns are categories, matching the axis labels.
        heatmap_data = np.array(list(truncated_activations.values())).T
        if heatmap_data.size > 0:
            fig, ax = plt.subplots()
            sns.heatmap(heatmap_data, cmap="YlGnBu", xticklabels=list(truncated_activations.keys()), yticklabels=False, ax=ax)
            ax.set_title("Heatmap of activation values")
            ax.set_xlabel("Category")
            ax.set_ylabel("Epoch")
            canvas = FigureCanvasTkAgg(fig, master=heatmap_frame)
            canvas.draw()
            canvas.get_tk_widget().pack()
        else:
            no_data_label = ttk.Label(heatmap_frame, text="Heatmap data is empty. Check the activation history.")
            no_data_label.pack()
    else:
        no_data_label = ttk.Label(heatmap_frame, text="No activation history available.")
        no_data_label.pack()

    root.mainloop()
```
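
The heatmap labels its x-axis by category and its y-axis by epoch, so the matrix passed to it needs one row per epoch and one column per category. The sketch below shows that shaping step with plain lists, independent of NumPy and seaborn. `to_epoch_matrix` is a hypothetical helper introduced only for this illustration.

```python
def to_epoch_matrix(activation_history):
    """Return (labels, rows) where rows[e][c] is category c's activation in epoch e."""
    if not activation_history:
        return [], []
    labels = list(activation_history)
    # Truncate every series to the shortest one so the matrix is rectangular.
    min_length = min(len(values) for values in activation_history.values())
    truncated = [activation_history[label][:min_length] for label in labels]
    # zip(*...) transposes category-major lists into epoch-major rows.
    rows = [list(epoch) for epoch in zip(*truncated)]
    return labels, rows


history = {"Math": [0.1, 0.2, 0.3], "History": [0.5, 0.6]}
labels, rows = to_epoch_matrix(history)
```

With NumPy, the same transposition is available as the `.T` attribute of the array.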

### Processing the CSV file

```python
def process_csv_in_chunks(filename, chunk_size=10000):
    logging.info(f"Starting to process file: {filename}")

    try:
        # Check that the file exists
        if not os.path.exists(filename):
            logging.error(f"File {filename} not found.")
            return None

        all_chunks = []
        for chunk in pd.read_csv(filename, chunksize=chunk_size, encoding="utf-8", on_bad_lines='skip'):
            logging.info(f"Read chunk with {len(chunk)} rows.")
            if 'Frage' not in chunk.columns or 'Kategorie' not in chunk.columns or 'Antwort' not in chunk.columns:
                logging.error("CSV file does not contain the expected columns: 'Frage', 'Kategorie', 'Antwort'")
                return None

            all_chunks.append(chunk)

        # All chunks are concatenated, so the full file ends up in memory.
        data = pd.concat(all_chunks, ignore_index=True)
        logging.info(f"All chunks processed successfully. Total rows: {len(data)}")

        return data

    except pd.errors.EmptyDataError:
        logging.error("CSV file is empty.")
    except pd.errors.ParserError as e:
        logging.error(f"Parsing error in CSV file: {e}")
    except Exception as e:
        logging.error(f"Unexpected error while processing the file: {e}")

    return None
```
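
The column check above runs once per chunk, although the header is the same for every chunk. A possible alternative, sketched here with the standard-library `csv` module, validates the header a single time before any data is read. `REQUIRED_COLUMNS` and `missing_columns` are hypothetical names for this illustration.

```python
import csv
import io

REQUIRED_COLUMNS = {"Frage", "Kategorie", "Antwort"}


def missing_columns(csv_text_stream):
    """Return the set of required columns absent from the CSV header."""
    reader = csv.reader(csv_text_stream)
    header = next(reader, [])  # an empty stream yields an empty header
    return REQUIRED_COLUMNS - {name.strip() for name in header}


ok = missing_columns(io.StringIO("Frage,Kategorie,Antwort\nq,c,a\n"))
bad = missing_columns(io.StringIO("Frage,Antwort\nq,a\n"))
```

An empty result means the file can be handed to the chunked reader; a non-empty result names exactly which columns are missing.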

### Processing single entries

```python
def process_single_entry(question, category, answer):
    global category_nodes, questions

    # Make sure the global variables are initialized
    if category_nodes is None:
        category_nodes = []
        logging.warning("Category node list was None and has now been initialized.")

    if questions is None:
        questions = []
        logging.warning("Question list was None and has now been initialized.")

    # Add the category if it is not present yet
    if not any(node.label == category for node in category_nodes):
        category_nodes.append(Node(category))
        logging.info(f"Added new category '{category}' to the network.")

    # Append question, category, and answer to the list
    questions.append({"question": question, "category": category, "answer": answer})
    logging.info(f"Added new question: '{question}' -> category: '{category}'")
```

### Processing the CSV file with Dask

```python
def process_csv_with_dask(filename, chunk_size=10000):
    try:
        # Note: Dask's `blocksize` is measured in bytes, not in rows.
        ddf = dd.read_csv(filename, blocksize=chunk_size)
        ddf = ddf.astype({'Kategorie': 'category'})

        # Select the columns explicitly so the order does not depend on the CSV layout.
        for row in ddf[['Frage', 'Kategorie', 'Antwort']].itertuples(index=False, name=None):
            process_single_entry(row[0], row[1], row[2])

        logging.info("All chunks processed successfully with Dask.")
    except Exception as e:
        logging.error(f"Error while processing the file with Dask: {e}")
```

### Saving to SQLite

```python
def save_to_sqlite(filename, db_name="dataset.db"):
    conn = sqlite3.connect(db_name)
    try:
        # if_exists="append" adds rows on every call, so repeated runs duplicate the data.
        chunk_iter = pd.read_csv(filename, chunksize=10000)
        for chunk in chunk_iter:
            chunk.to_sql("qa_data", conn, if_exists="append", index=False)
            logging.info(f"Stored chunk with {len(chunk)} rows.")
    finally:
        # Close the connection even if reading or writing fails.
        conn.close()
    logging.info("CSV data was successfully stored in SQLite.")
```

### Loading from SQLite

```python
def load_from_sqlite(db_name="dataset.db"):
    conn = sqlite3.connect(db_name)
    try:
        query = "SELECT Frage, Kategorie, Antwort FROM qa_data"
        data = pd.read_sql_query(query, conn)
    finally:
        # Close the connection even if the query fails.
        conn.close()
    return data
```

### Saving a partial model

```python
def save_partial_model(filename="partial_model.json"):
    # Uses the global category_nodes and questions.
    model_data = {
        "nodes": [node.save_state() for node in category_nodes],
        "questions": questions
    }
    with open(filename, "w", encoding="utf-8") as file:
        json.dump(model_data, file, indent=4)
    logging.info("Partial model saved.")
```
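
A partial model is typically written while processing is still under way, so an interrupted write could leave a truncated JSON file behind. One common safeguard, shown here as a sketch rather than as part of the original code, is to write to a temporary file in the same directory and then rename it over the target with `os.replace`. `atomic_json_dump` is a hypothetical helper name.

```python
import json
import os
import tempfile


def atomic_json_dump(data, filename):
    """Write data as JSON to filename via a temporary file and os.replace."""
    directory = os.path.dirname(os.path.abspath(filename))
    fd, tmp_path = tempfile.mkstemp(dir=directory, suffix=".tmp")
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as tmp_file:
            json.dump(data, tmp_file, indent=4)
        # Renaming within one filesystem replaces the target in a single step.
        os.replace(tmp_path, filename)
    except BaseException:
        # Clean up the temporary file if anything went wrong before the rename.
        if os.path.exists(tmp_path):
            os.remove(tmp_path)
        raise


with tempfile.TemporaryDirectory() as tmp_dir:
    target = os.path.join(tmp_dir, "partial_model.json")
    atomic_json_dump({"nodes": [], "questions": []}, target)
    with open(target, "r", encoding="utf-8") as f:
        restored = json.load(f)
    leftovers = [name for name in os.listdir(tmp_dir) if name.endswith(".tmp")]
```

Readers of the file then see either the previous complete version or the new complete version, never a half-written one.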

### Lazy-loading the CSV file

```python
def lazy_load_csv(filename, chunk_size=10000):
    # Generator: only one chunk is held in memory at a time.
    for chunk in pd.read_csv(filename, chunksize=chunk_size):
        for _, row in chunk.iterrows():
            yield row['Frage'], row['Kategorie'], row['Antwort']
```
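
A generator like this produces rows only when the caller asks for them. The sketch below shows the same idea with the standard-library `csv` module on an in-memory sample, so it runs without pandas. `lazy_rows` and the sample data are assumptions made for the illustration.

```python
import csv
import io
import itertools


def lazy_rows(stream):
    """Yield (Frage, Kategorie, Antwort) tuples one at a time."""
    for row in csv.DictReader(stream):
        yield row["Frage"], row["Kategorie"], row["Antwort"]


sample = io.StringIO(
    "Frage,Kategorie,Antwort\n"
    "Q1,Math,A1\n"
    "Q2,History,A2\n"
    "Q3,Math,A3\n"
)
rows = lazy_rows(sample)
first_two = list(itertools.islice(rows, 2))  # consumes only two rows
remaining = list(rows)                       # the generator resumes where it stopped
```

`itertools.islice` is a convenient way to peek at the first few entries of a large file without reading the rest.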

### Main function

```python
def main():
    start_time = time.time()
    category_nodes, questions = load_model_with_questions_and_answers("model_with_qa.json")

    if category_nodes is None:
        csv_file = "data.csv"
        data = process_csv_in_chunks(csv_file)
        if data is None:
            logging.error("Error while loading the CSV file.")
            return

        if len(data) > 1000:
            logging.info("File has more than 1000 rows. Splitting into smaller files...")
            split_csv(csv_file)

            # Process each split file
            # Note: category_nodes and questions are rebuilt from scratch for every file.
            data_dir = "data"
            for filename in os.listdir(data_dir):
                if filename.endswith(".csv"):
                    file_path = os.path.join(data_dir, filename)
                    logging.info(f"Processing file: {file_path}")

                    data = process_csv_in_chunks(file_path)
                    if data is None:
                        logging.error("Error while loading the CSV file.")
                        return

                    categories = data['Kategorie'].unique()
                    category_nodes = initialize_quiz_network(categories)
                    questions = [{"question": row['Frage'], "category": row['Kategorie'], "answer": row['Antwort']} for _, row in data.iterrows()]

                    personality_distributions = {category: random.uniform(0.5, 0.8) for category in [node.label for node in category_nodes]}
                    activation_history, weights_history = simulate_learning(data, category_nodes, personality_distributions)

                    save_model_with_questions_and_answers(category_nodes, questions)
        else:
            logging.info("File has at most 1000 rows. No splitting required.")
            categories = data['Kategorie'].unique()
            category_nodes = initialize_quiz_network(categories)
            questions = [{"question": row['Frage'], "category": row['Kategorie'], "answer": row['Antwort']} for _, row in data.iterrows()]

            personality_distributions = {category: random.uniform(0.5, 0.8) for category in [node.label for node in category_nodes]}
            activation_history, weights_history = simulate_learning(data, category_nodes, personality_distributions)

            save_model_with_questions_and_answers(category_nodes, questions)

    end_time = time.time()
    logging.info(f"Simulation finished. Total duration: {end_time - start_time:.2f} seconds")
```
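
In the loop over the split files, `category_nodes` and `questions` are reassigned on every iteration. If the intent is a single network that covers all files (an assumption; the source does not say), the categories and questions would need to be accumulated across files instead. The sketch below shows that accumulation on plain dictionaries. `merge_chunks` is a hypothetical helper, and the rows stand in for DataFrame rows.

```python
def merge_chunks(chunks):
    """Combine rows from several chunks into one category list and one question list."""
    categories = []  # ordered, without duplicates
    seen = set()
    questions = []
    for chunk in chunks:
        for row in chunk:
            category = row["Kategorie"]
            if category not in seen:
                seen.add(category)
                categories.append(category)
            questions.append(
                {"question": row["Frage"], "category": category, "answer": row["Antwort"]}
            )
    return categories, questions


chunk_a = [{"Frage": "Q1", "Kategorie": "Math", "Antwort": "A1"}]
chunk_b = [
    {"Frage": "Q2", "Kategorie": "History", "Antwort": "A2"},
    {"Frage": "Q3", "Kategorie": "Math", "Antwort": "A3"},
]
categories, questions = merge_chunks([chunk_a, chunk_b])
```

The merged category list could then be passed to the network initialization once, after all files have been read.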

### Starting the simulation from the GUI

```python
def run_simulation_from_gui(learning_rate, decay_rate, reward_interval, epochs):
    global model_saved
    model_saved = False  # Force a save after training

    start_time = time.time()
    csv_file = "data.csv"

    category_nodes, questions = load_model_with_questions_and_answers("model_with_qa.json")

    if category_nodes is None:
        data = process_csv_in_chunks(csv_file)
        if not isinstance(data, pd.DataFrame):
            logging.error("Error while loading the CSV file. The expected DataFrame was not returned.")
            return

        if len(data) > 1000:
            logging.info("File has more than 1000 rows. Splitting into smaller files...")
            split_csv(csv_file)

            # Process each split file
            data_dir = "data"
            for filename in os.listdir(data_dir):
                if filename.endswith(".csv"):
                    file_path = os.path.join(data_dir, filename)
                    logging.info(f"Processing file: {file_path}")

                    data = process_csv_in_chunks(file_path)
                    if not isinstance(data, pd.DataFrame):
                        logging.error("Error while loading the CSV file. The expected DataFrame was not returned.")
                        return

                    categories = data['Kategorie'].unique()
                    category_nodes = initialize_quiz_network(categories)
                    questions = [{"question": row['Frage'], "category": row['Kategorie'], "answer": row['Antwort']} for _, row in data.iterrows()]

                    personality_distributions = {category: random.uniform(0.5, 0.8) for category in [node.label for node in category_nodes]}
                    activation_history, weights_history = simulate_learning(
                        data, category_nodes, personality_distributions,
                        epochs=int(epochs),
                        learning_rate=float(learning_rate),
                        reward_interval=int(reward_interval),
                        decay_rate=float(decay_rate)
                    )

                    save_model_with_questions_and_answers(category_nodes, questions)
        else:
            logging.info("File has at most 1000 rows. No splitting required.")
            categories = data['Kategorie'].unique()
            category_nodes = initialize_quiz_network(categories)
            questions = [{"question": row['Frage'], "category": row['Kategorie'], "answer": row['Antwort']} for _, row in data.iterrows()]

            personality_distributions = {category: random.uniform(0.5, 0.8) for category in [node.label for node in category_nodes]}
            activation_history, weights_history = simulate_learning(
                data, category_nodes, personality_distributions,
                epochs=int(epochs),
                learning_rate=float(learning_rate),
                reward_interval=int(reward_interval),
                decay_rate=float(decay_rate)
            )

            save_model_with_questions_and_answers(category_nodes, questions)
    else:
        # A saved model was loaded: continue training it on the CSV data.
        data = process_csv_in_chunks(csv_file)
        if not isinstance(data, pd.DataFrame):
            logging.error("Error while loading the CSV file. The expected DataFrame was not returned.")
            return

        logging.info(f"Number of rows in the loaded CSV: {len(data)}")

        personality_distributions = {category: random.uniform(0.5, 0.8) for category in [node.label for node in category_nodes]}

        activation_history, weights_history = simulate_learning(
            data, category_nodes, personality_distributions,
            epochs=int(epochs),
            learning_rate=float(learning_rate),
            reward_interval=int(reward_interval),
            decay_rate=float(decay_rate)
        )

        save_model_with_questions_and_answers(category_nodes, questions)

    end_time = time.time()
    logging.info(f"Simulation finished. Total duration: {end_time - start_time:.2f} seconds")
    # Note: start_gui runs this function in a worker thread; Tkinter calls such as
    # messagebox are generally only safe from the main thread.
    messagebox.showinfo("Result", f"Simulation finished! Duration: {end_time - start_time:.2f} seconds")
```
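
The function ends by opening a message box, yet it is started in a background thread by the GUI, and Tkinter widgets are generally meant to be used from the main thread only. A common pattern, sketched below without any Tkinter dependency, is for the worker to put its result on a `queue.Queue` and for the main thread to pick it up. The `worker` function and the reported values are placeholders for this illustration.

```python
import queue
import threading


def worker(result_queue):
    # Stand-in for the long-running simulation; it only reports a result.
    result_queue.put(("done", 1.23))


results = queue.Queue()
thread = threading.Thread(target=worker, args=(results,), daemon=True)
thread.start()
thread.join()

# In a Tkinter program the main thread would poll the queue periodically
# (for example from a callback scheduled with root.after) and show the dialog there.
status, duration = results.get_nowait()
message = f"Simulation finished! Duration: {duration:.2f} seconds"
```

The sketch joins the thread only to keep the example deterministic; a GUI would keep its event loop running and poll instead of blocking.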

### Initializing the network asynchronously

```python
def async_initialize_network():
    global category_nodes, questions, model_saved
    logging.info("Starting network initialization...")

    category_nodes, questions = load_model_with_questions_and_answers("model_with_qa.json")

    if category_nodes is None:
        category_nodes = []
        logging.warning("No saved categories found. A new network will be created.")
        model_saved = False  # Reset the save flag

    if questions is None:
        questions = []
        logging.warning("No saved questions found. A new question list will be created.")
        model_saved = False  # Reset the save flag

    if not category_nodes:
        csv_file = "data.csv"
        data = process_csv_in_chunks(csv_file)
        if isinstance(data, pd.DataFrame):
            if len(data) > 1000:
                logging.info("File has more than 1000 rows. Splitting into smaller files...")
                split_csv(csv_file)

                # Process each split file
                data_dir = "data"
                for filename in os.listdir(data_dir):
                    if filename.endswith(".csv"):
                        file_path = os.path.join(data_dir, filename)
                        logging.info(f"Processing file: {file_path}")

                        data = process_csv_in_chunks(file_path)
                        if isinstance(data, pd.DataFrame):
                            categories = data['Kategorie'].unique()
                            category_nodes = initialize_quiz_network(categories)
                            questions = [{"question": row['Frage'], "category": row['Kategorie'], "answer": row['Antwort']} for _, row in data.iterrows()]
                            logging.info("Network successfully created from CSV data.")
                            model_saved = False  # Reset the save flag
            else:
                logging.info("File has at most 1000 rows. No splitting required.")
                categories = data['Kategorie'].unique()
                category_nodes = initialize_quiz_network(categories)
                questions = [{"question": row['Frage'], "category": row['Kategorie'], "answer": row['Antwort']} for _, row in data.iterrows()]
                logging.info("Network successfully created from CSV data.")
                model_saved = False  # Reset the save flag
        else:
            logging.error("Error while loading the CSV data. The network could not be initialized.")
            return

    save_model_with_questions_and_answers(category_nodes, questions)
    logging.info("Network initialized successfully.")
```

### Starting the GUI

```python
def start_gui():
    def start_simulation():
        try:
            # Fixed arguments: learning_rate=0.8, decay_rate=0.002, reward_interval=5, epochs=10
            threading.Thread(target=run_simulation_from_gui, args=(0.8, 0.002, 5, 10), daemon=True).start()
            messagebox.showinfo("Info", "Simulation started!")
            logging.info("Simulation started")
        except Exception as e:
            logging.error(f"Error while starting the simulation: {e}")
            messagebox.showerror("Error", f"Error: {e}")

    root = tk.Tk()
    root.title("DRLCogNet GUI")
    root.geometry("400x300")

    header_label = tk.Label(root, text="Simulation settings", font=("Helvetica", 16))
    header_label.pack(pady=10)

    start_button = tk.Button(root, text="Start simulation", command=start_simulation)
    start_button.pack(pady=20)

    root.mainloop()
```

### Main program

```python
if __name__ == "__main__":
    # Start the initialization in a background thread
    threading.Thread(target=async_initialize_network, daemon=True).start()
    start_gui()
```
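
The initialization thread and a simulation started from the GUI both call `load_model_with_questions_and_answers` and both touch the shared `initialized` and `model_saved` flags, so the order in which they run matters. One way to make that order explicit, offered here as a sketch and not as the original design, is a `threading.Event` that the simulation waits on. The names `init_done`, `state`, `initialize`, and `simulate` are hypothetical.

```python
import threading

init_done = threading.Event()
state = {}


def initialize():
    state["category_nodes"] = ["Math", "History"]  # placeholder for the real setup
    init_done.set()  # signal that the shared state is ready


def simulate():
    # Block until initialization has finished (with a timeout as a safety net).
    if not init_done.wait(timeout=5):
        raise TimeoutError("initialization did not finish in time")
    return len(state["category_nodes"])


threading.Thread(target=initialize, daemon=True).start()
node_count = simulate()
```

In a GUI the wait would belong in the worker thread, not in the button callback, so that the event loop stays responsive.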

## Questions about the database (SQLite)

### Is the database created in memory?

No. `save_to_sqlite` passes a file name (`dataset.db` by default) to `sqlite3.connect`, so SQLite creates the database as a file on disk, relative to the current working directory unless `db_name` is an absolute path. A database lives only in memory when the special name `":memory:"` is passed instead.
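
Whether a SQLite database lives on disk or in memory depends on the name passed to `sqlite3.connect`. The following sketch contrasts the two cases; the temporary directory and the table definition are assumptions made so that the example is self-contained.

```python
import os
import sqlite3
import tempfile

with tempfile.TemporaryDirectory() as tmp_dir:
    db_path = os.path.join(tmp_dir, "dataset.db")

    # A file name creates (or opens) a database file on disk.
    disk_conn = sqlite3.connect(db_path)
    disk_conn.execute("CREATE TABLE qa_data (Frage TEXT, Kategorie TEXT, Antwort TEXT)")
    disk_conn.commit()
    disk_conn.close()
    file_created = os.path.exists(db_path)

    # The special name ":memory:" keeps the database in RAM only.
    mem_conn = sqlite3.connect(":memory:")
    mem_conn.execute("CREATE TABLE qa_data (Frage TEXT, Kategorie TEXT, Antwort TEXT)")
    table_count = mem_conn.execute(
        "SELECT COUNT(*) FROM sqlite_master WHERE type = 'table'"
    ).fetchone()[0]
    mem_conn.close()  # the in-memory database is discarded here
```

An in-memory database disappears when its connection is closed, whereas the file-based one persists until the file is deleted.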

### How is the database created?

The database is created when `save_to_sqlite` opens a connection with `sqlite3.connect(db_name)`. If the file `dataset.db` does not exist, it is created. The CSV file is then read in chunks of 10,000 rows, and each chunk is written to the table `qa_data`.

### How is the data loaded into the database?

The data is read from the CSV file in chunks, and each chunk is written to the table `qa_data` with pandas' `DataFrame.to_sql`. Because the call passes `if_exists="append"`, every run adds rows to the existing table, so running the function twice on the same file stores each row twice.
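
The chunked loading can also be illustrated without pandas. The sketch below reads CSV rows in small batches with the standard-library `csv` module and inserts each batch with `executemany`. The function name `load_csv_in_batches`, the batch size, and the `TEXT` column types are assumptions for the example, not the schema that `to_sql` would generate.

```python
import csv
import io
import itertools
import sqlite3


def load_csv_in_batches(stream, conn, batch_size=2):
    """Insert CSV rows into qa_data in batches; return the number of batches."""
    conn.execute(
        "CREATE TABLE IF NOT EXISTS qa_data (Frage TEXT, Kategorie TEXT, Antwort TEXT)"
    )
    reader = csv.DictReader(stream)
    batches = 0
    while True:
        # islice pulls at most batch_size rows from the reader per iteration.
        batch = [
            (row["Frage"], row["Kategorie"], row["Antwort"])
            for row in itertools.islice(reader, batch_size)
        ]
        if not batch:
            break
        conn.executemany("INSERT INTO qa_data VALUES (?, ?, ?)", batch)
        conn.commit()
        batches += 1
    return batches


sample = io.StringIO(
    "Frage,Kategorie,Antwort\nQ1,Math,A1\nQ2,History,A2\nQ3,Math,A3\n"
)
conn = sqlite3.connect(":memory:")
batch_count = load_csv_in_batches(sample, conn)
row_count = conn.execute("SELECT COUNT(*) FROM qa_data").fetchone()[0]
conn.close()
```

Committing after every batch keeps each transaction small, which is the same trade-off the chunked `to_sql` loop makes.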

### How is the data loaded from the database?

`load_from_sqlite` opens a connection to the SQLite database and runs a SQL query that selects the columns `Frage`, `Kategorie`, and `Antwort` from the table `qa_data`. pandas' `read_sql_query` returns the result as a `DataFrame`.

### Example code for using the database

```python
# Store data in the database
save_to_sqlite("data.csv")

# Load data from the database
data = load_from_sqlite()
```

## Conclusion

This documentation gives an overview of the code and of how the SQLite database is used to store and load data. The code is modular and supports processing data from a CSV file and simulating learning on it in a neural network. The SQLite database is stored as a file on disk (`dataset.db` by default), so the data persists between runs.