import os
import asyncio
import shutil
from typing import Optional

from ..config import get_settings

# Target duration (in seconds) of each HLS segment, passed to ffmpeg's -hls_time.
HLS_SEGMENT_DURATION = 10

# Seconds between filesystem polls in wait_for_playlist().
_POLL_INTERVAL = 0.5

# Background transcoding tasks keyed by session id. An entry stays registered
# until cleanup_hls_session() removes it; _run_ffmpeg() uses membership to
# decide whether the session directory is still expected to exist.
_running_sessions: dict[str, asyncio.Task] = {}


def _session_dir(session_id: str) -> str:
    """Return the cache directory path used for *session_id*."""
    return os.path.join(get_settings().hls_cache_dir, session_id)


def _concat_entry(path: str) -> str:
    """Return one ``file '...'`` line for an ffmpeg concat list file.

    Backslashes are converted to forward slashes, and single quotes are
    escaped as ``'\\''`` so a path containing an apostrophe does not end the
    quoted string early.
    """
    normalized = path.replace("\\", "/").replace("'", "'\\''")
    return f"file '{normalized}'\n"


def _write_error(session_id: str, session_dir: str, message: str) -> None:
    """Write *message* to the session's error.txt so pollers can see the failure.

    Nothing is written once the session has been unregistered, because
    cleanup_hls_session() removes the directory at that point.
    """
    if session_id not in _running_sessions:
        return
    with open(os.path.join(session_dir, "error.txt"), "w", encoding="utf-8") as f:
        f.write(message)


def _is_non_empty_file(path: str) -> bool:
    """Return True if *path* exists and has a size greater than zero."""
    try:
        return os.path.getsize(path) > 0
    except OSError:
        # Missing, or removed between two polls.
        return False


async def _run_ffmpeg(session_id: str, audio_files: list[str], start_time: float = 0.0):
    """Transcode *audio_files* into an HLS playlist inside the session directory.

    A single input is seeked with ``-ss`` before ``-i``; several inputs are
    joined through a concat list file and ``-ss`` is applied after ``-i``.
    On a non-zero ffmpeg exit, or when ffmpeg cannot be started, the error
    text is written to ``error.txt`` in the session directory (only while the
    session is still registered).

    Raises:
        OSError: if the ffmpeg process cannot be spawned.
        asyncio.CancelledError: if the task is cancelled; the ffmpeg process
            is killed before the cancellation propagates.
    """
    session_dir = _session_dir(session_id)
    os.makedirs(session_dir, exist_ok=True)
    playlist_path = os.path.join(session_dir, "output.m3u8")

    if len(audio_files) == 1:
        input_args = ["-ss", str(start_time), "-i", audio_files[0]]
    else:
        concat_file = os.path.join(session_dir, "concat.txt")
        with open(concat_file, "w", encoding="utf-8") as f:
            f.writelines(_concat_entry(af) for af in audio_files)
        input_args = ["-f", "concat", "-safe", "0", "-i", concat_file, "-ss", str(start_time)]

    cmd = [
        "ffmpeg", "-y",
        *input_args,
        "-c:a", "aac",
        "-b:a", "128k",
        "-ac", "2",
        "-hls_time", str(HLS_SEGMENT_DURATION),
        "-hls_list_size", "0",
        "-hls_segment_filename", os.path.join(session_dir, "seg%05d.ts"),
        "-hls_flags", "independent_segments",
        playlist_path,
    ]

    try:
        proc = await asyncio.create_subprocess_exec(
            *cmd,
            stdout=asyncio.subprocess.DEVNULL,
            stderr=asyncio.subprocess.PIPE,
        )
    except OSError as exc:
        # E.g. ffmpeg is not installed: record it so wait_for_playlist()
        # fails fast instead of polling until its timeout.
        _write_error(session_id, session_dir, f"failed to start ffmpeg: {exc}")
        raise

    try:
        _, stderr = await proc.communicate()
    except asyncio.CancelledError:
        # Cancelling the task only interrupts communicate(); without an
        # explicit kill the ffmpeg process would keep running and writing.
        if proc.returncode is None:
            try:
                proc.kill()
            except ProcessLookupError:
                pass  # process exited on its own in the meantime
            await proc.wait()
        raise

    if proc.returncode != 0:
        err = stderr.decode(errors="replace") if stderr else "unknown"
        # Write an error file so the client notices the failure.
        _write_error(session_id, session_dir, err)


def start_hls_session(session_id: str, audio_files: list[str], start_time: float = 0.0) -> str:
    """Start ffmpeg as a background task and return the session directory path.

    Must be called while an event loop is running, because the task is
    created with ``asyncio.create_task``. If a task is already registered for
    *session_id* and has not finished, it is cancelled first so that only one
    ffmpeg process writes into the session directory.
    """
    session_dir = _session_dir(session_id)
    os.makedirs(session_dir, exist_ok=True)

    previous = _running_sessions.get(session_id)
    if previous is not None and not previous.done():
        previous.cancel()

    task = asyncio.create_task(_run_ffmpeg(session_id, audio_files, start_time))
    _running_sessions[session_id] = task
    return session_dir


async def wait_for_playlist(session_id: str, timeout: float = 60.0) -> bool:
    """Wait until the playlist and the first segment exist.

    Polls the session directory every ``_POLL_INTERVAL`` seconds for at most
    *timeout* seconds.

    Returns:
        True once ``output.m3u8`` is non-empty and ``seg00000.ts`` exists.
        False if ``error.txt`` appears, if the registered background task has
        finished without producing that output, or if the timeout elapses.
    """
    session_dir = _session_dir(session_id)
    playlist = os.path.join(session_dir, "output.m3u8")
    error_file = os.path.join(session_dir, "error.txt")
    first_segment = os.path.join(session_dir, "seg00000.ts")

    loop = asyncio.get_running_loop()
    deadline = loop.time() + timeout
    while loop.time() < deadline:
        if os.path.exists(error_file):
            return False
        # Require at least one finished segment in addition to the playlist.
        if _is_non_empty_file(playlist) and os.path.exists(first_segment):
            return True
        task = _running_sessions.get(session_id)
        if task is not None and task.done():
            # The transcoder has stopped and the output is still missing,
            # so further waiting cannot succeed.
            return False
        await asyncio.sleep(_POLL_INTERVAL)
    return False


def cleanup_hls_session(session_id: str):
    """Cancel the session's background task (if any) and delete its directory."""
    session_dir = _session_dir(session_id)
    task = _running_sessions.pop(session_id, None)
    if task and not task.done():
        task.cancel()
    if os.path.exists(session_dir):
        shutil.rmtree(session_dir, ignore_errors=True)


def get_hls_session_path(session_id: str) -> Optional[str]:
    """Return the session directory path, or None if it does not exist."""
    session_dir = _session_dir(session_id)
    return session_dir if os.path.isdir(session_dir) else None