import os import uuid import asyncio import logging import shutil from datetime import datetime from pathlib import Path from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy import select from ..database import AsyncSessionLocal from ..config import get_settings from ..models.library import Library from ..models.media_item import LibraryItem, BookFile, Chapter from ..models.session import ScanJob logger = logging.getLogger(__name__) AUDIO_EXTENSIONS = {".mp3", ".wav", ".m4a", ".flac", ".ogg", ".aac", ".m4b", ".opus"} COVER_NAMES = ["cover", "folder", "front", "album", "albumart", "Cover", "Folder", "Front"] COVER_EXTS = [".jpg", ".jpeg", ".png", ".webp"] def _get_audio_duration(file_path: str) -> float: try: from mutagen import File as MutagenFile audio = MutagenFile(file_path) if audio and audio.info: return float(audio.info.length) except Exception: pass return 0.0 def _extract_audio_tags(file_path: str) -> dict: """Liest ID3/Vorbis/MP4-Tags via mutagen easy-API.""" try: from mutagen import File as MutagenFile audio = MutagenFile(file_path, easy=True) if not audio: return {} def first(key: str): v = audio.get(key) if not v: return None if isinstance(v, list): return v[0] if v else None return v result = { "album": first("album"), "title": first("title"), "artist": first("artist"), "albumartist": first("albumartist"), "composer": first("composer"), "date": first("date"), "publisher": first("organization") or first("publisher"), "language": first("language"), "discnumber": first("discnumber"), "tracknumber": first("tracknumber"), } genre = audio.get("genre") if genre: result["genres"] = genre if isinstance(genre, list) else [genre] return {k: v for k, v in result.items() if v} except Exception as e: logger.debug(f"Tag-Lesen fehlgeschlagen für {file_path}: {e}") return {} def _series_from_parent(folder_path: str, library_folders: list) -> str | None: """Wenn der Parent-Ordner nicht selbst eine Library-Root ist, ist er möglicherweise die Serie.""" import re as _re parent_path = os.path.dirname(folder_path) parent = os.path.basename(parent_path) if not parent: return None # Skip wenn Parent eine Library-Root ist for lib_folder in library_folders: lib_path = lib_folder.get("fullPath", lib_folder.get("full_path", "")) if lib_path and os.path.normpath(parent_path) == os.path.normpath(lib_path): return None if 2 < len(parent) < 60 and not _re.match(r"^[\d\W]+$", parent): return parent return None def _apply_tags_to_item(item, tags: dict, parent_series_hint: str | None): """Befüllt leere Felder aus ID3-Tags. Bestehende Werte werden NICHT überschrieben.""" import re as _re album = tags.get("album") artist = tags.get("albumartist") or tags.get("artist") composer = tags.get("composer") # Title: Album ist normalerweise der Hörbuch-Titel folder_title = _guess_title_from_path(item.path) if album and (not item.title or item.title == folder_title): item.title = album # Author: AlbumArtist > Composer > Artist if not item.author: if composer: item.author = composer elif artist: item.author = artist if not item.publisher and tags.get("publisher"): item.publisher = tags["publisher"] if not item.publish_year and tags.get("date"): m = _re.search(r"\d{4}", str(tags["date"])) if m: item.publish_year = int(m.group()) if not item.language and tags.get("language"): item.language = tags["language"] if not item.genres and tags.get("genres"): item.genres = tags["genres"] # Serie aus tracknumber/discnumber wäre möglich aber unzuverlässig. # Stattdessen: Parent-Ordner als Serien-Hinweis nehmen. if not item.series and parent_series_hint: item.series = parent_series_hint # Bei "Die drei ???" Hörspielen: artist ist meist die Serie selbst if not item.series and artist and len(artist) < 40: # Heuristik: wenn artist und album sich nicht ähneln, könnte artist die Serie sein if album and artist.lower() not in album.lower(): item.series = artist def _get_file_size(file_path: str) -> int: try: return os.path.getsize(file_path) except Exception: return 0 def _find_folder_cover(folder: str) -> str | None: """Sucht cover.jpg / folder.jpg / front.jpg etc. im Ordner.""" try: for entry in os.listdir(folder): name, ext = os.path.splitext(entry) if ext.lower() in COVER_EXTS and name.lower() in [c.lower() for c in COVER_NAMES]: return os.path.join(folder, entry) except (PermissionError, FileNotFoundError): pass return None def _extract_embedded_cover(file_path: str) -> tuple[bytes, str] | None: """Extrahiert eingebettetes Cover aus Audio-Datei. Gibt (bytes, ext) zurück.""" try: from mutagen import File as MutagenFile from mutagen.id3 import APIC from mutagen.mp4 import MP4Cover from mutagen.flac import Picture audio = MutagenFile(file_path) if not audio: return None # MP3 / ID3 (APIC) if audio.tags and hasattr(audio.tags, 'getall'): try: apics = audio.tags.getall('APIC') if apics: apic = apics[0] ext = '.png' if apic.mime == 'image/png' else '.jpg' return (apic.data, ext) except Exception: pass # MP4/M4B/M4A (covr atom) if audio.tags and 'covr' in audio.tags: covr = audio.tags['covr'] if covr: cover = covr[0] ext = '.png' if cover.imageformat == MP4Cover.FORMAT_PNG else '.jpg' return (bytes(cover), ext) # FLAC, OGG if hasattr(audio, 'pictures') and audio.pictures: pic = audio.pictures[0] ext = '.png' if 'png' in (pic.mime or '').lower() else '.jpg' return (pic.data, ext) except Exception as e: logger.debug(f"Cover-Extraktion fehlgeschlagen für {file_path}: {e}") return None def _save_local_cover(folder_path: str, audio_files: list[str], item_id: str) -> str | None: """Findet ein Cover (Ordner-Datei oder Embed) und speichert es lokal.""" settings = get_settings() covers_dir = settings.covers_dir os.makedirs(covers_dir, exist_ok=True) folder_cover = _find_folder_cover(folder_path) if folder_cover: ext = os.path.splitext(folder_cover)[1].lower() if ext not in COVER_EXTS: ext = ".jpg" dest = os.path.join(covers_dir, f"{item_id}{ext}") try: shutil.copyfile(folder_cover, dest) logger.info(f"Ordner-Cover übernommen: {folder_cover} → {dest}") return dest except Exception as e: logger.warning(f"Cover-Copy fehlgeschlagen: {e}") for f in audio_files[:1]: result = _extract_embedded_cover(f) if result: data, ext = result if len(data) > 1000: dest = os.path.join(covers_dir, f"{item_id}{ext}") try: with open(dest, "wb") as fd: fd.write(data) logger.info(f"Embedded Cover extrahiert: {f} → {dest} ({len(data)} Bytes)") return dest except Exception as e: logger.warning(f"Cover-Save fehlgeschlagen: {e}") return None def _guess_title_from_path(folder_path: str) -> str: """Leitet Titel aus dem Ordnernamen ab.""" return os.path.basename(folder_path) def _discover_audiobook_folders(base_path: str) -> list[dict]: """ Findet alle Unterordner mit Audio-Dateien. Jeder Ordner = ein Hörbuch (ABS-Prinzip). """ books = [] base = Path(base_path) if not base.exists(): logger.warning(f"Pfad nicht gefunden: {base_path}") return books # Direkte Audio-Dateien im Root → ein "Root"-Buch root_audio = [f for f in base.iterdir() if f.is_file() and f.suffix.lower() in AUDIO_EXTENSIONS] if root_audio: books.append({ "path": str(base), "files": [str(f) for f in sorted(root_audio)], }) # Unterordner durchsuchen for entry in base.iterdir(): if not entry.is_dir(): continue audio_files = [] _collect_audio_files(entry, audio_files) if audio_files: books.append({ "path": str(entry), "files": sorted(audio_files), }) return books def _collect_audio_files(folder: Path, result: list): """Rekursiv alle Audio-Dateien sammeln.""" try: for entry in sorted(folder.iterdir()): if entry.is_file() and entry.suffix.lower() in AUDIO_EXTENSIONS: result.append(str(entry)) elif entry.is_dir(): _collect_audio_files(entry, result) except PermissionError: pass async def scan_library_task(library_id: str, job_id: str): """Hintergrund-Task: Scannt eine Library und befüllt die DB.""" async with AsyncSessionLocal() as db: try: # Job auf "running" setzen job_result = await db.execute(select(ScanJob).where(ScanJob.id == job_id)) job = job_result.scalar_one_or_none() if job: job.status = "running" job.started_at = datetime.utcnow() await db.commit() lib_result = await db.execute(select(Library).where(Library.id == library_id)) lib = lib_result.scalar_one_or_none() if not lib: return folders = lib.folders or [] all_books = [] for folder_info in folders: folder_path = folder_info.get("fullPath", folder_info.get("full_path", "")) if folder_path: all_books.extend(_discover_audiobook_folders(folder_path)) items_found = 0 new_item_ids: list[str] = [] for book_info in all_books: folder_path = book_info["path"] audio_files = book_info["files"] # Existiert schon? existing = await db.execute( select(LibraryItem).where( LibraryItem.library_id == library_id, LibraryItem.path == folder_path, ) ) existing_item = existing.scalar_one_or_none() total_duration = sum(_get_audio_duration(f) for f in audio_files) total_size = sum(_get_file_size(f) for f in audio_files) # ID3-Tags aus erster Audio-Datei lesen first_audio = audio_files[0] if audio_files else None tags = _extract_audio_tags(first_audio) if first_audio else {} parent_series = _series_from_parent(folder_path, folders) if existing_item: existing_item.duration_seconds = total_duration existing_item.size_bytes = total_size existing_item.num_files = len(audio_files) existing_item.is_missing = False existing_item.updated_at = datetime.utcnow() item = existing_item # Tags nachziehen wenn kein Match aktiv ist if not existing_item.match_locked and ( not existing_item.matched_source or existing_item.matched_source == "none" ): _apply_tags_to_item(item, tags, parent_series) # Cover aus Ordner/Embed nachziehen falls noch keins da ist if not item.cover_path or not os.path.exists(item.cover_path or ""): local_cover = _save_local_cover(folder_path, audio_files, item.id) if local_cover: item.cover_path = local_cover else: item_id = str(uuid.uuid4()) title = _guess_title_from_path(folder_path) local_cover = _save_local_cover(folder_path, audio_files, item_id) item = LibraryItem( id=item_id, library_id=library_id, media_type=lib.media_type, path=folder_path, ino=str(os.stat(folder_path).st_ino) if os.path.exists(folder_path) else "", title=title, duration_seconds=total_duration, size_bytes=total_size, num_files=len(audio_files), cover_path=local_cover, tags=["zu_prüfen"], ) db.add(item) await db.flush() # Tags anwenden _apply_tags_to_item(item, tags, parent_series) logger.info( f"Neu gescannt: id={item.id} title={item.title!r} " f"author={item.author!r} series={item.series!r} " f"year={item.publish_year} tags={list(tags.keys())}" ) # BookFiles anlegen for idx, file_path in enumerate(audio_files): bf = BookFile( library_item_id=item.id, filename=os.path.basename(file_path), path=file_path, format=Path(file_path).suffix.lstrip(".").lower(), size_bytes=_get_file_size(file_path), duration_seconds=_get_audio_duration(file_path), track_index=idx, ) db.add(bf) new_item_ids.append(item.id) items_found += 1 await db.commit() # Fehlende Items markieren all_items_result = await db.execute( select(LibraryItem).where(LibraryItem.library_id == library_id) ) all_items = all_items_result.scalars().all() found_paths = {b["path"] for b in all_books} for item in all_items: item.is_missing = item.path not in found_paths await db.commit() if job: job.status = "done" job.items_found = items_found job.finished_at = datetime.utcnow() job.progress = 1.0 await db.commit() logger.info(f"Scan abgeschlossen: {items_found} Items in Library {library_id}") # Auto-Matching für neue Items starten if new_item_ids: from .matcher import match_audiobook for iid in new_item_ids: asyncio.create_task(match_audiobook(iid)) except Exception as e: logger.error(f"Scan-Fehler für Library {library_id}: {e}", exc_info=True) async with AsyncSessionLocal() as err_db: job_result = await err_db.execute(select(ScanJob).where(ScanJob.id == job_id)) job = job_result.scalar_one_or_none() if job: job.status = "error" job.log = str(e) job.finished_at = datetime.utcnow() await err_db.commit()