fix(security): stop private-event leak in merge read + harden busy masking, uploads, profile

Findings from the security review:
- HIGH: private local events leaked in full (title/location/description) to
  anyone who could READ a shared or group calendar via GET /api/caldav/events —
  the private_event_visibility rule was only enforced in /groups/{id}/combined.
  Now enforced in the merge read too, via a shared helper (apply_event_privacy)
  so the two paths can't drift.
- HIGH: 'busy' masking was a blacklist that still leaked creator identity,
  source-calendar name, recurrence rule and per-event colour. Replaced with a
  whitelist (mask_busy_event): only timing/identity/render fields survive.
- MEDIUM: .ics import had no size limit (raw = await file.read()) → memory DoS.
  Now capped at 5 MB (413), read before creating any calendar.
- LOW/INFO: profile email now checked for uniqueness + basic format; display
  name / username / email length-capped and control-chars stripped.

Deferred (tracked): RRULE expansion cap at the trust boundary, SQLite
PRAGMA foreign_keys + ON DELETE cascade, and JWT-by-user-id + token version.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
Scarriffle
2026-06-01 17:49:56 +02:00
parent 0d15af736d
commit 6869a15bb8
5 changed files with 127 additions and 25 deletions

View File

@@ -44,6 +44,53 @@ def resolve_creator(ev: models.LocalEvent, *, name_cache: Optional[dict] = None)
return None
def private_visibility_for(db: Session, user_id: int) -> str:
"""A user's chosen visibility for their private events ('hidden' | 'busy')."""
s = db.query(models.UserSettings).filter(models.UserSettings.user_id == user_id).first()
return (s.private_event_visibility if s else None) or "busy"
# Only these fields survive 'busy' anonymisation — a whitelist, so no content
# field (title/location/description/creator/calendar name/recurrence) can leak.
_BUSY_KEEP = {
"id", "url", "start", "end", "allDay", "calendar_id", "calendarColor",
"source", "type", "owner", "is_group_event", "display_color",
}
def mask_busy_event(event: dict) -> dict:
"""Anonymise a private event for 'busy' visibility: keep only timing /
identity / render fields, drop ALL content."""
masked = {k: event[k] for k in _BUSY_KEEP if k in event}
masked["title"] = "Beschäftigt"
masked["location"] = ""
masked["description"] = ""
masked["calendar_name"] = ""
masked["creator"] = None
masked["color"] = None
masked["rrule"] = None
masked["exdate"] = None
masked["private"] = True
return masked
def apply_event_privacy(
event: dict, *, owner_id, is_private: bool, requester_id: int, visibility: str
) -> Optional[dict]:
"""Enforce another user's private-event visibility on a built event dict.
Returns the event unchanged for the requester's own events or non-private
events, ``None`` when the owner chose 'hidden', or a busy-masked copy when
the owner chose 'busy'. Used by BOTH the merge read and the combined view so
the privacy rule can never drift between them.
"""
if not is_private or owner_id == requester_id:
return event
if visibility == "hidden":
return None
return mask_busy_event(event)
def build_local_event_dict(
ev: models.LocalEvent,
cal: models.LocalCalendar,

View File

@@ -14,7 +14,13 @@ import models
import permissions
from auth import get_current_user
from database import get_db
from local_events_util import build_local_event_dict, expand_recurring_local, resolve_creator
from local_events_util import (
apply_event_privacy,
build_local_event_dict,
expand_recurring_local,
private_visibility_for,
resolve_creator,
)
from routers.ical_router import _refresh_if_needed, get_events_for_subscription
logger = logging.getLogger(__name__)
@@ -335,6 +341,14 @@ def get_events(
.all()
) if readable_ids else []
name_cache = {u.id: (u.display_name or u.username) for u in db.query(models.User).all()}
# Cache each owner's private-event visibility (one lookup per owner, not per event).
vis_cache: dict = {}
def vis_for(uid: int) -> str:
if uid not in vis_cache:
vis_cache[uid] = private_visibility_for(db, uid)
return vis_cache[uid]
for local_cal in local_calendars:
local_events = (
db.query(models.LocalEvent)
@@ -351,10 +365,27 @@ def get_events(
)
for ev in local_events:
creator = resolve_creator(ev, name_cache=name_cache)
# A private event belonging to someone else (shared calendar or group
# calendar) must honour that owner's private_event_visibility, exactly
# like the group combined view — otherwise private titles/locations
# leak through the ordinary calendar read.
owner_id = ev.creator_id or local_cal.user_id
is_priv = bool(ev.is_private)
foreign_private = is_priv and owner_id != current_user.id
visibility = vis_for(owner_id) if foreign_private else "busy"
if foreign_private and visibility == "hidden":
continue
if ev.rrule:
all_events.extend(expand_recurring_local(ev, local_cal, start_dt, end_dt, creator=creator))
built = expand_recurring_local(ev, local_cal, start_dt, end_dt, creator=creator)
else:
all_events.append(build_local_event_dict(ev, local_cal, rrule=None, creator=creator))
built = [build_local_event_dict(ev, local_cal, rrule=None, creator=creator)]
for b in built:
b = apply_event_privacy(
b, owner_id=owner_id, is_private=is_priv,
requester_id=current_user.id, visibility=visibility,
)
if b is not None:
all_events.append(b)
# ── iCal subscription events ──────────────────────────
ical_subs = (

View File

@@ -19,7 +19,7 @@ from sqlalchemy.orm import Session
import models
from auth import get_current_user
from database import get_db
from local_events_util import build_local_event_dict, expand_recurring_local
from local_events_util import build_local_event_dict, expand_recurring_local, mask_busy_event
logger = logging.getLogger(__name__)
router = APIRouter()
@@ -309,16 +309,6 @@ def delete_group(
return {"ok": True}
def _strip_busy(event: dict) -> dict:
"""Anonymise a private event for the 'busy' visibility mode."""
event = dict(event)
event["title"] = "Beschäftigt"
event["location"] = ""
event["description"] = ""
event["private"] = True
return event
def _first_name(name: Optional[str]) -> str:
if not name:
return ""
@@ -415,7 +405,7 @@ def combined_events(
for b in built:
if ev.is_private and creator_owner_id != current_user.id and visibility_for(creator_owner_id) == "busy":
b = _strip_busy(b)
b = mask_busy_event(b)
# Colour to render with: the group calendar's colour for group
# events, otherwise the owning member's group colour.
b["display_color"] = group_cal_color if is_group else member_color.get(owner_id)

View File

@@ -443,6 +443,17 @@ def _import_ics_into(cal: models.LocalCalendar, raw: bytes, db: Session) -> dict
return {"imported": imported, "skipped": skipped, "errors": errors}
# Cap .ics uploads so a huge file can't exhaust memory (read fully into RAM).
MAX_ICS_BYTES = 5 * 1024 * 1024 # 5 MB — generous for calendars
async def _read_capped(file: UploadFile) -> bytes:
raw = await file.read(MAX_ICS_BYTES + 1)
if len(raw) > MAX_ICS_BYTES:
raise HTTPException(413, "Datei zu groß (max. 5 MB)")
return raw
@router.post("/calendars/{calendar_id}/import")
async def import_calendar(
calendar_id: int,
@@ -451,7 +462,7 @@ async def import_calendar(
current_user: models.User = Depends(get_current_user),
):
cal = permissions.accessible_local_calendar(db, current_user, calendar_id, require_write=True)
raw = await file.read()
raw = await _read_capped(file)
try:
return _import_ics_into(cal, raw, db)
except ValueError as e:
@@ -467,10 +478,12 @@ async def import_generic(
db: Session = Depends(get_db),
current_user: models.User = Depends(get_current_user),
):
# Read (capped) first so an oversized upload can't leave an empty calendar.
raw = await _read_capped(file)
if create_calendar:
cal = models.LocalCalendar(
user_id=current_user.id,
name=calendar_name or "Importiert",
name=(calendar_name or "Importiert")[:120],
)
db.add(cal)
db.commit()
@@ -480,7 +493,6 @@ async def import_generic(
else:
raise HTTPException(422, "Provide calendar_id or create_calendar=true")
raw = await file.read()
try:
result = _import_ics_into(cal, raw, db)
except ValueError as e:

View File

@@ -1,4 +1,5 @@
import io
import re
import base64
from pathlib import Path
from typing import Optional
@@ -8,7 +9,7 @@ import qrcode
from fastapi import APIRouter, Depends, File, HTTPException, UploadFile
from fastapi.responses import FileResponse, Response
from PIL import Image
from pydantic import BaseModel
from pydantic import BaseModel, Field
from sqlalchemy.orm import Session
from sqlalchemy import func
@@ -27,9 +28,16 @@ ALLOWED_TYPES = {"image/jpeg", "image/png", "image/webp"}
# ── Schemas ───────────────────────────────────────────────
class ProfileUpdate(BaseModel):
email: Optional[str] = None
display_name: Optional[str] = None
username: Optional[str] = None # login name (stored lowercase)
# Length caps (SQLite ignores VARCHAR limits, so enforce here).
email: Optional[str] = Field(default=None, max_length=120)
display_name: Optional[str] = Field(default=None, max_length=80)
username: Optional[str] = Field(default=None, max_length=50) # login name (stored lowercase)
def _strip_controls(s: str) -> str:
"""Remove control characters (defends against injected newlines / NULs that
could be reflected in other clients' calendar/sharing/group views)."""
return re.sub(r"[\x00-\x1f\x7f]", "", s).strip()
class PasswordChange(BaseModel):
@@ -67,12 +75,26 @@ def update_profile(
):
result = {"ok": True}
if data.email is not None:
current_user.email = data.email or None
email = _strip_controls(data.email)
if email:
if "@" not in email or "." not in email.split("@")[-1]:
raise HTTPException(422, "Invalid email address")
clash = (
db.query(models.User)
.filter(func.lower(models.User.email) == email.lower(),
models.User.id != current_user.id)
.first()
)
if clash:
raise HTTPException(400, "Email already in use")
current_user.email = email
else:
current_user.email = None
if data.display_name is not None:
dn = data.display_name.strip()
dn = _strip_controls(data.display_name)
current_user.display_name = dn or current_user.username
if data.username is not None:
new_login = data.username.strip().lower()
new_login = _strip_controls(data.username).lower()
if not new_login:
raise HTTPException(422, "Login name cannot be empty")
if new_login != current_user.username: