Szenario B: Beispiel-Skript für LLM-Match mit Pydantic-Validierung

This commit is contained in:
Markus Grüne 2026-05-15 15:09:46 +02:00
parent bd4abfcfd6
commit d9ea0f88f4
2 changed files with 217 additions and 0 deletions

View file

@ -28,6 +28,7 @@ dwh/ Quelldaten für Szenario C (DWH-Verdichtung)
| `zielschema.sql` | PostgreSQL-DDL | Verbund-Zielschema | | `zielschema.sql` | PostgreSQL-DDL | Verbund-Zielschema |
| `gold_cluster.csv` | CSV | **Goldstandard:** wahre Cluster-Zuordnungen für Auswertung | | `gold_cluster.csv` | CSV | **Goldstandard:** wahre Cluster-Zuordnungen für Auswertung |
| `beispiel_embeddings.py` | Python | Startvorlage für Szenario B: Embeddings mit Ollama + DuckDB-VSS berechnen, speichern, Kandidatenpaare ermitteln | | `beispiel_embeddings.py` | Python | Startvorlage für Szenario B: Embeddings mit Ollama + DuckDB-VSS berechnen, speichern, Kandidatenpaare ermitteln |
| `beispiel_llm_match.py` | Python | Startvorlage für Szenario B: Pydantic-validierte LLM-Entscheidung pro Kandidatenpaar (baut auf der Embedding-Tabelle auf) |
Volumen: **916 Kunden, 600 Behandlungen, 113 Cluster mit Dubletten**. Volumen: **916 Kunden, 600 Behandlungen, 113 Cluster mit Dubletten**.

View file

@ -0,0 +1,216 @@
#!/usr/bin/env python3
"""
Beispiel: LLM-basiertes Matching mit Pydantic-validiertem Output.
Baut auf der Embedding-Tabelle aus `beispiel_embeddings.py` auf.
Holt die n besten Kandidatenpaare ueber Cosine-Similarity und
schickt sie an ein lokales LLM (Ollama). Die Antwort wird ueber
ein Pydantic-Schema validiert, fehlerhafte Antworten werden noch
einmal angefragt (Retry).
Zeigt:
1. Pydantic-Schema fuer typsichere LLM-Antworten.
2. Prompt-Konstruktion mit klarer Anweisung zum JSON-Output.
3. Aufruf von Ollama mit JSON-Mode + Schema.
4. Validierung und Retry-Logik.
5. Persistierung in einer match_entscheidung-Tabelle.
Voraussetzungen:
pip install duckdb ollama pydantic
ollama pull qwen2.5:7b-instruct # Match-Modell (~4 GB)
python beispiel_embeddings.py # vorher laufen lassen
python beispiel_llm_match.py
"""
from __future__ import annotations
import json
import time
from pathlib import Path
from typing import Literal
import duckdb
import ollama
from pydantic import BaseModel, Field, ValidationError
HERE = Path(__file__).resolve().parent
DB = HERE / "embedding_demo.duckdb"
MODEL = "qwen2.5:7b-instruct"
THRESHOLD_SIM = 0.75 # Cosine-Similarity-Cutoff fuer Kandidatensuche
MAX_PAARE = 20 # Demo: nur die ersten 20 Paare bewerten
MAX_RETRIES = 2 # bei kaputtem JSON erneut fragen
TEMPERATURE = 0.0 # deterministische Antworten
# ============================================================
# 1. Pydantic-Schema fuer die LLM-Antwort
# ============================================================
class MatchEntscheidung(BaseModel):
"""Strukturierte Antwort des LLM zu einem Kandidatenpaar."""
is_duplicate: bool = Field(
description="True, wenn beide Datensaetze dieselbe Person beschreiben."
)
confidence: float = Field(
ge=0.0, le=1.0,
description="Sicherheit der Einschaetzung zwischen 0.0 und 1.0.",
)
reasoning: str = Field(
min_length=10, max_length=400,
description="1-2 Saetze Begruendung, welches Merkmal entschieden hat.",
)
decisive_signal: Literal["name", "address", "phone", "email", "combined"] = Field(
description="Das ausschlaggebende Merkmal fuer die Entscheidung.",
)
# ============================================================
# 2. Prompt-Konstruktion
# ============================================================
SYSTEM_PROMPT = """Du bist ein Datenintegrator. Du bekommst zwei Kundendatensaetze
aus unterschiedlichen Praxen und entscheidest, ob es sich um
dieselbe reale Person handelt.
Achte besonders auf:
- Namens-Varianten (Initialen, abgekuerzte Vornamen, Reihenfolge)
- Adress-Varianten (Strasse/Str., Schreibweise der PLZ)
- Telefon und E-Mail als starke Signale
- Plausibilitaet als Ganzes
Antworte ausschliesslich als JSON nach dem vorgegebenen Schema.
Begruende kurz, welches Merkmal entscheidend war."""
def build_user_prompt(a_text: str, b_text: str) -> str:
return f"Datensatz A: {a_text}\nDatensatz B: {b_text}"
# ============================================================
# 3. LLM-Aufruf mit Pydantic-Validierung und Retry
# ============================================================
def klassifiziere(a_text: str, b_text: str) -> MatchEntscheidung:
"""Fragt das LLM und gibt eine validierte MatchEntscheidung zurueck."""
schema = MatchEntscheidung.model_json_schema()
user_msg = build_user_prompt(a_text, b_text)
last_err: Exception | None = None
for versuch in range(1, MAX_RETRIES + 2):
try:
resp = ollama.chat(
model=MODEL,
messages=[
{"role": "system", "content": SYSTEM_PROMPT},
{"role": "user", "content": user_msg},
],
format=schema, # JSON-Schema-Mode
options={"temperature": TEMPERATURE},
)
return MatchEntscheidung.model_validate_json(resp["message"]["content"])
except (ValidationError, json.JSONDecodeError) as exc:
last_err = exc
print(f" [Versuch {versuch}] LLM-Antwort ungueltig: {exc}")
continue
raise RuntimeError(f"LLM lieferte nach {MAX_RETRIES + 1} Versuchen kein gueltiges JSON: {last_err}")
# ============================================================
# 4. Pipeline-Lauf
# ============================================================
def main() -> int:
con = duckdb.connect(str(DB))
con.execute("INSTALL vss; LOAD vss;")
con.execute("CREATE SCHEMA IF NOT EXISTS embeddings;")
# Pruefen, ob die Embedding-Tabelle existiert (Output des Vorgaenger-Skripts)
tabellen = con.execute(
"SELECT table_name FROM information_schema.tables WHERE table_schema = 'embeddings'"
).fetchall()
if ("kunde_embedding",) not in tabellen:
print("Fehler: embeddings.kunde_embedding fehlt.")
print("Bitte zuerst `python beispiel_embeddings.py` ausfuehren.")
return 1
# Ergebnis-Tabelle anlegen
con.execute("""
CREATE OR REPLACE TABLE embeddings.match_entscheidung (
a_id VARCHAR,
b_id VARCHAR,
sim FLOAT,
is_duplicate BOOLEAN,
confidence FLOAT,
reasoning VARCHAR,
decisive_signal VARCHAR,
modell VARCHAR,
entschieden_am TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
""")
# Top-N Kandidatenpaare ueber Cosine-Similarity
paare = con.execute(f"""
SELECT a.quell_id AS a_id, b.quell_id AS b_id,
a.text AS a_text, b.text AS b_text,
array_cosine_similarity(a.embedding, b.embedding) AS sim
FROM embeddings.kunde_embedding a
JOIN embeddings.kunde_embedding b ON a.quell_id < b.quell_id
WHERE array_cosine_similarity(a.embedding, b.embedding) >= {THRESHOLD_SIM}
ORDER BY sim DESC
LIMIT {MAX_PAARE}
""").fetchall()
if not paare:
print(f"Keine Kandidatenpaare mit sim >= {THRESHOLD_SIM} gefunden.")
print("Tipp: beispiel_embeddings.py mit mehr Praxen laufen lassen,")
print("damit Dubletten ueber Praxen hinweg entstehen.")
return 0
print(f"Bewerte {len(paare)} Kandidatenpaare mit {MODEL} ...\n")
t0 = time.time()
for a_id, b_id, a_text, b_text, sim in paare:
try:
entscheidung = klassifiziere(a_text, b_text)
except RuntimeError as e:
print(f" {a_id} vs {b_id}: SKIP ({e})")
continue
con.execute(
"INSERT INTO embeddings.match_entscheidung "
"(a_id, b_id, sim, is_duplicate, confidence, reasoning, "
" decisive_signal, modell) VALUES (?,?,?,?,?,?,?,?)",
[a_id, b_id, sim,
entscheidung.is_duplicate,
entscheidung.confidence,
entscheidung.reasoning,
entscheidung.decisive_signal,
MODEL],
)
mark = "MATCH " if entscheidung.is_duplicate else " "
print(f" {mark} {a_id} vs {b_id} sim={sim:.3f} "
f"conf={entscheidung.confidence:.2f} "
f"signal={entscheidung.decisive_signal}")
print(f" {entscheidung.reasoning}")
dt = time.time() - t0
print(f"\nFertig in {dt:.1f}s ({dt / len(paare):.1f}s pro Paar).")
# Zusammenfassung
summary = con.execute("""
SELECT
COUNT(*) AS gesamt,
SUM(CASE WHEN is_duplicate THEN 1 ELSE 0 END) AS matches,
ROUND(AVG(confidence), 2) AS conf_avg
FROM embeddings.match_entscheidung
""").fetchone()
print(f" Gesamt: {summary[0]}, Matches: {summary[1]}, "
f"durchschn.\\ Confidence: {summary[2]}")
return 0
if __name__ == "__main__":
raise SystemExit(main())