"""
Matching orchestrator:
- Detects German audiobook series (Die drei ???, TKKG, ...)
- Tries MusicBrainz → OpenLibrary → Google Books
- Downloads covers
- Scores confidence and decides about auto-accept
"""
import asyncio
import logging
import os
import re
from datetime import datetime, timezone
from pathlib import Path

import httpx
from sqlalchemy import delete, select
from sqlalchemy.ext.asyncio import AsyncSession

from ..config import get_settings
from ..database import AsyncSessionLocal
from ..models.library import Library
from ..models.media_item import LibraryItem, BookFile, Chapter
from ..models.session import ServerSetting
from .matching.base import MatchResult
from .matching.dnb import search_dnb
from .matching.google_books import search_google_books
from .matching.musicbrainz import search_musicbrainz, get_release_details
from .matching.open_library import search_open_library, get_work_details

logger = logging.getLogger(__name__)

# Score at or above which the "zu_prüfen" (needs review) tag is removed.
AUTO_ACCEPT_THRESHOLD = 0.75
# Minimum score for a match to be applied at all.
UNCERTAIN_THRESHOLD = 0.50

# Known German audiobook series: (regex, canonical name).
# Group 1 is the series name, group 2 the episode/volume number.
# A canonical name of None means "use whatever group 1 captured".
SERIES_PATTERNS = [
    (r"(?i)^(die drei \?\?\?|die drei fragezeichen|drei fragezeichen)\s*[-–]?\s*(?:folge\s*)?(\d+)", "Die drei ???"),
    (r"(?i)^(tkkg)\s*[-–]?\s*(?:folge\s*)?(\d+)", "TKKG"),
    (r"(?i)^(fünf freunde|funf freunde)\s*[-–]?\s*(?:band\s*)?(\d+)", "Fünf Freunde"),
    (r"(?i)^(bibi blocksberg)\s*[-–]?\s*(?:folge\s*)?(\d+)", "Bibi Blocksberg"),
    (r"(?i)^(benjamin blümchen|benjamin blumchen)\s*[-–]?\s*(?:folge\s*)?(\d+)", "Benjamin Blümchen"),
    (r"(?i)^(bibi und tina)\s*[-–]?\s*(?:folge\s*)?(\d+)", "Bibi und Tina"),
    (r"(?i)^(der kleine vampir)\s*[-–]?\s*(?:band\s*)?(\d+)", "Der kleine Vampir"),
    # Generic: "Series - Folge/Band/Teil N - Title"
    (r"(?i)^(.+?)\s*[-–]\s*(?:folge|band|teil|nr\.?|#)\s*(\d+)", None),
    # Generic: "Series (Folge N)"
    (r"(?i)^(.+?)\s*\((?:folge|band|teil|nr\.?|#|episode)\s*(\d+)\)", None),
]


def detect_series(title: str) -> tuple[str | None, str | None]:
    """Return (series name, episode number) for *title*, or (None, None).

    The patterns in SERIES_PATTERNS are tried in order and the first one
    that matches at the start of the stripped title wins. The episode
    number is returned as the matched digit string, not as an int.
    """
    stripped = title.strip()  # loop-invariant, so strip only once
    for pattern, canonical_name in SERIES_PATTERNS:
        m = re.match(pattern, stripped)
        if m:
            series = canonical_name or m.group(1).strip()
            episode = m.group(2)
            return series, episode
    return None, None


def _title_similarity(a: str, b: str) -> float:
    """Simple similarity measure: word overlap.

    Both strings are lower-cased and split into ``\\w+`` tokens. The result
    is the number of shared distinct tokens divided by the size of the
    larger token set, or 0.0 if either side is empty or has no tokens.
    """
    if not a or not b:
        return 0.0
    wa = set(re.findall(r'\w+', a.lower()))
    wb = set(re.findall(r'\w+', b.lower()))
    if not wa or not wb:
        return 0.0
    return len(wa & wb) / max(len(wa), len(wb))


def _score_result(result: MatchResult, query_title: str, query_author: str | None) -> float:
    """Combine the source's own confidence with title/author similarity.

    The score is 40% source confidence and 60% title similarity. When both
    the query and the result carry an author, that score is blended again
    as 70% score and 30% author similarity. The result is capped at 1.0.
    """
    score = result.confidence
    title_sim = _title_similarity(result.title, query_title)
    score = score * 0.4 + title_sim * 0.6
    if query_author and result.author:
        author_sim = _title_similarity(result.author, query_author)
        score = score * 0.7 + author_sim * 0.3
    return min(score, 1.0)


async def _download_cover(url: str, item_id: str) -> str | None:
    """Download a cover image and store it locally.

    The file is written to ``<covers_dir>/<item_id><ext>`` where the
    extension is ``.png`` if the URL contains ".png" and ``.jpg`` otherwise.

    Returns:
        The destination path on HTTP 200, otherwise None. Any exception
        (network error, file error) is logged as a warning and also
        results in None; this function does not raise.
    """
    settings = get_settings()
    ext = ".jpg"
    if ".png" in url:
        ext = ".png"
    dest = os.path.join(settings.covers_dir, f"{item_id}{ext}")
    logger.info(f"Cover-Download: {url}")
    try:
        async with httpx.AsyncClient(timeout=20, follow_redirects=True) as client:
            r = await client.get(url)
            if r.status_code == 200:
                os.makedirs(settings.covers_dir, exist_ok=True)
                with open(dest, "wb") as f:
                    f.write(r.content)
                logger.info(f"Cover gespeichert: {dest} ({len(r.content)} Bytes)")
                return dest
            else:
                logger.warning(f"Cover-Download HTTP {r.status_code}: {url}")
    except Exception as e:
        logger.warning(f"Cover-Download Fehler ({url}): {e}")
    return None


async def _apply_match(db: AsyncSession, item: LibraryItem, result: MatchResult, confidence: float):
    """Write the metadata of a MatchResult onto *item*.

    Truthy fields of *result* overwrite the corresponding item fields; the
    subtitle is only filled in when the item has none. The match source,
    source id and confidence are always recorded. A cover is downloaded
    only if the result has a cover URL and the item has no cover yet. If
    the result carries chapters, the item's existing chapters are deleted
    and replaced. The caller is responsible for committing the session.
    """
    if result.title:
        item.title = result.title
    if result.subtitle and not item.subtitle:
        item.subtitle = result.subtitle
    if result.author:
        item.author = result.author
    if result.narrator:
        item.narrator = result.narrator
    if result.description:
        item.description = result.description
    if result.publisher:
        item.publisher = result.publisher
    if result.publish_year:
        item.publish_year = result.publish_year
    if result.language:
        item.language = result.language
    if result.genres:
        item.genres = result.genres
    if result.series:
        item.series = result.series
    if result.series_sequence:
        item.series_sequence = result.series_sequence

    item.matched_source = result.source
    item.matched_id = result.source_id
    item.match_confidence = confidence
    # datetime.utcnow() is deprecated since Python 3.12. This yields the same
    # naive UTC timestamp, so the stored value keeps its previous shape.
    item.updated_at = datetime.now(timezone.utc).replace(tzinfo=None)

    # Download the cover
    if result.cover_url and not item.cover_path:
        cover_path = await _download_cover(result.cover_url, item.id)
        if cover_path:
            item.cover_path = cover_path
    elif not result.cover_url:
        logger.info(f"Kein Cover-URL in Match-Ergebnis ({result.source}: {result.source_id})")

    # Chapters from the match result (e.g. a MusicBrainz track listing):
    # replace the existing chapters wholesale.
    if result.chapters:
        await db.execute(delete(Chapter).where(Chapter.library_item_id == item.id))
        for idx, ch in enumerate(result.chapters):
            chapter = Chapter(
                library_item_id=item.id,
                chapter_index=idx,
                title=ch.get("title", f"Kapitel {idx + 1}"),
                start_seconds=ch.get("start", 0.0),
                end_seconds=ch.get("end", 0.0),
            )
            db.add(chapter)

    # Remove the "zu_prüfen" (needs review) tag when confidence is high enough
    if confidence >= AUTO_ACCEPT_THRESHOLD:
        tags = item.tags or []
        item.tags = [t for t in tags if t != "zu_prüfen"]


# Source name -> (search function, optional details function).
_SOURCE_FUNCS = {
    "musicbrainz": (search_musicbrainz, get_release_details),
    "open_library": (search_open_library, get_work_details),
    "google_books": (search_google_books, None),
    "dnb": (search_dnb, None),
}


async def match_audiobook(item_id: str):
    """
    Main matching function. Started as a background task after the scan.
    Sources and their order are read from the library settings.

    Does nothing if the item does not exist, is match-locked, or the
    "autoMatchBooks" server setting is explicitly False. Sources are
    queried in order until one yields a score of at least
    UNCERTAIN_THRESHOLD; that match is then applied and committed.
    Errors from individual sources are logged and do not abort matching.
    """
    async with AsyncSessionLocal() as db:
        result_row = await db.execute(select(LibraryItem).where(LibraryItem.id == item_id))
        item = result_row.scalar_one_or_none()
        if not item or item.match_locked:
            return

        # Check the global auto-match setting
        setting = await db.execute(
            select(ServerSetting).where(ServerSetting.key == "autoMatchBooks")
        )
        s = setting.scalar_one_or_none()
        if s and s.value is False:
            return

        # Read the matching sources from the library settings
        lib_row = await db.execute(select(Library).where(Library.id == item.library_id))
        lib = lib_row.scalar_one_or_none()
        configured = (lib.settings or {}).get("match_sources") if lib else None
        # A missing or null setting falls back to all known sources; an
        # explicitly empty list is respected and means "no sources".
        sources: list[str] = list(_SOURCE_FUNCS.keys()) if configured is None else configured

        title = item.title or ""
        author = item.author

        series, episode = detect_series(title)
        search_title = title
        if series:
            search_title = f"{series} {episode}" if episode else series
            if not item.series:
                item.series = series
            if not item.series_sequence and episode:
                item.series_sequence = episode

        logger.info(f"Matche: '{title}' | Quellen: {sources}")

        best: MatchResult | None = None
        best_score = 0.0

        for source_name in sources:
            # Stop as soon as an earlier source produced a usable match.
            if best_score >= UNCERTAIN_THRESHOLD:
                break

            funcs = _SOURCE_FUNCS.get(source_name)
            if not funcs:
                continue
            search_func, details_func = funcs

            try:
                results = await search_func(search_title, author)
                for r in results:
                    # Scored against the original title, not the normalized
                    # search title.
                    score = _score_result(r, title, author)
                    if score > best_score:
                        best_score = score
                        best = r

                # Fetch details when the hit is good enough (e.g. MB track list)
                if best and best.source == source_name and best_score >= UNCERTAIN_THRESHOLD and details_func:
                    try:
                        details = await details_func(best.source_id)
                        if details:
                            details.confidence = best_score
                            best = details
                    except Exception as e:
                        logger.warning(f"{source_name} Details Fehler: {e}")
            except Exception as e:
                logger.warning(f"{source_name} Fehler: {e}")

        if best and best_score >= UNCERTAIN_THRESHOLD:
            try:
                await _apply_match(db, item, best, best_score)
                await db.commit()
                logger.info(f"Match angewendet: '{item.title}' ← {best.source} ({best_score:.2f})")
            except Exception as e:
                logger.error(f"_apply_match fehlgeschlagen für '{title}': {e}", exc_info=True)
                # Discard the partially applied metadata explicitly rather
                # than relying on session teardown.
                await db.rollback()
        else:
            logger.info(f"Kein Match für '{title}' (beste Konfidenz: {best_score:.2f})")
            # Persist any series / sequence detected from the title above.
            await db.commit()


async def search_for_item(title: str, author: str | None = None) -> list[dict]:
    """Search across all sources, for manual matching.

    All four sources are queried concurrently. A failing source is logged
    and contributes no results instead of failing the whole search.

    Returns:
        A list of dicts with the keys ``source``, ``id``, ``title``,
        ``author``, ``publishYear``, ``cover`` and ``confidence``, sorted by
        confidence in descending order.
    """
    results = []

    async def _search_source(name, coro):
        try:
            # Treat a source that returns None like one with no hits so the
            # list concatenation below cannot fail.
            return await coro or []
        except Exception as e:
            logger.warning(f"{name} Fehler: {e}")
            return []

    mb, ol, gb, dnb = await asyncio.gather(
        _search_source("musicbrainz", search_musicbrainz(title, author)),
        _search_source("open_library", search_open_library(title, author)),
        _search_source("google_books", search_google_books(title, author)),
        _search_source("dnb", search_dnb(title, author)),
    )

    for r in mb + ol + gb + dnb:
        results.append({
            "source": r.source,
            "id": r.source_id,
            "title": r.title,
            "author": r.author,
            "publishYear": r.publish_year,
            "cover": r.cover_url,
            "confidence": r.confidence,
        })

    # A missing confidence sorts as 0.0 instead of raising on None comparison.
    results.sort(key=lambda x: x["confidence"] or 0.0, reverse=True)
    return results