Files
Audiolib/backend/app/services/scanner.py
Audiolib 0824894a7f Read ID3 tags during scan — fixes 'Folge 114 Die Villa der Toten' problem
Diagnosis from connectivity check: 4/5 APIs reachable (only Google Books
rate-limited). So the network is fine — the search title was the problem.
'Folge 114 Die Villa der Toten' isn't indexed under that name anywhere.
The MP3 itself has the real metadata in ID3 tags (album, artist, year).

Scanner now reads ID3/Vorbis/MP4 tags from the first audio file:
- album → item.title
- albumartist / composer / artist → item.author
- date → publish_year
- organization / publisher → publisher
- language → language
- genre → genres
- artist (heuristic) → series, if it doesn't appear in album title

Parent folder name → series hint (skipped if it's a library root).

Only fills empty fields, never overwrites manually edited or matched data.
Runs on new items AND on re-scan for items without an active match.

Search title normalization improved: 'Folge 123 - X' / 'Band 7: Y' etc.
prefixes and infixes get stripped so APIs see the actual episode title.

New endpoint POST /api/items/{id}/extract-tags + 'Tags lesen' button in
BookDetail — triggers tag extraction on demand for existing items.
Returns before/after diff so user can see what was filled in.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-26 20:15:44 +02:00

425 lines
16 KiB
Python

import os
import uuid
import asyncio
import logging
import shutil
from datetime import datetime
from pathlib import Path
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select
from ..database import AsyncSessionLocal
from ..config import get_settings
from ..models.library import Library
from ..models.media_item import LibraryItem, BookFile, Chapter
from ..models.session import ScanJob
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
# Lower-case file suffixes (with leading dot) that the scanner treats as audio files.
AUDIO_EXTENSIONS = {".mp3", ".wav", ".m4a", ".flac", ".ogg", ".aac", ".m4b", ".opus"}
# Base names (without extension) accepted as a folder cover image.
# NOTE(review): _find_folder_cover lower-cases both sides before comparing, so the
# capitalised variants look redundant for that use — confirm no other module relies on them.
COVER_NAMES = ["cover", "folder", "front", "album", "albumart", "Cover", "Folder", "Front"]
# Lower-case image suffixes accepted for cover files.
COVER_EXTS = [".jpg", ".jpeg", ".png", ".webp"]
def _get_audio_duration(file_path: str) -> float:
try:
from mutagen import File as MutagenFile
audio = MutagenFile(file_path)
if audio and audio.info:
return float(audio.info.length)
except Exception:
pass
return 0.0
def _extract_audio_tags(file_path: str) -> dict:
    """Read ID3/Vorbis/MP4 tags through mutagen's "easy" interface.

    Returns a dict containing only the tags that have a truthy value. Possible
    keys: album, title, artist, albumartist, composer, date, publisher,
    language, discnumber, tracknumber and genres (a list). Any failure is
    logged at debug level and results in an empty dict.
    """
    try:
        from mutagen import File as MutagenFile

        handle = MutagenFile(file_path, easy=True)
        if not handle:
            return {}

        def pick(tag_name: str):
            # Tag values may arrive as lists; only the first entry is used.
            raw = handle.get(tag_name)
            if raw and isinstance(raw, list):
                return raw[0]
            return raw or None

        collected = {
            name: pick(name)
            for name in ("album", "title", "artist", "albumartist", "composer", "date")
        }
        # "organization" takes precedence over "publisher" when both exist.
        collected["publisher"] = pick("organization") or pick("publisher")
        for name in ("language", "discnumber", "tracknumber"):
            collected[name] = pick(name)

        genre_value = handle.get("genre")
        if genre_value:
            collected["genres"] = (
                genre_value if isinstance(genre_value, list) else [genre_value]
            )

        return {key: value for key, value in collected.items() if value}
    except Exception as e:
        logger.debug(f"Tag-Lesen fehlgeschlagen für {file_path}: {e}")
        return {}
def _series_from_parent(folder_path: str, library_folders: list) -> str | None:
"""Wenn der Parent-Ordner nicht selbst eine Library-Root ist, ist er möglicherweise die Serie."""
import re as _re
parent_path = os.path.dirname(folder_path)
parent = os.path.basename(parent_path)
if not parent:
return None
# Skip wenn Parent eine Library-Root ist
for lib_folder in library_folders:
lib_path = lib_folder.get("fullPath", lib_folder.get("full_path", ""))
if lib_path and os.path.normpath(parent_path) == os.path.normpath(lib_path):
return None
if 2 < len(parent) < 60 and not _re.match(r"^[\d\W]+$", parent):
return parent
return None
def _apply_tags_to_item(item, tags: dict, parent_series_hint: str | None):
    """Fill empty metadata fields of ``item`` from audio tags.

    Existing values are never overwritten, with one exception: a title that
    still equals the folder-name guess is replaced by the album tag.

    Args:
        item: Library item to update in place (title, author, publisher,
            publish_year, language, genres and series are touched).
        tags: Tag dict as produced by ``_extract_audio_tags``.
        parent_series_hint: Series name derived from the parent folder, if any.
    """
    import re as _re

    album = tags.get("album")
    album_artist = tags.get("albumartist")
    track_artist = tags.get("artist")
    composer = tags.get("composer")
    artist = album_artist or track_artist

    # Title: the album tag is normally the audiobook title.
    folder_title = _guess_title_from_path(item.path)
    if album and (not item.title or item.title == folder_title):
        item.title = album

    # Author priority: album artist > composer > track artist. The composer
    # must not win over an explicit album artist.
    if not item.author:
        author = album_artist or composer or track_artist
        if author:
            item.author = author

    if not item.publisher and tags.get("publisher"):
        item.publisher = tags["publisher"]

    if not item.publish_year and tags.get("date"):
        # Date tags come in many shapes; take the first four-digit run as the year.
        year_match = _re.search(r"\d{4}", str(tags["date"]))
        if year_match:
            item.publish_year = int(year_match.group())

    if not item.language and tags.get("language"):
        item.language = tags["language"]

    if not item.genres and tags.get("genres"):
        item.genres = tags["genres"]

    # Deriving the series from track/disc numbers would be unreliable, so the
    # parent folder name is used as the primary series hint.
    if not item.series and parent_series_hint:
        item.series = parent_series_hint

    # Audio dramas (e.g. "Die drei ???") usually carry the series name in the
    # artist tag. Heuristic: a short artist that does not occur in the album
    # title is taken as the series.
    if not item.series and artist and len(artist) < 40:
        if album and artist.lower() not in album.lower():
            item.series = artist
def _get_file_size(file_path: str) -> int:
try:
return os.path.getsize(file_path)
except Exception:
return 0
def _find_folder_cover(folder: str) -> str | None:
    """Look for a cover image file (cover.jpg, folder.png, ...) inside ``folder``.

    Base names are matched case-insensitively against ``COVER_NAMES`` and
    suffixes against ``COVER_EXTS``. When several files qualify, the choice is
    deterministic: earlier ``COVER_NAMES`` entries win, then earlier
    ``COVER_EXTS`` entries, then the alphabetically first file name.

    Args:
        folder: Directory to search (not recursive).

    Returns:
        Full path of the selected cover file, or ``None`` when there is no
        match or the directory cannot be listed.
    """
    # Build the lower-cased lookup once instead of once per directory entry.
    name_rank: dict[str, int] = {}
    for rank, cover_name in enumerate(COVER_NAMES):
        name_rank.setdefault(cover_name.lower(), rank)

    try:
        entries = os.listdir(folder)
    except OSError:
        # Missing, unreadable or non-directory path: treat as "no cover".
        return None

    best_key = None
    best_path = None
    for entry in entries:
        stem, ext = os.path.splitext(entry)
        ext = ext.lower()
        rank = name_rank.get(stem.lower())
        if rank is None or ext not in COVER_EXTS:
            continue
        candidate = os.path.join(folder, entry)
        # A directory that happens to be called "cover.jpg" is not a cover.
        if not os.path.isfile(candidate):
            continue
        key = (rank, COVER_EXTS.index(ext), entry)
        if best_key is None or key < best_key:
            best_key, best_path = key, candidate
    return best_path
def _extract_embedded_cover(file_path: str) -> tuple[bytes, str] | None:
    """Extract the first embedded cover image from an audio file.

    Supported containers: ID3 (APIC frames), MP4/M4A/M4B (``covr`` atom),
    FLAC (picture blocks) and Ogg Vorbis/Opus (base64 encoded
    ``metadata_block_picture`` comments).

    Args:
        file_path: Path of the audio file to inspect.

    Returns:
        ``(image_bytes, extension)`` where extension is ``".png"`` or
        ``".jpg"``, or ``None`` when no cover is found or extraction fails
        (failures are logged at debug level).
    """
    try:
        import base64

        from mutagen import File as MutagenFile
        from mutagen.flac import Picture
        from mutagen.mp4 import MP4Cover

        audio = MutagenFile(file_path)
        # Explicit None check: a mutagen file object is falsy when it has no
        # text tags, yet it may still carry picture blocks.
        if audio is None:
            return None
        tags = audio.tags

        # MP3 / ID3 (APIC frames)
        if tags and hasattr(tags, "getall"):
            try:
                apics = tags.getall("APIC")
                if apics:
                    apic = apics[0]
                    ext = ".png" if "png" in (apic.mime or "").lower() else ".jpg"
                    return (apic.data, ext)
            except Exception:
                # Not a usable ID3 container; try the other formats below.
                pass

        # MP4 / M4B / M4A (covr atom)
        if tags and "covr" in tags:
            covr = tags["covr"]
            if covr:
                cover = covr[0]
                ext = ".png" if cover.imageformat == MP4Cover.FORMAT_PNG else ".jpg"
                return (bytes(cover), ext)

        # FLAC picture blocks
        pictures = getattr(audio, "pictures", None)
        if pictures:
            pic = pictures[0]
            ext = ".png" if "png" in (pic.mime or "").lower() else ".jpg"
            return (pic.data, ext)

        # Ogg Vorbis / Opus: pictures are stored as base64 encoded FLAC
        # picture blocks in the "metadata_block_picture" comment.
        if tags is not None and hasattr(tags, "get"):
            for encoded in tags.get("metadata_block_picture") or []:
                try:
                    pic = Picture(base64.b64decode(encoded))
                except Exception:
                    # Malformed block; try the next one.
                    continue
                ext = ".png" if "png" in (pic.mime or "").lower() else ".jpg"
                return (pic.data, ext)
    except Exception as e:
        logger.debug(f"Cover-Extraktion fehlgeschlagen für {file_path}: {e}")
    return None
def _save_local_cover(folder_path: str, audio_files: list[str], item_id: str) -> str | None:
    """Find a cover for an item and store a copy in the covers directory.

    A cover image file in the book folder is preferred. If there is none (or
    copying fails), the cover embedded in the first audio file is used,
    provided it is larger than 1000 bytes.

    Args:
        folder_path: Folder of the audiobook.
        audio_files: Audio file paths of the book; only the first is inspected.
        item_id: Item id, used as the file name of the stored cover.

    Returns:
        Path of the stored cover file, or ``None`` when no cover was saved.
    """
    settings = get_settings()
    covers_dir = settings.covers_dir
    os.makedirs(covers_dir, exist_ok=True)

    folder_cover = _find_folder_cover(folder_path)
    if folder_cover:
        ext = os.path.splitext(folder_cover)[1].lower()
        if ext not in COVER_EXTS:
            ext = ".jpg"
        dest = os.path.join(covers_dir, f"{item_id}{ext}")
        try:
            shutil.copyfile(folder_cover, dest)
            # Source and destination need a separator to be readable in the log.
            logger.info(f"Ordner-Cover übernommen: {folder_cover} -> {dest}")
            return dest
        except Exception as e:
            # Fall through to the embedded cover.
            logger.warning(f"Cover-Copy fehlgeschlagen: {e}")

    if not audio_files:
        return None

    first_file = audio_files[0]
    embedded = _extract_embedded_cover(first_file)
    if not embedded:
        return None
    data, ext = embedded
    # Anything this small is not a real cover image.
    if len(data) <= 1000:
        return None

    dest = os.path.join(covers_dir, f"{item_id}{ext}")
    try:
        with open(dest, "wb") as fd:
            fd.write(data)
    except Exception as e:
        logger.warning(f"Cover-Save fehlgeschlagen: {e}")
        return None
    logger.info(f"Embedded Cover extrahiert: {first_file} -> {dest} ({len(data)} Bytes)")
    return dest
def _guess_title_from_path(folder_path: str) -> str:
"""Leitet Titel aus dem Ordnernamen ab."""
return os.path.basename(folder_path)
def _discover_audiobook_folders(base_path: str) -> list[dict]:
    """Find all audiobooks below a library folder.

    Every direct sub-folder that contains audio files (at any depth) counts as
    one audiobook. Audio files lying directly in ``base_path`` form one
    additional "root" book.

    Args:
        base_path: Library root folder to scan.

    Returns:
        A list of ``{"path": <folder>, "files": [<audio file>, ...]}`` dicts,
        ordered by folder path, each with a sorted file list. Empty when
        ``base_path`` is not an existing directory.
    """
    books: list[dict] = []
    base = Path(base_path)
    # is_dir() rather than exists(): a plain file cannot be scanned either.
    if not base.is_dir():
        logger.warning(f"Pfad nicht gefunden: {base_path}")
        return books

    # List the directory once, sorted, so the result order is deterministic.
    entries = sorted(base.iterdir())

    # Audio files directly in the root form one "root" book.
    root_audio = [
        str(entry)
        for entry in entries
        if entry.is_file() and entry.suffix.lower() in AUDIO_EXTENSIONS
    ]
    if root_audio:
        books.append({"path": str(base), "files": root_audio})

    # Each sub-folder with audio files is one book.
    for entry in entries:
        if not entry.is_dir():
            continue
        audio_files: list[str] = []
        _collect_audio_files(entry, audio_files)
        if audio_files:
            books.append({"path": str(entry), "files": sorted(audio_files)})
    return books
def _collect_audio_files(folder: Path, result: list):
    """Append the paths of all audio files below ``folder`` to ``result``.

    Walks the tree depth-first in sorted order. A permission error stops the
    traversal of the affected folder silently.
    """
    try:
        children = sorted(folder.iterdir())
        for child in children:
            if child.is_file():
                if child.suffix.lower() in AUDIO_EXTENSIONS:
                    result.append(str(child))
                continue
            if child.is_dir():
                _collect_audio_files(child, result)
    except PermissionError:
        return
# Strong references to fire-and-forget matcher tasks. The event loop only keeps
# weak references to tasks, so an unreferenced task could be garbage-collected
# before it has finished.
_background_tasks: set = set()


async def scan_library_task(library_id: str, job_id: str):
    """Background task: scan one library and synchronise the database.

    Steps:
      1. Mark the scan job as running.
      2. Discover audiobook folders in all library folders.
      3. Update existing items (size, duration, tags, cover) or create new
         items together with their ``BookFile`` rows.
      4. Flag items whose folder was not found as missing.
      5. Mark the job as done and start auto-matching for new items.

    Any exception is logged and recorded on the job (status ``"error"``).

    NOTE: folder discovery and tag/duration probing are synchronous calls made
    directly from this coroutine.

    Args:
        library_id: Id of the library to scan.
        job_id: Id of the ``ScanJob`` row tracking this scan.
    """
    async with AsyncSessionLocal() as db:
        try:
            # Mark the job as running.
            job_result = await db.execute(select(ScanJob).where(ScanJob.id == job_id))
            job = job_result.scalar_one_or_none()
            if job:
                job.status = "running"
                job.started_at = datetime.utcnow()
                await db.commit()

            lib_result = await db.execute(select(Library).where(Library.id == library_id))
            lib = lib_result.scalar_one_or_none()
            if not lib:
                # Do not leave the job stuck in "running" when the library is gone.
                logger.warning(f"Library nicht gefunden: {library_id}")
                if job:
                    job.status = "error"
                    job.log = f"Library nicht gefunden: {library_id}"
                    job.finished_at = datetime.utcnow()
                    await db.commit()
                return

            folders = lib.folders or []
            all_books = []
            for folder_info in folders:
                folder_path = folder_info.get("fullPath", folder_info.get("full_path", ""))
                if folder_path:
                    all_books.extend(_discover_audiobook_folders(folder_path))

            items_found = 0
            new_item_ids: list[str] = []
            for book_info in all_books:
                folder_path = book_info["path"]
                audio_files = book_info["files"]

                # Does an item for this folder already exist?
                existing = await db.execute(
                    select(LibraryItem).where(
                        LibraryItem.library_id == library_id,
                        LibraryItem.path == folder_path,
                    )
                )
                existing_item = existing.scalar_one_or_none()

                # Probe every file once; the per-file values are reused for
                # the BookFile rows of new items.
                durations = [_get_audio_duration(f) for f in audio_files]
                sizes = [_get_file_size(f) for f in audio_files]
                total_duration = sum(durations)
                total_size = sum(sizes)

                # Read tags from the first audio file.
                first_audio = audio_files[0] if audio_files else None
                tags = _extract_audio_tags(first_audio) if first_audio else {}
                parent_series = _series_from_parent(folder_path, folders)

                if existing_item:
                    existing_item.duration_seconds = total_duration
                    existing_item.size_bytes = total_size
                    existing_item.num_files = len(audio_files)
                    existing_item.is_missing = False
                    existing_item.updated_at = datetime.utcnow()
                    item = existing_item

                    # Only fill in tags when no match is active or locked.
                    if not existing_item.match_locked and (
                        not existing_item.matched_source
                        or existing_item.matched_source == "none"
                    ):
                        _apply_tags_to_item(item, tags, parent_series)

                    # Pick up a folder/embedded cover if the item has none yet.
                    if not item.cover_path or not os.path.exists(item.cover_path or ""):
                        local_cover = _save_local_cover(folder_path, audio_files, item.id)
                        if local_cover:
                            item.cover_path = local_cover
                else:
                    item_id = str(uuid.uuid4())
                    title = _guess_title_from_path(folder_path)
                    local_cover = _save_local_cover(folder_path, audio_files, item_id)
                    item = LibraryItem(
                        id=item_id,
                        library_id=library_id,
                        media_type=lib.media_type,
                        path=folder_path,
                        ino=str(os.stat(folder_path).st_ino) if os.path.exists(folder_path) else "",
                        title=title,
                        duration_seconds=total_duration,
                        size_bytes=total_size,
                        num_files=len(audio_files),
                        cover_path=local_cover,
                        tags=["zu_prüfen"],
                    )
                    db.add(item)
                    await db.flush()

                    # Apply tags to the new item.
                    _apply_tags_to_item(item, tags, parent_series)
                    logger.info(
                        f"Neu gescannt: id={item.id} title={item.title!r} "
                        f"author={item.author!r} series={item.series!r} "
                        f"year={item.publish_year} tags={list(tags.keys())}"
                    )

                    # Create the BookFile rows.
                    for idx, file_path in enumerate(audio_files):
                        bf = BookFile(
                            library_item_id=item.id,
                            filename=os.path.basename(file_path),
                            path=file_path,
                            format=Path(file_path).suffix.lstrip(".").lower(),
                            size_bytes=sizes[idx],
                            duration_seconds=durations[idx],
                            track_index=idx,
                        )
                        db.add(bf)
                    new_item_ids.append(item.id)

                items_found += 1

            await db.commit()

            # Flag items whose folder was not found during this scan.
            all_items_result = await db.execute(
                select(LibraryItem).where(LibraryItem.library_id == library_id)
            )
            all_items = all_items_result.scalars().all()
            found_paths = {b["path"] for b in all_books}
            for item in all_items:
                item.is_missing = item.path not in found_paths
            await db.commit()

            if job:
                job.status = "done"
                job.items_found = items_found
                job.finished_at = datetime.utcnow()
                job.progress = 1.0
                await db.commit()
            logger.info(f"Scan abgeschlossen: {items_found} Items in Library {library_id}")

            # Start auto-matching for new items.
            if new_item_ids:
                from .matcher import match_audiobook

                for iid in new_item_ids:
                    task = asyncio.create_task(match_audiobook(iid))
                    # Keep the task referenced until it is done.
                    _background_tasks.add(task)
                    task.add_done_callback(_background_tasks.discard)
        except Exception as e:
            logger.error(f"Scan-Fehler für Library {library_id}: {e}", exc_info=True)
            # Record the failure through a fresh session; the scan session may
            # be in a failed transaction state.
            async with AsyncSessionLocal() as err_db:
                job_result = await err_db.execute(select(ScanJob).where(ScanJob.id == job_id))
                job = job_result.scalar_one_or_none()
                if job:
                    job.status = "error"
                    job.log = str(e)
                    job.finished_at = datetime.utcnow()
                    await err_db.commit()