"""MusicBrainz release search and detail lookup, with Cover Art Archive covers."""
import asyncio
import logging

import httpx

from .base import MatchResult

logger = logging.getLogger(__name__)

MB_BASE = "https://musicbrainz.org/ws/2"
CAA_BASE = "https://coverartarchive.org"
HEADERS = {"User-Agent": "audiolib/1.0 (contact@audiolib.local)"}

# MusicBrainz rate limit: max 1 request per second. A single permit, combined
# with the sleep inside `_get`, serialises requests so they stay under that
# limit (two permits would let two requests through per interval).
_semaphore = asyncio.Semaphore(1)


async def _get(client: httpx.AsyncClient, url: str, **params) -> dict:
    """Perform a rate-limited GET and return the decoded JSON body.

    Args:
        client: The httpx client used to send the request.
        url: Absolute URL to request.
        **params: Extra query parameters; ``fmt=json`` is added by default.

    Returns:
        The parsed JSON response.

    Raises:
        httpx.HTTPStatusError: If the response status is 4xx/5xx.
        Exception: Any transport or JSON decoding error is propagated.
    """
    async with _semaphore:
        # Sleep while holding the permit so consecutive requests are spaced
        # at least 1.1 s apart (MusicBrainz allows 1 req/s).
        await asyncio.sleep(1.1)
        response = await client.get(url, params={"fmt": "json", **params}, timeout=15)
        response.raise_for_status()
        return response.json()


def _quote_lucene(value: str) -> str:
    """Return *value* as a double-quoted Lucene phrase.

    Backslashes and double quotes are escaped so that user-supplied text
    cannot terminate the phrase early and corrupt the search query.
    """
    escaped = value.replace("\\", "\\\\").replace('"', '\\"')
    return f'"{escaped}"'


def _confidence(score) -> float:
    """Convert a search score (expected 0-100) into a 0.0-1.0 confidence.

    Missing, null or non-numeric scores yield 0.0 instead of raising.
    """
    try:
        return float(score) / 100.0
    except (TypeError, ValueError):
        return 0.0


async def search_musicbrainz(title: str, artist: str | None = None) -> list[MatchResult]:
    """Search MusicBrainz releases by title and, optionally, artist.

    Args:
        title: Release title to search for.
        artist: Optional artist name used to narrow the search.

    Returns:
        Up to five matches (the request uses ``limit=5``); an empty list if
        the request fails for any reason.
    """
    query = f"release:{_quote_lucene(title)}"
    if artist:
        query += f" AND artist:{_quote_lucene(artist)}"

    async with httpx.AsyncClient(headers=HEADERS) as client:
        try:
            data = await _get(client, f"{MB_BASE}/release", query=query, limit=5)
        except Exception:
            # Best-effort lookup: report the failure but never propagate it.
            logger.warning("MusicBrainz search failed for query %r", query, exc_info=True)
            return []

    return [
        MatchResult(
            source="musicbrainz",
            source_id=rel.get("id", ""),
            title=rel.get("title", title),
            author=_first_artist(rel),
            publish_year=_parse_year(rel.get("date", "")),
            confidence=_confidence(rel.get("score", 0)),
        )
        for rel in data.get("releases") or []
    ]


def _build_chapters(media: list[dict]) -> list[dict]:
    """Turn the track lists of all media into consecutive chapter dicts.

    Each chapter has ``title``, ``start`` and ``end`` (seconds); start/end are
    accumulated across all tracks of all media in order. Tracks without a
    known length contribute a zero-length chapter.
    """
    chapters = []
    offset = 0.0
    for medium in media:
        for track in medium.get("tracks") or []:
            # Prefer the track length, fall back to the recording length.
            duration_ms = (
                track.get("length")
                or (track.get("recording") or {}).get("length")
                or 0
            )
            duration_s = duration_ms / 1000.0
            chapters.append({
                "title": track.get("title", f"Track {track.get('position', '')}"),
                "start": offset,
                "end": offset + duration_s,
            })
            offset += duration_s
    return chapters


async def _fetch_cover_url(client: httpx.AsyncClient, release_id: str) -> str | None:
    """Return a cover image URL from the Cover Art Archive, or None.

    Picks the image flagged as front (else the first image) and prefers its
    large thumbnail over the full-size image. Any failure yields None.
    """
    try:
        # The Cover Art Archive answers this endpoint with a redirect to the
        # actual index JSON; httpx does not follow redirects unless asked to,
        # so without this flag the status is never 200.
        response = await client.get(
            f"{CAA_BASE}/release/{release_id}", timeout=10, follow_redirects=True
        )
        if response.status_code != 200:
            return None
        images = response.json().get("images") or []
        front = next(
            (img for img in images if img.get("front")),
            images[0] if images else None,
        )
        if not front:
            return None
        return (front.get("thumbnails") or {}).get("large") or front.get("image")
    except Exception:
        # Cover art is optional; log and carry on without it.
        logger.debug("Cover Art Archive lookup failed for %s", release_id, exc_info=True)
        return None


async def get_release_details(release_id: str) -> MatchResult | None:
    """Load full release details including track list (= chapters) and cover.

    Args:
        release_id: MusicBrainz release identifier.

    Returns:
        A MatchResult with confidence 1.0, or None if the MusicBrainz
        request fails. A failed cover lookup only leaves ``cover_url`` unset.
    """
    async with httpx.AsyncClient(headers=HEADERS) as client:
        try:
            data = await _get(
                client,
                f"{MB_BASE}/release/{release_id}",
                inc="recordings+artists+release-groups",
            )
        except Exception:
            logger.warning(
                "MusicBrainz release lookup failed for %s", release_id, exc_info=True
            )
            return None

        cover_url = await _fetch_cover_url(client, release_id)

    # NOTE(review): the original computed a `series` value from the release
    # group (its title when primary-type == "Album") but never used it, so it
    # is omitted here. Pass it to MatchResult if that type has such a field.

    return MatchResult(
        source="musicbrainz",
        source_id=release_id,
        title=data.get("title", ""),
        author=_first_artist(data),
        publish_year=_parse_year(data.get("date", "")),
        cover_url=cover_url,
        chapters=_build_chapters(data.get("media") or []),
        confidence=1.0,
    )


def _first_artist(release: dict) -> str | None:
    """Return the name of the first artist credit of *release*, if any.

    Uses the credit's own ``name`` and falls back to the nested artist's
    ``name``. Missing or null entries yield None.
    """
    artist_credits = release.get("artist-credit") or []
    if not artist_credits:
        return None
    first = artist_credits[0]
    return first.get("name") or (first.get("artist") or {}).get("name")


def _parse_year(date_str: str) -> int | None:
    """Return the leading four characters of *date_str* as an int year.

    Returns None when the string is empty, shorter than four characters, or
    does not start with a parseable number.
    """
    if not date_str or len(date_str) < 4:
        return None
    try:
        return int(date_str[:4])
    except ValueError:
        return None