Files
Audiolib/backend/app/routers/libraries.py
Audiolib 18d1481681 Fix sidebar, add library editing, improve file browser
- Sidebar: show Podcasts section only when podcast libraries exist;
  show hint text when no libraries are configured yet
- Admin: extract LibraryForm component, add edit (Pencil) button per
  library with inline form and PATCH support
- Backend: add media_type to LibraryUpdate schema and PATCH handler
- FileBrowser: add quick-access shortcuts (/audiofiles /data /media /),
  show all files+dirs so users can confirm correct folder

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-26 14:42:35 +02:00

221 lines
6.8 KiB
Python

import uuid
from datetime import datetime
from fastapi import APIRouter, Depends, HTTPException, BackgroundTasks
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, func
from ..dependencies import get_db, get_current_user, require_admin
from ..models.user import User
from ..models.library import Library
from ..models.media_item import LibraryItem
from ..models.session import ScanJob
from ..schemas.library import LibraryOut, LibraryFolder, LibrarySettings, LibraryCreate, LibraryUpdate, LibraryItemsResponse
from ..config import get_settings
# Router for the library endpoints; every route declared below is mounted
# under the "/api/libraries" prefix and tagged "libraries".
router = APIRouter(prefix="/api/libraries", tags=["libraries"])
def _library_to_out(lib: Library) -> dict:
    """Serialize a ``Library`` row into the aliased dict sent to clients.

    Folder entries are read from ``lib.folders`` (accepting either the
    ``fullPath`` or ``full_path`` key), and the icon/provider are read from
    ``lib.settings`` with ``"database"`` / ``"google"`` as fallbacks.
    Timestamps are emitted as integer epoch milliseconds, or ``0`` when the
    underlying value is missing.

    Args:
        lib: The library row to serialize.

    Returns:
        ``LibraryOut(...).model_dump(by_alias=True)`` for the given library.
    """

    def _epoch_ms(moment) -> int:
        # A missing timestamp is reported as 0 rather than raising.
        return int(moment.timestamp() * 1000) if moment else 0

    options = lib.settings or {}
    created_ms = _epoch_ms(lib.created_at)

    folder_models = []
    for entry in lib.folders or []:
        folder_models.append(
            LibraryFolder(
                # Entries stored without an id get a freshly generated one
                # on every serialization.
                id=entry.get("id", str(uuid.uuid4())),
                full_path=entry.get("fullPath", entry.get("full_path", "")),
                library_id=lib.id,
                # Folders carry the library's own creation time.
                added_at=created_ms,
            )
        )

    payload = LibraryOut(
        id=lib.id,
        name=lib.name,
        folders=folder_models,
        media_type=lib.media_type,
        icon=options.get("icon", "database"),
        provider=options.get("provider", "google"),
        created_at=created_ms,
        last_update=_epoch_ms(lib.updated_at),
    )
    return payload.model_dump(by_alias=True)
@router.get("")
async def list_libraries(
    current_user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
):
    """Return all libraries, serialized, under the ``libraries`` key.

    The ``get_current_user`` dependency must resolve before this handler
    runs; its value is not otherwise used here.
    """
    rows = (await db.execute(select(Library))).scalars().all()
    return {"libraries": list(map(_library_to_out, rows))}
@router.get("/{library_id}")
async def get_library(
    library_id: str,
    current_user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
):
    """Return the serialized library whose id equals ``library_id``.

    Raises:
        HTTPException: 404 with detail "Library not found" when no row
            matches.
    """
    lookup = select(Library).where(Library.id == library_id)
    lib = (await db.execute(lookup)).scalar_one_or_none()
    if lib:
        return _library_to_out(lib)
    raise HTTPException(status_code=404, detail="Library not found")
@router.get("/{library_id}/items")
async def get_library_items(
    library_id: str,
    sort: str = "addedAt",
    desc: int = 0,
    filter: str | None = None,
    search: str | None = None,
    page: int = 0,
    limit: int = 0,
    current_user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
):
    """Return one page of a library's items, optionally narrowed by a search.

    Args:
        library_id: Id of the library whose items are listed.
        sort: Accepted as a query parameter but not applied by this handler.
        desc: Accepted as a query parameter but not applied by this handler.
        filter: Accepted as a query parameter but not applied by this handler.
        search: When non-empty, only items whose title, author or series
            contains this text (case-insensitive ``ILIKE``) are returned.
        page: Zero-based page index. Negative values are treated as 0.
        limit: Page size. Values <= 0 fall back to 50.

    Returns:
        A dict with ``results`` (serialized items), ``total`` (count of all
        matching items before pagination), ``limit`` and ``page`` (the values
        actually used for the query).

    Raises:
        HTTPException: 404 with detail "Library not found" when the library
            does not exist.
    """
    result = await db.execute(select(Library).where(Library.id == library_id))
    lib = result.scalar_one_or_none()
    if not lib:
        raise HTTPException(status_code=404, detail="Library not found")

    query = select(LibraryItem).where(LibraryItem.library_id == library_id)
    if search:
        # Build the pattern once; it is shared by all three column matches.
        pattern = f"%{search}%"
        query = query.where(
            LibraryItem.title.ilike(pattern)
            | LibraryItem.author.ilike(pattern)
            | LibraryItem.series.ilike(pattern)
        )

    # Count against the filtered (but not yet paginated) query.
    count_result = await db.execute(select(func.count()).select_from(query.subquery()))
    total = count_result.scalar()

    actual_limit = limit if limit > 0 else 50
    # A negative page would produce a negative OFFSET, which some databases
    # reject outright; clamp so the request degrades to the first page.
    actual_page = max(page, 0)
    # NOTE(review): no ORDER BY is applied, so page contents depend on the
    # database's row order — confirm whether `sort`/`desc` should drive one.
    query = query.offset(actual_page * actual_limit).limit(actual_limit)
    items_result = await db.execute(query)
    items = items_result.scalars().all()

    from ..routers.items import _item_to_out

    return {
        "results": [_item_to_out(item) for item in items],
        "total": total,
        "limit": actual_limit,
        "page": actual_page,
    }
@router.get("/{library_id}/search")
async def search_library(
    library_id: str,
    q: str = "",
    limit: int = 12,
    current_user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
):
    """Search a library's items by title, author or series.

    Args:
        library_id: Id of the library to search in.
        q: Text to look for; matched case-insensitively as a substring
            (``ILIKE '%q%'``). The empty default yields the pattern ``%%``.
        limit: Maximum number of items returned.

    Returns:
        A dict with a single ``book`` key holding the serialized matches.
    """
    pattern = f"%{q}%"
    text_match = (
        LibraryItem.title.ilike(pattern)
        | LibraryItem.author.ilike(pattern)
        | LibraryItem.series.ilike(pattern)
    )
    statement = (
        select(LibraryItem)
        .where(LibraryItem.library_id == library_id, text_match)
        .limit(limit)
    )
    matches = (await db.execute(statement)).scalars().all()

    from ..routers.items import _item_to_out

    return {"book": [_item_to_out(match) for match in matches]}
@router.post("")
async def create_library(
    body: LibraryCreate,
    admin: User = Depends(require_admin),
    db: AsyncSession = Depends(get_db),
):
    """Create a library from the request body and return it serialized.

    A new UUID is generated for the library and for each of its folders.
    Folder paths are read from either the ``fullPath`` or ``full_path`` key
    of each submitted folder entry. The icon and provider from the body are
    stored in the library's ``settings``.

    The ``require_admin`` dependency must resolve before this handler runs.
    """
    new_id = str(uuid.uuid4())

    folder_entries = []
    for submitted in body.folders:
        folder_entries.append(
            {
                "id": str(uuid.uuid4()),
                "fullPath": submitted.get("fullPath", submitted.get("full_path", "")),
            }
        )

    lib = Library(
        id=new_id,
        name=body.name,
        # The display name starts out identical to the name.
        display_name=body.name,
        folders=folder_entries,
        media_type=body.media_type,
        settings={"icon": body.icon, "provider": body.provider},
    )
    db.add(lib)
    await db.commit()
    await db.refresh(lib)
    return _library_to_out(lib)
@router.patch("/{library_id}")
async def update_library(
    library_id: str,
    body: LibraryUpdate,
    admin: User = Depends(require_admin),
    db: AsyncSession = Depends(get_db),
):
    """Apply a partial update to a library and return it serialized.

    Only fields of ``body`` that are not ``None`` are applied:

    * ``name`` replaces both ``name`` and ``display_name``.
    * ``folders`` replaces the whole folder list; entries keep a submitted
      ``id`` and get a new UUID otherwise.
    * ``media_type`` replaces the media type.
    * ``settings`` is merged over the existing settings, with submitted keys
      taking precedence.

    Raises:
        HTTPException: 404 with detail "Library not found" when no row
            matches ``library_id``.
    """
    lookup = select(Library).where(Library.id == library_id)
    lib = (await db.execute(lookup)).scalar_one_or_none()
    if not lib:
        raise HTTPException(status_code=404, detail="Library not found")

    if body.name is not None:
        lib.name = body.name
        lib.display_name = body.name

    if body.folders is not None:
        replacement = []
        for submitted in body.folders:
            replacement.append(
                {
                    "id": submitted.get("id", str(uuid.uuid4())),
                    "fullPath": submitted.get("fullPath", submitted.get("full_path", "")),
                }
            )
        lib.folders = replacement

    if body.media_type is not None:
        lib.media_type = body.media_type

    if body.settings is not None:
        # Assign a fresh dict: existing keys first, submitted keys override.
        merged = dict(lib.settings or {})
        merged.update(body.settings)
        lib.settings = merged

    await db.commit()
    await db.refresh(lib)
    return _library_to_out(lib)
@router.delete("/{library_id}")
async def delete_library(
    library_id: str,
    admin: User = Depends(require_admin),
    db: AsyncSession = Depends(get_db),
):
    """Delete the library with the given id.

    Returns:
        ``{"success": True}`` once the deletion has been committed.

    Raises:
        HTTPException: 404 with detail "Library not found" when no row
            matches ``library_id``.
    """
    lookup = select(Library).where(Library.id == library_id)
    target = (await db.execute(lookup)).scalar_one_or_none()
    if not target:
        raise HTTPException(status_code=404, detail="Library not found")

    await db.delete(target)
    await db.commit()
    return {"success": True}
@router.post("/{library_id}/scan")
async def scan_library(
    library_id: str,
    background_tasks: BackgroundTasks,
    admin: User = Depends(require_admin),
    db: AsyncSession = Depends(get_db),
):
    """Queue a scan of the given library.

    A ``ScanJob`` row with status ``"queued"`` is committed first, then
    ``scan_library_task`` is registered as a background task with the
    library id and the new job id.

    Returns:
        A dict with the job ``id``, ``type`` set to ``"scan"`` and the
        ``libraryId``.

    Raises:
        HTTPException: 404 with detail "Library not found" when no row
            matches ``library_id``.
    """
    lookup = select(Library).where(Library.id == library_id)
    existing = (await db.execute(lookup)).scalar_one_or_none()
    if not existing:
        raise HTTPException(status_code=404, detail="Library not found")

    job = ScanJob(library_id=library_id, status="queued")
    db.add(job)
    await db.commit()
    # Reload the row so its id is available after the commit.
    await db.refresh(job)

    from ..services.scanner import scan_library_task

    background_tasks.add_task(scan_library_task, library_id, job.id)
    response = {"id": job.id, "type": "scan", "libraryId": library_id}
    return response