From d9ea0f88f4cc46c798bf573d03ffbbee6d22d49a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Markus=20Gr=C3=BCne?= Date: Fri, 15 May 2026 15:09:46 +0200 Subject: [PATCH] =?UTF-8?q?Szenario=20B:=20Beispiel-Skript=20f=C3=BCr=20LL?= =?UTF-8?q?M-Match=20mit=20Pydantic-Validierung?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- README.md | 1 + verbund/beispiel_llm_match.py | 216 ++++++++++++++++++++++++++++++++++ 2 files changed, 217 insertions(+) create mode 100644 verbund/beispiel_llm_match.py diff --git a/README.md b/README.md index e2f282a..4187b16 100644 --- a/README.md +++ b/README.md @@ -28,6 +28,7 @@ dwh/ Quelldaten für Szenario C (DWH-Verdichtung) | `zielschema.sql` | PostgreSQL-DDL | Verbund-Zielschema | | `gold_cluster.csv` | CSV | **Goldstandard:** wahre Cluster-Zuordnungen für Auswertung | | `beispiel_embeddings.py` | Python | Startvorlage für Szenario B: Embeddings mit Ollama + DuckDB-VSS berechnen, speichern, Kandidatenpaare ermitteln | +| `beispiel_llm_match.py` | Python | Startvorlage für Szenario B: Pydantic-validierte LLM-Entscheidung pro Kandidatenpaar (baut auf der Embedding-Tabelle auf) | Volumen: **916 Kunden, 600 Behandlungen, 113 Cluster mit Dubletten**. diff --git a/verbund/beispiel_llm_match.py b/verbund/beispiel_llm_match.py new file mode 100644 index 0000000..47e7167 --- /dev/null +++ b/verbund/beispiel_llm_match.py @@ -0,0 +1,216 @@ +#!/usr/bin/env python3 +""" +Beispiel: LLM-basiertes Matching mit Pydantic-validiertem Output. + +Baut auf der Embedding-Tabelle aus `beispiel_embeddings.py` auf. +Holt die n besten Kandidatenpaare ueber Cosine-Similarity und +schickt sie an ein lokales LLM (Ollama). Die Antwort wird ueber +ein Pydantic-Schema validiert, fehlerhafte Antworten werden noch +einmal angefragt (Retry). + +Zeigt: + + 1. Pydantic-Schema fuer typsichere LLM-Antworten. + 2. Prompt-Konstruktion mit klarer Anweisung zum JSON-Output. + 3. Aufruf von Ollama mit JSON-Mode + Schema. + 4. Validierung und Retry-Logik. + 5. Persistierung in einer match_entscheidung-Tabelle. + +Voraussetzungen: + + pip install duckdb ollama pydantic + ollama pull qwen2.5:7b-instruct # Match-Modell (~4 GB) + python beispiel_embeddings.py # vorher laufen lassen + python beispiel_llm_match.py +""" + +from __future__ import annotations + +import json +import time +from pathlib import Path +from typing import Literal + +import duckdb +import ollama +from pydantic import BaseModel, Field, ValidationError + +HERE = Path(__file__).resolve().parent +DB = HERE / "embedding_demo.duckdb" +MODEL = "qwen2.5:7b-instruct" +THRESHOLD_SIM = 0.75 # Cosine-Similarity-Cutoff fuer Kandidatensuche +MAX_PAARE = 20 # Demo: nur die ersten 20 Paare bewerten +MAX_RETRIES = 2 # bei kaputtem JSON erneut fragen +TEMPERATURE = 0.0 # deterministische Antworten + + +# ============================================================ +# 1. Pydantic-Schema fuer die LLM-Antwort +# ============================================================ + +class MatchEntscheidung(BaseModel): + """Strukturierte Antwort des LLM zu einem Kandidatenpaar.""" + + is_duplicate: bool = Field( + description="True, wenn beide Datensaetze dieselbe Person beschreiben." + ) + confidence: float = Field( + ge=0.0, le=1.0, + description="Sicherheit der Einschaetzung zwischen 0.0 und 1.0.", + ) + reasoning: str = Field( + min_length=10, max_length=400, + description="1-2 Saetze Begruendung, welches Merkmal entschieden hat.", + ) + decisive_signal: Literal["name", "address", "phone", "email", "combined"] = Field( + description="Das ausschlaggebende Merkmal fuer die Entscheidung.", + ) + + +# ============================================================ +# 2. Prompt-Konstruktion +# ============================================================ + +SYSTEM_PROMPT = """Du bist ein Datenintegrator. Du bekommst zwei Kundendatensaetze +aus unterschiedlichen Praxen und entscheidest, ob es sich um +dieselbe reale Person handelt. + +Achte besonders auf: + - Namens-Varianten (Initialen, abgekuerzte Vornamen, Reihenfolge) + - Adress-Varianten (Strasse/Str., Schreibweise der PLZ) + - Telefon und E-Mail als starke Signale + - Plausibilitaet als Ganzes + +Antworte ausschliesslich als JSON nach dem vorgegebenen Schema. +Begruende kurz, welches Merkmal entscheidend war.""" + + +def build_user_prompt(a_text: str, b_text: str) -> str: + return f"Datensatz A: {a_text}\nDatensatz B: {b_text}" + + +# ============================================================ +# 3. LLM-Aufruf mit Pydantic-Validierung und Retry +# ============================================================ + +def klassifiziere(a_text: str, b_text: str) -> MatchEntscheidung: + """Fragt das LLM und gibt eine validierte MatchEntscheidung zurueck.""" + schema = MatchEntscheidung.model_json_schema() + user_msg = build_user_prompt(a_text, b_text) + + last_err: Exception | None = None + for versuch in range(1, MAX_RETRIES + 2): + try: + resp = ollama.chat( + model=MODEL, + messages=[ + {"role": "system", "content": SYSTEM_PROMPT}, + {"role": "user", "content": user_msg}, + ], + format=schema, # JSON-Schema-Mode + options={"temperature": TEMPERATURE}, + ) + return MatchEntscheidung.model_validate_json(resp["message"]["content"]) + except (ValidationError, json.JSONDecodeError) as exc: + last_err = exc + print(f" [Versuch {versuch}] LLM-Antwort ungueltig: {exc}") + continue + raise RuntimeError(f"LLM lieferte nach {MAX_RETRIES + 1} Versuchen kein gueltiges JSON: {last_err}") + + +# ============================================================ +# 4. Pipeline-Lauf +# ============================================================ + +def main() -> int: + con = duckdb.connect(str(DB)) + con.execute("INSTALL vss; LOAD vss;") + con.execute("CREATE SCHEMA IF NOT EXISTS embeddings;") + + # Pruefen, ob die Embedding-Tabelle existiert (Output des Vorgaenger-Skripts) + tabellen = con.execute( + "SELECT table_name FROM information_schema.tables WHERE table_schema = 'embeddings'" + ).fetchall() + if ("kunde_embedding",) not in tabellen: + print("Fehler: embeddings.kunde_embedding fehlt.") + print("Bitte zuerst `python beispiel_embeddings.py` ausfuehren.") + return 1 + + # Ergebnis-Tabelle anlegen + con.execute(""" + CREATE OR REPLACE TABLE embeddings.match_entscheidung ( + a_id VARCHAR, + b_id VARCHAR, + sim FLOAT, + is_duplicate BOOLEAN, + confidence FLOAT, + reasoning VARCHAR, + decisive_signal VARCHAR, + modell VARCHAR, + entschieden_am TIMESTAMP DEFAULT CURRENT_TIMESTAMP + ); + """) + + # Top-N Kandidatenpaare ueber Cosine-Similarity + paare = con.execute(f""" + SELECT a.quell_id AS a_id, b.quell_id AS b_id, + a.text AS a_text, b.text AS b_text, + array_cosine_similarity(a.embedding, b.embedding) AS sim + FROM embeddings.kunde_embedding a + JOIN embeddings.kunde_embedding b ON a.quell_id < b.quell_id + WHERE array_cosine_similarity(a.embedding, b.embedding) >= {THRESHOLD_SIM} + ORDER BY sim DESC + LIMIT {MAX_PAARE} + """).fetchall() + + if not paare: + print(f"Keine Kandidatenpaare mit sim >= {THRESHOLD_SIM} gefunden.") + print("Tipp: beispiel_embeddings.py mit mehr Praxen laufen lassen,") + print("damit Dubletten ueber Praxen hinweg entstehen.") + return 0 + + print(f"Bewerte {len(paare)} Kandidatenpaare mit {MODEL} ...\n") + t0 = time.time() + for a_id, b_id, a_text, b_text, sim in paare: + try: + entscheidung = klassifiziere(a_text, b_text) + except RuntimeError as e: + print(f" {a_id} vs {b_id}: SKIP ({e})") + continue + + con.execute( + "INSERT INTO embeddings.match_entscheidung " + "(a_id, b_id, sim, is_duplicate, confidence, reasoning, " + " decisive_signal, modell) VALUES (?,?,?,?,?,?,?,?)", + [a_id, b_id, sim, + entscheidung.is_duplicate, + entscheidung.confidence, + entscheidung.reasoning, + entscheidung.decisive_signal, + MODEL], + ) + + mark = "MATCH " if entscheidung.is_duplicate else " " + print(f" {mark} {a_id} vs {b_id} sim={sim:.3f} " + f"conf={entscheidung.confidence:.2f} " + f"signal={entscheidung.decisive_signal}") + print(f" {entscheidung.reasoning}") + + dt = time.time() - t0 + print(f"\nFertig in {dt:.1f}s ({dt / len(paare):.1f}s pro Paar).") + + # Zusammenfassung + summary = con.execute(""" + SELECT + COUNT(*) AS gesamt, + SUM(CASE WHEN is_duplicate THEN 1 ELSE 0 END) AS matches, + ROUND(AVG(confidence), 2) AS conf_avg + FROM embeddings.match_entscheidung + """).fetchone() + print(f" Gesamt: {summary[0]}, Matches: {summary[1]}, " + f"durchschn.\\ Confidence: {summary[2]}") + return 0 + + +if __name__ == "__main__": + raise SystemExit(main())