Files
Audiolib/backend/app/services/matcher.py
Audiolib edf057f36e Cleanup UI buttons + smarter series-aware search title
UI: Hide developer tools (Cover-aus-Datei, Tags-lesen,
Connectivity-Check) behind a '+ Tools' toggle. Default view has only
Play, Match, Auto-Match. Tag extraction runs automatically on scan
anyway, so the buttons were noise.

Matcher: Item metadata (series, author from tags or earlier matches)
now flows into the search:
- detect_series() also scans inside title, not only prefix — handles
  garbage chars (◆ U+25C6 etc.) before the series name
- New _strip_series_prefix removes "Die drei ???" from search title so
  APIs see only the episode title ("Die Villa der Toten") which is how
  most databases index these
- _build_search_title also strips non-printable / exotic chars and
  bracketed content anywhere (not just trailing)
- Effective series falls back to item.series when detect_series misses
- Search call now logs which series the search is using

Example: title='◆Die◆ drei ??? Die Villa der Toten (drei Fragezeichen)'
  detected_series='Die drei ???', search_title='Die Villa der Toten'

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-26 20:31:40 +02:00

399 lines
16 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Matching-Orchestrator:
- Erkennt deutsche Hörbuch-Serien (die drei ???, TKKG, ...)
- Versucht MusicBrainz → OpenLibrary → Google Books → DNB
- Lädt Cover herunter
- Bewertet Konfidenz und entscheidet über Auto-Accept
"""
import re
import os
import logging
import httpx
import asyncio
from pathlib import Path
from datetime import datetime
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select
from ..config import get_settings
from ..models.media_item import LibraryItem, BookFile, Chapter
from ..models.library import Library
from ..models.session import ServerSetting
from ..database import AsyncSessionLocal
from .matching.base import MatchResult
from .matching.musicbrainz import search_musicbrainz, get_release_details
from .matching.open_library import search_open_library, get_work_details
from .matching.google_books import search_google_books
from .matching.dnb import search_dnb
logger = logging.getLogger(__name__)
AUTO_ACCEPT_THRESHOLD = 0.65
UNCERTAIN_THRESHOLD = 0.40
# Mit Folgenummer
SERIES_PATTERNS_WITH_EPISODE = [
(r"(?i)^(die drei \?\?\?|die drei fragezeichen|drei fragezeichen)\s*[-]?\s*(?:folge\s*)?(\d+)", "Die drei ???"),
(r"(?i)^(tkkg)\s*[-]?\s*(?:folge\s*)?(\d+)", "TKKG"),
(r"(?i)^(fünf freunde|funf freunde)\s*[-]?\s*(?:band\s*)?(\d+)", "Fünf Freunde"),
(r"(?i)^(bibi blocksberg)\s*[-]?\s*(?:folge\s*)?(\d+)", "Bibi Blocksberg"),
(r"(?i)^(benjamin blümchen|benjamin blumchen)\s*[-]?\s*(?:folge\s*)?(\d+)", "Benjamin Blümchen"),
(r"(?i)^(bibi und tina)\s*[-]?\s*(?:folge\s*)?(\d+)", "Bibi und Tina"),
(r"(?i)^(der kleine vampir)\s*[-]?\s*(?:band\s*)?(\d+)", "Der kleine Vampir"),
(r"(?i)^(.+?)\s*[-]\s*(?:folge|band|teil|nr\.?|#)\s*(\d+)", None),
(r"(?i)^(.+?)\s*\((?:folge|band|teil|nr\.?|#|episode)\s*(\d+)\)", None),
]
# Ohne Folgenummer (nur Serie erkennen)
SERIES_PATTERNS_SERIES_ONLY = [
(r"(?i)^(die drei \?\?\?|die drei fragezeichen|drei fragezeichen)", "Die drei ???"),
(r"(?i)^(tkkg)\b", "TKKG"),
(r"(?i)^(fünf freunde|funf freunde)", "Fünf Freunde"),
(r"(?i)^(bibi blocksberg)", "Bibi Blocksberg"),
(r"(?i)^(benjamin blümchen|benjamin blumchen)", "Benjamin Blümchen"),
(r"(?i)^(bibi und tina)", "Bibi und Tina"),
(r"(?i)^(der kleine vampir)", "Der kleine Vampir"),
]
def detect_series(title: str) -> tuple[str | None, str | None]:
t = title.strip()
# 1. Mit Folgenummer am Anfang
for pattern, canonical_name in SERIES_PATTERNS_WITH_EPISODE:
m = re.match(pattern, t)
if m:
return (canonical_name or m.group(1).strip(), m.group(2))
# 2. Ohne Folgenummer am Anfang
for pattern, canonical_name in SERIES_PATTERNS_SERIES_ONLY:
m = re.match(pattern, t)
if m:
return (canonical_name or m.group(1).strip(), None)
# 3. Series-Name irgendwo im Titel (falls Sonderzeichen / Müll davor)
for pattern, canonical_name in SERIES_PATTERNS_SERIES_ONLY:
if not canonical_name:
continue
m = re.search(pattern, t)
if m:
return (canonical_name, None)
return None, None
def _build_search_title(original: str) -> str:
"""Bereinigt Titel: ???, Sonderzeichen, Folge-N-Prefix, Klammer-Inhalte raus."""
t = original
# Nicht-druckbare/exotische Zeichen raus (◆, ◇, U+FFFD etc.)
t = re.sub(r"[^\w\s\-:!?,.&'äöüÄÖÜß]", " ", t, flags=re.UNICODE)
# ??? entfernen (CQL-Wildcard-Problem)
t = re.sub(r"\?{2,}", "", t)
# Klammer-Inhalte (egal wo) entfernen
t = re.sub(r"\([^)]*\)", " ", t)
# "Folge 123" Prefixe und Infixes
t = re.sub(r"(?i)^\s*(?:folge|band|teil|episode|nr\.?|#)\s*\d+\s*[-:\.]*\s*", "", t)
t = re.sub(r"(?i)\b(?:folge|band|teil|episode|nr\.?|#)\s*\d+\b\s*[-:\.]*\s*", " ", t)
# Bindestriche/Unterstriche
t = re.sub(r"[_\-]+", " ", t)
t = re.sub(r"\s+", " ", t).strip()
return t
def _strip_series_prefix(search_title: str, series: str) -> str:
"""Entfernt den Serien-Namen aus dem Suchtitel — nur Episode bleibt."""
if not series:
return search_title
series_clean = _build_search_title(series).lower()
base_lower = search_title.lower()
if series_clean and base_lower.startswith(series_clean):
rest = search_title[len(series_clean):].strip(" -:.")
if rest:
return rest
# auch mittendrin entfernen
pattern = re.escape(series_clean)
rest = re.sub(pattern, " ", search_title, flags=re.IGNORECASE)
rest = re.sub(r"\s+", " ", rest).strip(" -:.")
return rest if rest else search_title
def _title_similarity(a: str, b: str) -> float:
"""Wort-Überlapp mit Min/Max-Gewichtung — lenient für Teil-Treffer."""
if not a or not b:
return 0.0
wa = set(re.findall(r"\w+", a.lower()))
wb = set(re.findall(r"\w+", b.lower()))
if not wa or not wb:
return 0.0
intersect = len(wa & wb)
if intersect == 0:
return 0.0
smaller = min(len(wa), len(wb))
larger = max(len(wa), len(wb))
return 0.7 * (intersect / smaller) + 0.3 * (intersect / larger)
def _score_result(result: MatchResult, query_title: str, query_author: str | None) -> float:
score = result.confidence
title_sim = _title_similarity(result.title, query_title)
score = score * 0.3 + title_sim * 0.7
if query_author and result.author:
author_sim = _title_similarity(result.author, query_author)
score = score * 0.7 + author_sim * 0.3
return min(score, 1.0)
def _enrich_match(best: MatchResult, details: MatchResult) -> MatchResult:
"""Befüllt leere Felder in best mit Werten aus details. Beschreibung/Kapitel werden bevorzugt aus details übernommen."""
if details.description:
best.description = details.description
if details.chapters and not best.chapters:
best.chapters = details.chapters
for attr in (
"subtitle", "narrator", "cover_url", "publisher",
"publish_year", "series", "series_sequence", "language",
):
val = getattr(details, attr, None)
if val and not getattr(best, attr, None):
setattr(best, attr, val)
if details.genres:
existing = set(best.genres or [])
best.genres = (best.genres or []) + [g for g in details.genres if g not in existing]
return best
async def _download_cover(url: str, item_id: str) -> str | None:
settings = get_settings()
ext = ".jpg"
if ".png" in url.lower():
ext = ".png"
dest = os.path.join(settings.covers_dir, f"{item_id}{ext}")
logger.info(f"Cover-Download: {url}")
try:
async with httpx.AsyncClient(timeout=20, follow_redirects=True) as client:
r = await client.get(url)
if r.status_code == 200 and len(r.content) > 1000:
os.makedirs(settings.covers_dir, exist_ok=True)
with open(dest, "wb") as f:
f.write(r.content)
logger.info(f"Cover gespeichert: {dest} ({len(r.content)} Bytes)")
return dest
else:
logger.warning(f"Cover-Download HTTP {r.status_code}, size={len(r.content)}: {url}")
except Exception as e:
logger.warning(f"Cover-Download Fehler ({url}): {e}")
return None
async def _apply_match(db: AsyncSession, item: LibraryItem, result: MatchResult, confidence: float):
logger.info(
f"Apply match: item={item.id} title={result.title!r} author={result.author!r} "
f"narrator={result.narrator!r} publisher={result.publisher!r} year={result.publish_year} "
f"series={result.series!r}/{result.series_sequence} cover={bool(result.cover_url)} "
f"chapters={len(result.chapters or [])} confidence={confidence:.2f}"
)
if result.title:
item.title = result.title
if result.subtitle and not item.subtitle:
item.subtitle = result.subtitle
if result.author:
item.author = result.author
if result.narrator:
item.narrator = result.narrator
if result.description:
item.description = result.description
if result.publisher:
item.publisher = result.publisher
if result.publish_year:
item.publish_year = result.publish_year
if result.language:
item.language = result.language
if result.genres:
item.genres = result.genres
if result.series:
item.series = result.series
if result.series_sequence:
item.series_sequence = result.series_sequence
item.matched_source = result.source
item.matched_id = result.source_id
item.match_confidence = confidence
item.updated_at = datetime.utcnow()
if result.cover_url and not item.cover_path:
cover_path = await _download_cover(result.cover_url, item.id)
if cover_path:
item.cover_path = cover_path
elif not result.cover_url:
logger.info(f"Kein Cover-URL in Match-Ergebnis ({result.source}: {result.source_id})")
if result.chapters:
from sqlalchemy import delete
await db.execute(delete(Chapter).where(Chapter.library_item_id == item.id))
for idx, ch in enumerate(result.chapters):
chapter = Chapter(
library_item_id=item.id,
chapter_index=idx,
title=ch.get("title", f"Kapitel {idx + 1}"),
start_seconds=ch.get("start", 0.0),
end_seconds=ch.get("end", 0.0),
)
db.add(chapter)
if confidence >= AUTO_ACCEPT_THRESHOLD:
tags = item.tags or []
item.tags = [t for t in tags if t != "zu_prüfen"]
_SOURCE_FUNCS = {
"musicbrainz": (search_musicbrainz, get_release_details),
"open_library": (search_open_library, get_work_details),
"google_books": (search_google_books, None),
"dnb": (search_dnb, None),
}
async def match_audiobook(item_id: str):
async with AsyncSessionLocal() as db:
result_row = await db.execute(select(LibraryItem).where(LibraryItem.id == item_id))
item = result_row.scalar_one_or_none()
if not item or item.match_locked:
return
setting = await db.execute(
select(ServerSetting).where(ServerSetting.key == "autoMatchBooks")
)
s = setting.scalar_one_or_none()
if s and s.value is False:
return
lib_row = await db.execute(select(Library).where(Library.id == item.library_id))
lib = lib_row.scalar_one_or_none()
sources: list[str] = (
(lib.settings or {}).get("match_sources", list(_SOURCE_FUNCS.keys()))
if lib else list(_SOURCE_FUNCS.keys())
)
title = item.title or ""
author = item.author
detected_series, episode = detect_series(title)
effective_series = detected_series or item.series
if effective_series and episode:
search_title = f"{effective_series} {episode}"
elif effective_series:
# Series aus Titel entfernen → nur Episode-Teil suchen, präziser
cleaned = _build_search_title(title)
search_title = _strip_series_prefix(cleaned, effective_series)
else:
search_title = _build_search_title(title)
if detected_series and not item.series:
item.series = detected_series
if episode and not item.series_sequence:
item.series_sequence = episode
logger.info(
f"Matche: orig='{title}' suchTitel='{search_title}' "
f"author={author!r} series={effective_series!r} | Quellen: {sources}"
)
best: MatchResult | None = None
best_score = 0.0
for source_name in sources:
if best_score >= AUTO_ACCEPT_THRESHOLD:
break
funcs = _SOURCE_FUNCS.get(source_name)
if not funcs:
continue
search_func, details_func = funcs
try:
results = await search_func(search_title, author)
logger.info(f"{source_name}: {len(results)} Treffer")
local_best: MatchResult | None = None
local_score = 0.0
for r in results:
score = _score_result(r, title, author)
logger.info(f"{r.title!r} ({r.author!r}) score={score:.2f}")
if score > local_score:
local_score = score
local_best = r
if local_best and local_score > best_score:
best_score = local_score
best = local_best
if details_func and local_score >= UNCERTAIN_THRESHOLD:
try:
details = await details_func(local_best.source_id)
if details:
_enrich_match(best, details)
logger.info(f"{source_name}: Details geladen für {local_best.source_id}")
except Exception as e:
logger.warning(f"{source_name} Details Fehler: {e}")
except Exception as e:
logger.warning(f"{source_name} Fehler: {e}")
if best and best_score >= UNCERTAIN_THRESHOLD:
try:
await _apply_match(db, item, best, best_score)
await db.commit()
logger.info(f"Match angewendet: '{item.title}'{best.source} ({best_score:.2f})")
except Exception as e:
logger.error(f"_apply_match fehlgeschlagen für '{title}': {e}", exc_info=True)
else:
logger.info(f"Kein Match für '{title}' (beste Konfidenz: {best_score:.2f}, Schwelle: {UNCERTAIN_THRESHOLD})")
await db.commit()
async def search_for_item(title: str, author: str | None = None) -> list[dict]:
"""Suche über alle Quellen für manuelles Matching. Gibt alle relevanten Felder zurück."""
detected_series, episode = detect_series(title)
if detected_series and episode:
search_title = f"{detected_series} {episode}"
elif detected_series:
cleaned = _build_search_title(title)
search_title = _strip_series_prefix(cleaned, detected_series)
else:
search_title = _build_search_title(title)
logger.info(
f"Manuelle Suche: orig='{title}' bereinigt='{search_title}' "
f"author={author!r} series={detected_series!r}"
)
async def _search_source(name: str, coro):
try:
r = await coro
logger.info(f"Manuelle Suche {name}: {len(r)} Treffer")
return r
except Exception as e:
logger.warning(f"Manuelle Suche {name} Fehler: {e}")
return []
mb, ol, gb, dnb = await asyncio.gather(
_search_source("musicbrainz", search_musicbrainz(search_title, author)),
_search_source("open_library", search_open_library(search_title, author)),
_search_source("google_books", search_google_books(search_title, author)),
_search_source("dnb", search_dnb(search_title, author)),
)
results = []
for r in mb + ol + gb + dnb:
results.append({
"source": r.source,
"id": r.source_id,
"title": r.title,
"subtitle": r.subtitle,
"author": r.author,
"narrator": r.narrator,
"description": r.description,
"publisher": r.publisher,
"publishYear": r.publish_year,
"series": r.series,
"seriesSequence": r.series_sequence,
"language": r.language,
"genres": r.genres,
"cover": r.cover_url,
"confidence": r.confidence,
})
results.sort(key=lambda x: x["confidence"], reverse=True)
logger.info(f"Manuelle Suche '{title}' (author={author!r}): {len(results)} Treffer total")
return results