diff --git a/backend/local_events_util.py b/backend/local_events_util.py index 4dcd7ce..1028f2c 100644 --- a/backend/local_events_util.py +++ b/backend/local_events_util.py @@ -44,6 +44,53 @@ def resolve_creator(ev: models.LocalEvent, *, name_cache: Optional[dict] = None) return None +def private_visibility_for(db: Session, user_id: int) -> str: + """A user's chosen visibility for their private events ('hidden' | 'busy').""" + s = db.query(models.UserSettings).filter(models.UserSettings.user_id == user_id).first() + return (s.private_event_visibility if s else None) or "busy" + + +# Only these fields survive 'busy' anonymisation — a whitelist, so no content +# field (title/location/description/creator/calendar name/recurrence) can leak. +_BUSY_KEEP = { + "id", "url", "start", "end", "allDay", "calendar_id", "calendarColor", + "source", "type", "owner", "is_group_event", "display_color", +} + + +def mask_busy_event(event: dict) -> dict: + """Anonymise a private event for 'busy' visibility: keep only timing / + identity / render fields, drop ALL content.""" + masked = {k: event[k] for k in _BUSY_KEEP if k in event} + masked["title"] = "Beschäftigt" + masked["location"] = "" + masked["description"] = "" + masked["calendar_name"] = "" + masked["creator"] = None + masked["color"] = None + masked["rrule"] = None + masked["exdate"] = None + masked["private"] = True + return masked + + +def apply_event_privacy( + event: dict, *, owner_id, is_private: bool, requester_id: int, visibility: str +) -> Optional[dict]: + """Enforce another user's private-event visibility on a built event dict. + + Returns the event unchanged for the requester's own events or non-private + events, ``None`` when the owner chose 'hidden', or a busy-masked copy when + the owner chose 'busy'. Used by BOTH the merge read and the combined view so + the privacy rule can never drift between them. + """ + if not is_private or owner_id == requester_id: + return event + if visibility == "hidden": + return None + return mask_busy_event(event) + + def build_local_event_dict( ev: models.LocalEvent, cal: models.LocalCalendar, diff --git a/backend/routers/caldav_router.py b/backend/routers/caldav_router.py index d4501ae..d93fc38 100644 --- a/backend/routers/caldav_router.py +++ b/backend/routers/caldav_router.py @@ -14,7 +14,13 @@ import models import permissions from auth import get_current_user from database import get_db -from local_events_util import build_local_event_dict, expand_recurring_local, resolve_creator +from local_events_util import ( + apply_event_privacy, + build_local_event_dict, + expand_recurring_local, + private_visibility_for, + resolve_creator, +) from routers.ical_router import _refresh_if_needed, get_events_for_subscription logger = logging.getLogger(__name__) @@ -335,6 +341,14 @@ def get_events( .all() ) if readable_ids else [] name_cache = {u.id: (u.display_name or u.username) for u in db.query(models.User).all()} + # Cache each owner's private-event visibility (one lookup per owner, not per event). + vis_cache: dict = {} + + def vis_for(uid: int) -> str: + if uid not in vis_cache: + vis_cache[uid] = private_visibility_for(db, uid) + return vis_cache[uid] + for local_cal in local_calendars: local_events = ( db.query(models.LocalEvent) @@ -351,10 +365,27 @@ def get_events( ) for ev in local_events: creator = resolve_creator(ev, name_cache=name_cache) + # A private event belonging to someone else (shared calendar or group + # calendar) must honour that owner's private_event_visibility, exactly + # like the group combined view — otherwise private titles/locations + # leak through the ordinary calendar read. + owner_id = ev.creator_id or local_cal.user_id + is_priv = bool(ev.is_private) + foreign_private = is_priv and owner_id != current_user.id + visibility = vis_for(owner_id) if foreign_private else "busy" + if foreign_private and visibility == "hidden": + continue if ev.rrule: - all_events.extend(expand_recurring_local(ev, local_cal, start_dt, end_dt, creator=creator)) + built = expand_recurring_local(ev, local_cal, start_dt, end_dt, creator=creator) else: - all_events.append(build_local_event_dict(ev, local_cal, rrule=None, creator=creator)) + built = [build_local_event_dict(ev, local_cal, rrule=None, creator=creator)] + for b in built: + b = apply_event_privacy( + b, owner_id=owner_id, is_private=is_priv, + requester_id=current_user.id, visibility=visibility, + ) + if b is not None: + all_events.append(b) # ── iCal subscription events ────────────────────────── ical_subs = ( diff --git a/backend/routers/groups_router.py b/backend/routers/groups_router.py index 593e222..c7c7cdd 100644 --- a/backend/routers/groups_router.py +++ b/backend/routers/groups_router.py @@ -19,7 +19,7 @@ from sqlalchemy.orm import Session import models from auth import get_current_user from database import get_db -from local_events_util import build_local_event_dict, expand_recurring_local +from local_events_util import build_local_event_dict, expand_recurring_local, mask_busy_event logger = logging.getLogger(__name__) router = APIRouter() @@ -309,16 +309,6 @@ def delete_group( return {"ok": True} -def _strip_busy(event: dict) -> dict: - """Anonymise a private event for the 'busy' visibility mode.""" - event = dict(event) - event["title"] = "Beschäftigt" - event["location"] = "" - event["description"] = "" - event["private"] = True - return event - - def _first_name(name: Optional[str]) -> str: if not name: return "" @@ -415,7 +405,7 @@ def combined_events( for b in built: if ev.is_private and creator_owner_id != current_user.id and visibility_for(creator_owner_id) == "busy": - b = _strip_busy(b) + b = mask_busy_event(b) # Colour to render with: the group calendar's colour for group # events, otherwise the owning member's group colour. b["display_color"] = group_cal_color if is_group else member_color.get(owner_id) diff --git a/backend/routers/local_router.py b/backend/routers/local_router.py index a47e70b..b8bb981 100644 --- a/backend/routers/local_router.py +++ b/backend/routers/local_router.py @@ -443,6 +443,17 @@ def _import_ics_into(cal: models.LocalCalendar, raw: bytes, db: Session) -> dict return {"imported": imported, "skipped": skipped, "errors": errors} +# Cap .ics uploads so a huge file can't exhaust memory (read fully into RAM). +MAX_ICS_BYTES = 5 * 1024 * 1024 # 5 MB — generous for calendars + + +async def _read_capped(file: UploadFile) -> bytes: + raw = await file.read(MAX_ICS_BYTES + 1) + if len(raw) > MAX_ICS_BYTES: + raise HTTPException(413, "Datei zu groß (max. 5 MB)") + return raw + + @router.post("/calendars/{calendar_id}/import") async def import_calendar( calendar_id: int, @@ -451,7 +462,7 @@ async def import_calendar( current_user: models.User = Depends(get_current_user), ): cal = permissions.accessible_local_calendar(db, current_user, calendar_id, require_write=True) - raw = await file.read() + raw = await _read_capped(file) try: return _import_ics_into(cal, raw, db) except ValueError as e: @@ -467,10 +478,12 @@ async def import_generic( db: Session = Depends(get_db), current_user: models.User = Depends(get_current_user), ): + # Read (capped) first so an oversized upload can't leave an empty calendar. + raw = await _read_capped(file) if create_calendar: cal = models.LocalCalendar( user_id=current_user.id, - name=calendar_name or "Importiert", + name=(calendar_name or "Importiert")[:120], ) db.add(cal) db.commit() @@ -480,7 +493,6 @@ async def import_generic( else: raise HTTPException(422, "Provide calendar_id or create_calendar=true") - raw = await file.read() try: result = _import_ics_into(cal, raw, db) except ValueError as e: diff --git a/backend/routers/profile_router.py b/backend/routers/profile_router.py index 257aa67..1f16a37 100644 --- a/backend/routers/profile_router.py +++ b/backend/routers/profile_router.py @@ -1,4 +1,5 @@ import io +import re import base64 from pathlib import Path from typing import Optional @@ -8,7 +9,7 @@ import qrcode from fastapi import APIRouter, Depends, File, HTTPException, UploadFile from fastapi.responses import FileResponse, Response from PIL import Image -from pydantic import BaseModel +from pydantic import BaseModel, Field from sqlalchemy.orm import Session from sqlalchemy import func @@ -27,9 +28,16 @@ ALLOWED_TYPES = {"image/jpeg", "image/png", "image/webp"} # ── Schemas ─────────────────────────────────────────────── class ProfileUpdate(BaseModel): - email: Optional[str] = None - display_name: Optional[str] = None - username: Optional[str] = None # login name (stored lowercase) + # Length caps (SQLite ignores VARCHAR limits, so enforce here). + email: Optional[str] = Field(default=None, max_length=120) + display_name: Optional[str] = Field(default=None, max_length=80) + username: Optional[str] = Field(default=None, max_length=50) # login name (stored lowercase) + + +def _strip_controls(s: str) -> str: + """Remove control characters (defends against injected newlines / NULs that + could be reflected in other clients' calendar/sharing/group views).""" + return re.sub(r"[\x00-\x1f\x7f]", "", s).strip() class PasswordChange(BaseModel): @@ -67,12 +75,26 @@ def update_profile( ): result = {"ok": True} if data.email is not None: - current_user.email = data.email or None + email = _strip_controls(data.email) + if email: + if "@" not in email or "." not in email.split("@")[-1]: + raise HTTPException(422, "Invalid email address") + clash = ( + db.query(models.User) + .filter(func.lower(models.User.email) == email.lower(), + models.User.id != current_user.id) + .first() + ) + if clash: + raise HTTPException(400, "Email already in use") + current_user.email = email + else: + current_user.email = None if data.display_name is not None: - dn = data.display_name.strip() + dn = _strip_controls(data.display_name) current_user.display_name = dn or current_user.username if data.username is not None: - new_login = data.username.strip().lower() + new_login = _strip_controls(data.username).lower() if not new_login: raise HTTPException(422, "Login name cannot be empty") if new_login != current_user.username: