feat: Kalender-Sharing, Gruppen, iCal Import/Export & Ersteller (Server)

Kollaborations-Features ausschliesslich fuer lokale Kalender:

- Sharing: calendar_shares-Tabelle, GET/POST/DELETE /api/local/calendars/{id}/shares
  (nur Besitzer), GET /api/users/directory, geteilte Kalender in
  GET /api/local/calendars (shared_by/permission/owned) und im Merge-Read.
- Gruppen: groups/group_members/group_calendars + /api/groups-Router inkl.
  kombinierter Ansicht /api/groups/{id}/combined (owner + is_group_event).
- Ersteller: local_events.creator_id (serverseitig gesetzt) + creator_name_external
  aus ORGANIZER; creator-Feld in allen lokalen Event-Responses.
- Private-Flag: local_events.is_private + user_settings.private_event_visibility
  (hidden|busy), Filterung in der Gruppenansicht.
- iCal Import/Export: ical_io.py, POST /api/local/calendars/{id}/import,
  POST /api/local/import, GET /api/local/calendars/{id}/export.
- Zentraler Berechtigungs-Helper (permissions.py) und gemeinsamer Event-Dict-
  Builder (local_events_util.py) ersetzen die Nur-Besitzer-Filter.
- pytest-Suite (12 Tests) fuer Sharing, Gruppen, Parser, Private-Filterung.

Additiv & rueckwaertskompatibel; Migrationen in main.py._migrate().

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
Scarriffle
2026-05-31 16:05:18 +02:00
parent 362cc7212c
commit 32268a18b2
13 changed files with 1582 additions and 173 deletions

205
backend/ical_io.py Normal file
View File

@@ -0,0 +1,205 @@
"""iCal (.ics) import/export for local calendars.
Reuses the already-installed ``icalendar`` library. The parser produces dicts
matching the LocalEvent storage shape (ISO strings, comma-separated EXDATE);
the generator emits a VCALENDAR with ORGANIZER, RRULE, etc.
"""
from __future__ import annotations
import logging
import uuid
from datetime import date, datetime, timedelta, timezone
from icalendar import Calendar, Event, vCalAddress, vRecur, vText
logger = logging.getLogger(__name__)
def _rrule_to_str(component) -> str | None:
prop = component.get("RRULE")
if not prop:
return None
return prop.to_ical().decode("utf-8")
def _exdate_to_csv(component) -> str | None:
"""Collect EXDATE values as comma-separated YYYYMMDD strings."""
exdate = component.get("EXDATE")
if not exdate:
return None
items = exdate if isinstance(exdate, list) else [exdate]
out = []
for ex in items:
dts = getattr(ex, "dts", None) or []
for d in dts:
val = d.dt
if isinstance(val, datetime):
out.append(val.strftime("%Y%m%d"))
elif isinstance(val, date):
out.append(val.strftime("%Y%m%d"))
return ",".join(out) if out else None
def _organizer_name(component) -> str | None:
org = component.get("ORGANIZER")
if not org:
return None
# CN parameter holds the display name; fall back to the mailto address.
try:
cn = org.params.get("CN")
if cn:
return str(cn)
except Exception:
pass
raw = str(org)
if raw.lower().startswith("mailto:"):
return raw[7:]
return raw or None
def parse_ics(raw: bytes) -> dict:
"""Parse .ics bytes into {"events": [dict, ...], "errors": [str, ...]}.
Raises ValueError if the payload is not a parseable calendar at all.
"""
try:
cal = Calendar.from_ical(raw)
except Exception as e:
raise ValueError(f"Datei ist kein gültiges iCal-Format: {e}") from e
events = []
errors = []
for component in cal.walk():
if component.name != "VEVENT":
continue
try:
uid = str(component.get("UID") or uuid.uuid4())
title = str(component.get("SUMMARY", "") or "")
location = str(component.get("LOCATION", "") or "") or None
description = str(component.get("DESCRIPTION", "") or "") or None
dtstart_prop = component.get("DTSTART")
if dtstart_prop is None:
errors.append(f"VEVENT {uid}: kein DTSTART, übersprungen")
continue
dtstart = dtstart_prop.dt
dtend_prop = component.get("DTEND")
duration_prop = component.get("DURATION")
all_day = isinstance(dtstart, date) and not isinstance(dtstart, datetime)
if all_day:
if dtend_prop:
dtend = dtend_prop.dt
elif duration_prop:
dtend = dtstart + duration_prop.dt
else:
dtend = dtstart + timedelta(days=1)
start_str = dtstart.isoformat()
end_str = (dtend.isoformat() if isinstance(dtend, date)
else (dtstart + timedelta(days=1)).isoformat())
else:
if dtstart.tzinfo is None:
dtstart = dtstart.replace(tzinfo=timezone.utc)
if dtend_prop:
dtend = dtend_prop.dt
if isinstance(dtend, date) and not isinstance(dtend, datetime):
dtend = datetime.combine(dtend, datetime.min.time(), tzinfo=timezone.utc)
elif dtend.tzinfo is None:
dtend = dtend.replace(tzinfo=timezone.utc)
elif duration_prop:
dtend = dtstart + duration_prop.dt
else:
dtend = dtstart + timedelta(hours=1)
start_str = dtstart.isoformat()
end_str = dtend.isoformat()
events.append({
"uid": uid,
"title": title,
"start": start_str,
"end": end_str,
"all_day": all_day,
"location": location,
"description": description,
"rrule": _rrule_to_str(component),
"exdate": _exdate_to_csv(component),
"organizer": _organizer_name(component),
})
except Exception as exc:
logger.warning("Skipping malformed VEVENT: %s", exc)
errors.append(f"Fehlerhafter Eintrag übersprungen: {exc}")
return {"events": events, "errors": errors}
def _rrule_str_to_vrecur(rrule_str: str) -> vRecur:
params = {}
for part in rrule_str.split(";"):
if "=" not in part:
continue
key, val = part.split("=", 1)
params[key] = val.split(",") if "," in val else val
return vRecur(params)
def _parse_iso(s: str) -> datetime:
s = s.replace("Z", "+00:00")
dt = datetime.fromisoformat(s)
if dt.tzinfo is None:
dt = dt.replace(tzinfo=timezone.utc)
return dt
def build_ics(calendar, events, *, name_cache: dict | None = None) -> str:
"""Build a VCALENDAR string for a local calendar and its events."""
cal = Calendar()
cal.add("prodid", "-//Calendarr//EN")
cal.add("version", "2.0")
cal.add("x-wr-calname", calendar.name)
for ev in events:
item = Event()
item.add("uid", ev.uid)
item.add("summary", ev.title or "")
item.add("dtstamp", datetime.now(timezone.utc))
if ev.all_day:
try:
start = date.fromisoformat(ev.start[:10])
end = date.fromisoformat(ev.end[:10])
except ValueError:
continue
if end <= start:
end = start + timedelta(days=1)
item.add("dtstart", start)
item.add("dtend", end)
else:
try:
item.add("dtstart", _parse_iso(ev.start))
item.add("dtend", _parse_iso(ev.end))
except ValueError:
continue
if ev.location:
item.add("location", ev.location)
if ev.description:
item.add("description", ev.description)
if ev.color:
item.add("x-calendarr-color", ev.color)
if ev.rrule:
item.add("rrule", _rrule_str_to_vrecur(ev.rrule))
# ORGANIZER from the creator (local user or imported name).
organizer_name = None
if getattr(ev, "creator_id", None) and name_cache:
organizer_name = name_cache.get(ev.creator_id)
if not organizer_name:
organizer_name = getattr(ev, "creator_name_external", None)
if organizer_name:
organizer = vCalAddress("mailto:noreply@calendarr.local")
organizer.params["CN"] = vText(organizer_name.replace('"', ""))
item.add("organizer", organizer)
cal.add_component(item)
return cal.to_ical().decode("utf-8")

View File

@@ -0,0 +1,165 @@
"""Shared builders for local-event API dicts.
Every local event returned by the API (the local router, the unified event
merge in caldav_router, and the group combined view) must look identical and
carry the additive collaboration fields: ``creator``, ``private``, ``type``,
and — in the group view — ``owner`` and ``is_group_event``.
Centralising this avoids the three near-duplicate dict constructions that used
to live in caldav_router.py.
"""
import logging
from datetime import datetime as dt_datetime, date as dt_date, timedelta, timezone as dt_timezone
from typing import Optional
from dateutil.rrule import rrulestr
from sqlalchemy.orm import Session
import models
logger = logging.getLogger(__name__)
def resolve_creator(ev: models.LocalEvent, *, name_cache: Optional[dict] = None) -> Optional[dict]:
"""Build the ``creator`` payload for an event.
Returns ``{"id": int, "display_name": username}`` for a local creator,
``{"id": None, "display_name": "<name> (importiert)"}`` for an imported
event, or ``None`` when no creator info exists (legacy events).
``name_cache`` maps user_id -> username to avoid per-event DB lookups; the
creator relationship is used as a fallback.
"""
if ev.creator_id:
display = None
if name_cache is not None:
display = name_cache.get(ev.creator_id)
if display is None and ev.creator is not None:
display = ev.creator.username
if display is not None:
return {"id": ev.creator_id, "display_name": display}
if ev.creator_name_external:
return {"id": None, "display_name": f"{ev.creator_name_external} (importiert)"}
return None
def build_local_event_dict(
ev: models.LocalEvent,
cal: models.LocalCalendar,
*,
start: Optional[str] = None,
end: Optional[str] = None,
all_day: Optional[bool] = None,
rrule: Optional[str] = ...,
creator: Optional[dict] = None,
owner: Optional[dict] = None,
is_group_event: bool = False,
) -> dict:
"""Build the unified dict for a single local event (or occurrence).
``start``/``end``/``all_day`` override the stored values (used when emitting
an expanded recurrence occurrence). ``owner``/``is_group_event`` are only set
by the group combined view.
"""
d = {
"id": ev.uid,
"url": f"local://{ev.uid}",
"title": ev.title,
"start": ev.start if start is None else start,
"end": ev.end if end is None else end,
"allDay": ev.all_day if all_day is None else all_day,
"location": ev.location or "",
"description": ev.description or "",
"color": ev.color,
"rrule": ev.rrule if rrule is ... else rrule,
"exdate": ev.exdate,
"calendar_id": f"local-{cal.id}",
"calendar_name": cal.name,
"calendarColor": cal.color,
"source": "local",
"type": "local",
"creator": creator,
"private": bool(ev.is_private),
}
if owner is not None:
d["owner"] = owner
if is_group_event:
d["is_group_event"] = True
return d
def expand_recurring_local(
ev: models.LocalEvent,
local_cal: models.LocalCalendar,
range_start,
range_end,
*,
creator: Optional[dict] = None,
owner: Optional[dict] = None,
is_group_event: bool = False,
) -> list:
"""Expand a recurring LocalEvent into individual occurrences in the range."""
results = []
excluded = set()
if ev.exdate:
for d in ev.exdate.split(","):
d = d.strip()
if d:
excluded.add(d)
try:
ev_start_str = ev.start.replace("Z", "+00:00")
ev_end_str = ev.end.replace("Z", "+00:00")
if ev.all_day:
ev_start = dt_date.fromisoformat(ev_start_str[:10])
ev_end = dt_date.fromisoformat(ev_end_str[:10])
duration = ev_end - ev_start
rule = rrulestr(f"RRULE:{ev.rrule}", dtstart=dt_datetime.combine(ev_start, dt_datetime.min.time()))
r_start = dt_datetime.combine(range_start if isinstance(range_start, dt_date) else range_start.date(), dt_datetime.min.time())
r_end = dt_datetime.combine(range_end if isinstance(range_end, dt_date) else range_end.date(), dt_datetime.min.time())
occurrences = rule.between(r_start - timedelta(days=1), r_end + timedelta(days=1), inc=True)
for occ in occurrences:
occ_start = occ.date()
occ_key = occ_start.strftime("%Y%m%d")
if occ_key in excluded:
continue
occ_end = occ_start + duration
results.append(build_local_event_dict(
ev, local_cal,
start=occ_start.isoformat(), end=occ_end.isoformat(), all_day=True,
creator=creator, owner=owner, is_group_event=is_group_event,
))
else:
ev_start = dt_datetime.fromisoformat(ev_start_str)
ev_end = dt_datetime.fromisoformat(ev_end_str)
if ev_start.tzinfo is None:
ev_start = ev_start.replace(tzinfo=dt_timezone.utc)
if ev_end.tzinfo is None:
ev_end = ev_end.replace(tzinfo=dt_timezone.utc)
duration = ev_end - ev_start
rule = rrulestr(f"RRULE:{ev.rrule}", dtstart=ev_start)
r_start = range_start if isinstance(range_start, dt_datetime) else dt_datetime.combine(range_start, dt_datetime.min.time(), tzinfo=dt_timezone.utc)
r_end = range_end if isinstance(range_end, dt_datetime) else dt_datetime.combine(range_end, dt_datetime.min.time(), tzinfo=dt_timezone.utc)
if r_start.tzinfo is None:
r_start = r_start.replace(tzinfo=dt_timezone.utc)
if r_end.tzinfo is None:
r_end = r_end.replace(tzinfo=dt_timezone.utc)
occurrences = rule.between(r_start - timedelta(days=1), r_end + timedelta(days=1), inc=True)
for occ in occurrences:
occ_key = occ.strftime("%Y%m%d")
if occ_key in excluded:
continue
occ_end = occ + duration
results.append(build_local_event_dict(
ev, local_cal,
start=occ.isoformat(), end=occ_end.isoformat(), all_day=False,
creator=creator, owner=owner, is_group_event=is_group_event,
))
except Exception as exc:
logger.warning("Error expanding recurring event %s: %s", ev.uid, exc)
# Fall back to a single event.
results.append(build_local_event_dict(
ev, local_cal, creator=creator, owner=owner, is_group_event=is_group_event,
))
return results

View File

@@ -17,7 +17,7 @@ STATIC_CACHE = f"public, max-age={STATIC_MAX_AGE_SECONDS}, must-revalidate"
sys.path.insert(0, str(Path(__file__).parent))
from database import Base, engine
from routers import auth_router, caldav_router, google_router, homeassistant_router, ical_router, local_router, profile_router, settings_router, users_router
from routers import auth_router, caldav_router, google_router, groups_router, homeassistant_router, ical_router, local_router, profile_router, settings_router, users_router
logging.basicConfig(level=logging.INFO)
@@ -132,6 +132,32 @@ def _migrate():
except Exception:
pass
# ── Collaboration features (sharing, groups, creator, private) ──
try:
conn.execute(text("ALTER TABLE user_settings ADD COLUMN private_event_visibility VARCHAR(10) DEFAULT 'busy'"))
conn.commit()
logging.info("Migration: added private_event_visibility to user_settings")
except Exception:
pass
try:
conn.execute(text("ALTER TABLE local_events ADD COLUMN creator_id INTEGER"))
conn.commit()
logging.info("Migration: added creator_id to local_events")
except Exception:
pass
try:
conn.execute(text("ALTER TABLE local_events ADD COLUMN creator_name_external TEXT"))
conn.commit()
logging.info("Migration: added creator_name_external to local_events")
except Exception:
pass
try:
conn.execute(text("ALTER TABLE local_events ADD COLUMN is_private BOOLEAN DEFAULT 0"))
conn.commit()
logging.info("Migration: added is_private to local_events")
except Exception:
pass
_migrate()
app = FastAPI(title="Calendarr", docs_url=None, redoc_url=None)
@@ -170,6 +196,7 @@ app.include_router(caldav_router.router, prefix="/api/caldav", tags=["caldav"])
app.include_router(settings_router.router, prefix="/api/settings", tags=["settings"])
app.include_router(profile_router.router, prefix="/api/profile", tags=["profile"])
app.include_router(local_router.router, prefix="/api/local", tags=["local"])
app.include_router(groups_router.router, prefix="/api/groups", tags=["groups"])
app.include_router(ical_router.router, prefix="/api/ical", tags=["ical"])
app.include_router(google_router.router, prefix="/api/google", tags=["google"])
app.include_router(homeassistant_router.router, prefix="/api/homeassistant", tags=["homeassistant"])

View File

@@ -1,4 +1,4 @@
from sqlalchemy import Column, Integer, String, Boolean, DateTime, ForeignKey, Text
from sqlalchemy import Column, Integer, String, Boolean, DateTime, ForeignKey, Text, UniqueConstraint
from sqlalchemy.orm import relationship
from database import Base
@@ -34,6 +34,11 @@ class User(Base):
"HomeAssistantAccount", back_populates="user", cascade="all, delete-orphan"
)
@property
def display_name(self) -> str:
"""No dedicated display-name column exists — fall back to the username."""
return self.username
class CalDAVAccount(Base):
__tablename__ = "caldav_accounts"
@@ -87,6 +92,9 @@ class UserSettings(Base):
text_color = Column(String(7), nullable=True) # Override für --text-1 (NULL = nutze text_contrast)
line_color = Column(String(7), nullable=True) # Override für --border (NULL = nutze line_contrast)
bg_color = Column(String(7), nullable=True) # Override für --bg-app (NULL = Default)
# How this user's private events appear to other group members:
# 'hidden' = invisible, 'busy' = anonymous busy block (default).
private_event_visibility = Column(String(10), default="busy")
user = relationship("User", back_populates="settings")
@@ -119,8 +127,15 @@ class LocalEvent(Base):
color = Column(String(7), nullable=True)
rrule = Column(Text, nullable=True)
exdate = Column(Text, nullable=True) # Comma-separated YYYYMMDD dates to exclude
# Creator: set server-side from the auth token on create, never from the client.
creator_id = Column(Integer, ForeignKey("users.id"), nullable=True)
# For imported events without a local user (from the .ics ORGANIZER field).
creator_name_external = Column(Text, nullable=True)
# Private events are filtered for other group members per their visibility setting.
is_private = Column(Boolean, default=False)
calendar = relationship("LocalCalendar", back_populates="events")
creator = relationship("User")
class ICalSubscription(Base):
@@ -219,3 +234,74 @@ class HomeAssistantCalendar(Base):
sidebar_hidden = Column(Boolean, default=False)
account = relationship("HomeAssistantAccount", back_populates="calendars")
# ── Collaboration: sharing & groups (local calendars only) ────────────────
class CalendarShare(Base):
"""A local calendar shared with another Calendarr user."""
__tablename__ = "calendar_shares"
__table_args__ = (
UniqueConstraint("calendar_id", "user_id", name="uq_calendar_share"),
)
id = Column(Integer, primary_key=True, index=True)
calendar_id = Column(Integer, ForeignKey("local_calendars.id"), nullable=False)
user_id = Column(Integer, ForeignKey("users.id"), nullable=False)
permission = Column(String(20), default="read") # 'read' | 'read_write'
created_at = Column(String(50), nullable=True) # ISO 8601
calendar = relationship("LocalCalendar")
user = relationship("User")
class Group(Base):
__tablename__ = "groups"
id = Column(Integer, primary_key=True, index=True)
name = Column(String(100), nullable=False)
created_by = Column(Integer, ForeignKey("users.id"), nullable=False)
created_at = Column(String(50), nullable=True) # ISO 8601
members = relationship(
"GroupMember", back_populates="group", cascade="all, delete-orphan"
)
group_calendar = relationship(
"GroupCalendar", back_populates="group", uselist=False,
cascade="all, delete-orphan",
)
class GroupMember(Base):
__tablename__ = "group_members"
__table_args__ = (
UniqueConstraint("group_id", "user_id", name="uq_group_member"),
)
id = Column(Integer, primary_key=True, index=True)
group_id = Column(Integer, ForeignKey("groups.id"), nullable=False)
user_id = Column(Integer, ForeignKey("users.id"), nullable=False)
role = Column(String(10), default="member") # 'owner' | 'member'
joined_at = Column(String(50), nullable=True) # ISO 8601
group = relationship("Group", back_populates="members")
user = relationship("User")
class GroupCalendar(Base):
"""1:1 link between a group and its shared local calendar."""
__tablename__ = "group_calendars"
__table_args__ = (
UniqueConstraint("group_id", name="uq_group_calendar_group"),
UniqueConstraint("calendar_id", name="uq_group_calendar_calendar"),
)
id = Column(Integer, primary_key=True, index=True)
group_id = Column(Integer, ForeignKey("groups.id"), nullable=False)
calendar_id = Column(Integer, ForeignKey("local_calendars.id"), nullable=False)
group = relationship("Group", back_populates="group_calendar")
calendar = relationship("LocalCalendar")

126
backend/permissions.py Normal file
View File

@@ -0,0 +1,126 @@
"""Central access control for local calendars.
Local calendars are visible/writable to a user if any of the following holds:
- the user owns the calendar (LocalCalendar.user_id),
- the calendar is shared with the user (calendar_shares; write needs 'read_write'),
- the calendar is a group calendar and the user is a member of that group
(members get read & write).
These helpers replace the scattered owner-only filters so sharing and groups
work consistently across every local-calendar endpoint and the event merge read.
"""
from typing import Optional
from fastapi import HTTPException
from sqlalchemy.orm import Session
import models
def _share_for(db: Session, calendar_id: int, user_id: int) -> Optional[models.CalendarShare]:
return (
db.query(models.CalendarShare)
.filter(
models.CalendarShare.calendar_id == calendar_id,
models.CalendarShare.user_id == user_id,
)
.first()
)
def _is_group_calendar_member(db: Session, calendar_id: int, user_id: int) -> bool:
gc = (
db.query(models.GroupCalendar)
.filter(models.GroupCalendar.calendar_id == calendar_id)
.first()
)
if not gc:
return False
member = (
db.query(models.GroupMember)
.filter(
models.GroupMember.group_id == gc.group_id,
models.GroupMember.user_id == user_id,
)
.first()
)
return member is not None
def accessible_local_calendar(
db: Session,
user: models.User,
calendar_id: int,
*,
require_write: bool = False,
) -> models.LocalCalendar:
"""Return the calendar if the user may access it, else raise 404/403.
404 when the calendar does not exist or is not visible to the user (so we
don't leak existence). 403 when it is visible (read) but write is required.
"""
cal = (
db.query(models.LocalCalendar)
.filter(models.LocalCalendar.id == calendar_id)
.first()
)
if not cal:
raise HTTPException(404, "Calendar not found")
if cal.user_id == user.id:
return cal # owner: full access
if _is_group_calendar_member(db, calendar_id, user.id):
return cal # group members get read & write
share = _share_for(db, calendar_id, user.id)
if share is None:
raise HTTPException(404, "Calendar not found")
if require_write and share.permission != "read_write":
raise HTTPException(403, "You only have read access to this calendar")
return cal
def is_calendar_owner(db: Session, user: models.User, calendar_id: int) -> models.LocalCalendar:
"""Return the calendar only if the user owns it, else raise 404."""
cal = (
db.query(models.LocalCalendar)
.filter(
models.LocalCalendar.id == calendar_id,
models.LocalCalendar.user_id == user.id,
)
.first()
)
if not cal:
raise HTTPException(404, "Calendar not found")
return cal
def readable_local_calendar_ids(db: Session, user: models.User) -> list[int]:
"""All local calendar ids the user may read: own + shared + group calendars."""
ids: set[int] = set()
own = (
db.query(models.LocalCalendar.id)
.filter(models.LocalCalendar.user_id == user.id)
.all()
)
ids.update(r[0] for r in own)
shared = (
db.query(models.CalendarShare.calendar_id)
.filter(models.CalendarShare.user_id == user.id)
.all()
)
ids.update(r[0] for r in shared)
group_cals = (
db.query(models.GroupCalendar.calendar_id)
.join(models.GroupMember, models.GroupMember.group_id == models.GroupCalendar.group_id)
.filter(models.GroupMember.user_id == user.id)
.all()
)
ids.update(r[0] for r in group_cals)
return list(ids)

View File

@@ -11,8 +11,10 @@ from sqlalchemy import or_
import caldav_client
import models
import permissions
from auth import get_current_user
from database import get_db
from local_events_util import build_local_event_dict, expand_recurring_local, resolve_creator
from routers.ical_router import _refresh_if_needed, get_events_for_subscription
logger = logging.getLogger(__name__)
@@ -82,101 +84,6 @@ def _account_dict(a: models.CalDAVAccount) -> dict:
}
def _expand_recurring_local(ev, local_cal, range_start, range_end):
"""Expand a recurring LocalEvent into individual occurrences within the date range."""
results = []
# Parse excluded dates
excluded = set()
if ev.exdate:
for d in ev.exdate.split(","):
d = d.strip()
if d:
excluded.add(d)
try:
ev_start_str = ev.start.replace("Z", "+00:00")
ev_end_str = ev.end.replace("Z", "+00:00")
if ev.all_day:
ev_start = dt_date.fromisoformat(ev_start_str[:10])
ev_end = dt_date.fromisoformat(ev_end_str[:10])
duration = ev_end - ev_start
rule = rrulestr(f"RRULE:{ev.rrule}", dtstart=dt_datetime.combine(ev_start, dt_datetime.min.time()))
r_start = dt_datetime.combine(range_start if isinstance(range_start, dt_date) else range_start.date(), dt_datetime.min.time())
r_end = dt_datetime.combine(range_end if isinstance(range_end, dt_date) else range_end.date(), dt_datetime.min.time())
occurrences = rule.between(r_start - timedelta(days=1), r_end + timedelta(days=1), inc=True)
for occ in occurrences:
occ_start = occ.date()
occ_key = occ_start.strftime("%Y%m%d")
if occ_key in excluded:
continue
occ_end = occ_start + duration
results.append({
"id": ev.uid,
"url": f"local://{ev.uid}",
"title": ev.title,
"start": occ_start.isoformat(),
"end": occ_end.isoformat(),
"allDay": True,
"location": ev.location or "",
"description": ev.description or "",
"color": ev.color,
"rrule": ev.rrule,
"calendar_id": f"local-{local_cal.id}",
"calendar_name": local_cal.name,
"calendarColor": local_cal.color,
"source": "local",
})
else:
ev_start = dt_datetime.fromisoformat(ev_start_str)
ev_end = dt_datetime.fromisoformat(ev_end_str)
if ev_start.tzinfo is None:
ev_start = ev_start.replace(tzinfo=dt_timezone.utc)
if ev_end.tzinfo is None:
ev_end = ev_end.replace(tzinfo=dt_timezone.utc)
duration = ev_end - ev_start
rule = rrulestr(f"RRULE:{ev.rrule}", dtstart=ev_start)
r_start = range_start if isinstance(range_start, dt_datetime) else dt_datetime.combine(range_start, dt_datetime.min.time(), tzinfo=dt_timezone.utc)
r_end = range_end if isinstance(range_end, dt_datetime) else dt_datetime.combine(range_end, dt_datetime.min.time(), tzinfo=dt_timezone.utc)
if r_start.tzinfo is None:
r_start = r_start.replace(tzinfo=dt_timezone.utc)
if r_end.tzinfo is None:
r_end = r_end.replace(tzinfo=dt_timezone.utc)
occurrences = rule.between(r_start - timedelta(days=1), r_end + timedelta(days=1), inc=True)
for occ in occurrences:
occ_key = occ.strftime("%Y%m%d")
if occ_key in excluded:
continue
occ_end = occ + duration
results.append({
"id": ev.uid,
"url": f"local://{ev.uid}",
"title": ev.title,
"start": occ.isoformat(),
"end": occ_end.isoformat(),
"allDay": False,
"location": ev.location or "",
"description": ev.description or "",
"color": ev.color,
"rrule": ev.rrule,
"calendar_id": f"local-{local_cal.id}",
"calendar_name": local_cal.name,
"calendarColor": local_cal.color,
"source": "local",
})
except Exception as exc:
logger.warning("Error expanding recurring event %s: %s", ev.uid, exc)
# Fall back to single event
results.append({
"id": ev.uid, "url": f"local://{ev.uid}", "title": ev.title,
"start": ev.start, "end": ev.end, "allDay": ev.all_day,
"location": ev.location or "", "description": ev.description or "",
"color": ev.color, "rrule": ev.rrule,
"calendar_id": f"local-{local_cal.id}", "calendar_name": local_cal.name,
"calendarColor": local_cal.color, "source": "local",
})
return results
def _normalize_url(url: str) -> str:
"""Normalize URL for comparison: lowercase scheme/host, strip trailing slash."""
parsed = urlparse(url)
@@ -417,15 +324,17 @@ def get_events(
"Error fetching calendar %s: %s", calendar.id, exc
)
# ── Local calendar events ─────────────────────────────
# ── Local calendar events (own + shared + group calendars) ─────────────
readable_ids = permissions.readable_local_calendar_ids(db, current_user)
local_calendars = (
db.query(models.LocalCalendar)
.filter(
models.LocalCalendar.user_id == current_user.id,
models.LocalCalendar.id.in_(readable_ids),
models.LocalCalendar.enabled == True,
)
.all()
)
) if readable_ids else []
name_cache = {u.id: u.username for u in db.query(models.User).all()}
for local_cal in local_calendars:
local_events = (
db.query(models.LocalEvent)
@@ -441,25 +350,11 @@ def get_events(
.all()
)
for ev in local_events:
creator = resolve_creator(ev, name_cache=name_cache)
if ev.rrule:
all_events.extend(_expand_recurring_local(ev, local_cal, start_dt, end_dt))
all_events.extend(expand_recurring_local(ev, local_cal, start_dt, end_dt, creator=creator))
else:
all_events.append({
"id": ev.uid,
"url": f"local://{ev.uid}",
"title": ev.title,
"start": ev.start,
"end": ev.end,
"allDay": ev.all_day,
"location": ev.location or "",
"description": ev.description or "",
"color": ev.color,
"rrule": None,
"calendar_id": f"local-{local_cal.id}",
"calendar_name": local_cal.name,
"calendarColor": local_cal.color,
"source": "local",
})
all_events.append(build_local_event_dict(ev, local_cal, rrule=None, creator=creator))
# ── iCal subscription events ──────────────────────────
ical_subs = (

View File

@@ -0,0 +1,334 @@
"""Groups: shared group calendar + combined member-calendar overlay view.
A group has members and exactly one group calendar (a local calendar owned by
the creator, linked via group_calendars). Members get read/write on the group
calendar (enforced by permissions.accessible_local_calendar). The combined view
overlays every member's local calendars plus the group calendar, applying each
member's private-event visibility setting.
"""
import logging
from datetime import datetime, timezone
from typing import List, Optional
from fastapi import APIRouter, Depends, HTTPException, Query
from pydantic import BaseModel
from sqlalchemy import or_
from sqlalchemy.orm import Session
import models
from auth import get_current_user
from database import get_db
from local_events_util import build_local_event_dict, expand_recurring_local
logger = logging.getLogger(__name__)
router = APIRouter()
PALETTE = ["#4285f4", "#ea4335", "#fbbc04", "#34a853", "#ff6d00", "#46bdc6", "#8e24aa"]
def _now_iso() -> str:
return datetime.now(timezone.utc).isoformat()
class GroupCreate(BaseModel):
name: str
member_ids: List[int] = []
class MemberAdd(BaseModel):
user_id: int
def _membership(db: Session, group_id: int, user_id: int) -> Optional[models.GroupMember]:
return (
db.query(models.GroupMember)
.filter(
models.GroupMember.group_id == group_id,
models.GroupMember.user_id == user_id,
)
.first()
)
def _require_member(db: Session, group: models.Group, user: models.User) -> models.GroupMember:
m = _membership(db, group.id, user.id)
if not m:
raise HTTPException(403, "You are not a member of this group")
return m
def _require_owner(db: Session, group: models.Group, user: models.User) -> None:
m = _membership(db, group.id, user.id)
if not m or m.role != "owner":
raise HTTPException(403, "Only the group owner may do this")
def _get_group_or_404(db: Session, group_id: int) -> models.Group:
g = db.query(models.Group).filter(models.Group.id == group_id).first()
if not g:
raise HTTPException(404, "Group not found")
return g
def _group_calendar_id(db: Session, group_id: int) -> Optional[int]:
gc = (
db.query(models.GroupCalendar)
.filter(models.GroupCalendar.group_id == group_id)
.first()
)
return gc.calendar_id if gc else None
@router.post("/")
def create_group(
data: GroupCreate,
db: Session = Depends(get_db),
current_user: models.User = Depends(get_current_user),
):
group = models.Group(name=data.name, created_by=current_user.id, created_at=_now_iso())
db.add(group)
db.flush()
# Creator is owner; add the requested members (deduped, excluding creator).
db.add(models.GroupMember(group_id=group.id, user_id=current_user.id, role="owner", joined_at=_now_iso()))
seen = {current_user.id}
for uid in data.member_ids:
if uid in seen:
continue
if not db.query(models.User).filter(models.User.id == uid).first():
continue
db.add(models.GroupMember(group_id=group.id, user_id=uid, role="member", joined_at=_now_iso()))
seen.add(uid)
# Auto-create the group calendar (a local calendar owned by the creator).
cal = models.LocalCalendar(
user_id=current_user.id,
name=f"{data.name} (Gruppe)",
color=PALETTE[group.id % len(PALETTE)],
)
db.add(cal)
db.flush()
db.add(models.GroupCalendar(group_id=group.id, calendar_id=cal.id))
db.commit()
db.refresh(group)
return _group_detail(db, group, current_user)
@router.get("/")
def list_groups(
db: Session = Depends(get_db),
current_user: models.User = Depends(get_current_user),
):
memberships = (
db.query(models.GroupMember)
.filter(models.GroupMember.user_id == current_user.id)
.all()
)
out = []
for m in memberships:
group = db.query(models.Group).filter(models.Group.id == m.group_id).first()
if not group:
continue
member_count = db.query(models.GroupMember).filter(models.GroupMember.group_id == group.id).count()
out.append({
"id": group.id,
"name": group.name,
"role": m.role,
"member_count": member_count,
"group_calendar_id": _group_calendar_id(db, group.id),
})
return out
def _group_detail(db: Session, group: models.Group, current_user: models.User) -> dict:
members = db.query(models.GroupMember).filter(models.GroupMember.group_id == group.id).all()
member_dicts = []
for m in members:
u = db.query(models.User).filter(models.User.id == m.user_id).first()
member_dicts.append({
"id": m.user_id,
"display_name": u.username if u else None,
"role": m.role,
})
return {
"id": group.id,
"name": group.name,
"created_by": group.created_by,
"members": member_dicts,
"group_calendar_id": _group_calendar_id(db, group.id),
}
@router.get("/{group_id}")
def get_group(
group_id: int,
db: Session = Depends(get_db),
current_user: models.User = Depends(get_current_user),
):
group = _get_group_or_404(db, group_id)
_require_member(db, group, current_user)
return _group_detail(db, group, current_user)
@router.post("/{group_id}/members")
def add_member(
group_id: int,
data: MemberAdd,
db: Session = Depends(get_db),
current_user: models.User = Depends(get_current_user),
):
group = _get_group_or_404(db, group_id)
_require_owner(db, group, current_user)
if not db.query(models.User).filter(models.User.id == data.user_id).first():
raise HTTPException(404, "User not found")
if _membership(db, group_id, data.user_id):
return {"ok": True} # already a member
db.add(models.GroupMember(group_id=group_id, user_id=data.user_id, role="member", joined_at=_now_iso()))
db.commit()
return {"ok": True}
@router.delete("/{group_id}/members/{user_id}")
def remove_member(
group_id: int,
user_id: int,
db: Session = Depends(get_db),
current_user: models.User = Depends(get_current_user),
):
group = _get_group_or_404(db, group_id)
# Owner can remove anyone; a member may remove themselves (leave).
if user_id != current_user.id:
_require_owner(db, group, current_user)
else:
_require_member(db, group, current_user)
target = _membership(db, group_id, user_id)
if not target:
raise HTTPException(404, "Member not found")
if target.role == "owner":
raise HTTPException(422, "The owner cannot be removed; delete the group instead")
db.delete(target)
db.commit()
return {"ok": True}
@router.delete("/{group_id}")
def delete_group(
group_id: int,
db: Session = Depends(get_db),
current_user: models.User = Depends(get_current_user),
):
group = _get_group_or_404(db, group_id)
_require_owner(db, group, current_user)
# Remove the group calendar (and its events) too.
gc = db.query(models.GroupCalendar).filter(models.GroupCalendar.group_id == group_id).first()
if gc:
cal = db.query(models.LocalCalendar).filter(models.LocalCalendar.id == gc.calendar_id).first()
if cal:
db.delete(cal) # cascades to events
db.delete(group) # cascades to members + group_calendar link
db.commit()
return {"ok": True}
def _strip_busy(event: dict) -> dict:
"""Anonymise a private event for the 'busy' visibility mode."""
event = dict(event)
event["title"] = "Beschäftigt"
event["location"] = ""
event["description"] = ""
event["private"] = True
return event
@router.get("/{group_id}/combined")
def combined_events(
group_id: int,
start: str = Query(...),
end: str = Query(...),
db: Session = Depends(get_db),
current_user: models.User = Depends(get_current_user),
):
group = _get_group_or_404(db, group_id)
_require_member(db, group, current_user)
try:
start_dt = datetime.fromisoformat(start.replace("Z", "+00:00"))
end_dt = datetime.fromisoformat(end.replace("Z", "+00:00"))
except ValueError:
raise HTTPException(400, "Invalid date format — use ISO 8601")
if start_dt.tzinfo is None:
start_dt = start_dt.replace(tzinfo=timezone.utc)
if end_dt.tzinfo is None:
end_dt = end_dt.replace(tzinfo=timezone.utc)
members = db.query(models.GroupMember).filter(models.GroupMember.group_id == group_id).all()
name_cache = {u.id: u.username for u in db.query(models.User).all()}
visibility_cache: dict[int, str] = {}
def visibility_for(user_id: int) -> str:
if user_id not in visibility_cache:
s = db.query(models.UserSettings).filter(models.UserSettings.user_id == user_id).first()
visibility_cache[user_id] = (s.private_event_visibility if s else None) or "busy"
return visibility_cache[user_id]
group_cal_id = _group_calendar_id(db, group_id)
all_events: list[dict] = []
def emit_calendar(cal: models.LocalCalendar, owner_id: int, is_group: bool):
owner_user = name_cache.get(owner_id)
owner = {"id": owner_id, "display_name": owner_user}
events = (
db.query(models.LocalEvent)
.filter(
models.LocalEvent.calendar_id == cal.id,
or_(
(models.LocalEvent.rrule == None) & (models.LocalEvent.start < end) & (models.LocalEvent.end > start),
models.LocalEvent.rrule != None,
),
)
.all()
)
for ev in events:
creator_owner_id = ev.creator_id or owner_id
# Private filtering for events that belong to someone else.
if ev.is_private and creator_owner_id != current_user.id:
vis = visibility_for(creator_owner_id)
if vis == "hidden":
continue
creator = None
if ev.creator_id and name_cache.get(ev.creator_id):
creator = {"id": ev.creator_id, "display_name": name_cache[ev.creator_id]}
elif ev.creator_name_external:
creator = {"id": None, "display_name": f"{ev.creator_name_external} (importiert)"}
if ev.rrule:
built = expand_recurring_local(ev, cal, start_dt, end_dt, creator=creator, owner=owner, is_group_event=is_group)
else:
built = [build_local_event_dict(ev, cal, rrule=None, creator=creator, owner=owner, is_group_event=is_group)]
for b in built:
if ev.is_private and creator_owner_id != current_user.id and visibility_for(creator_owner_id) == "busy":
b = _strip_busy(b)
all_events.append(b)
# Each member's own local calendars (excluding the group calendar to avoid dupes).
for m in members:
member_cals = (
db.query(models.LocalCalendar)
.filter(models.LocalCalendar.user_id == m.user_id)
.all()
)
for cal in member_cals:
if group_cal_id is not None and cal.id == group_cal_id:
continue
emit_calendar(cal, m.user_id, is_group=False)
# The group calendar itself.
if group_cal_id is not None:
group_cal = db.query(models.LocalCalendar).filter(models.LocalCalendar.id == group_cal_id).first()
if group_cal:
emit_calendar(group_cal, group_cal.user_id, is_group=True)
return {"events": all_events}

View File

@@ -1,17 +1,26 @@
import uuid
from datetime import datetime, timezone
from typing import Optional
from fastapi import APIRouter, Depends, HTTPException
from fastapi import APIRouter, Depends, Form, HTTPException, Query, UploadFile, File
from fastapi.responses import Response
from pydantic import BaseModel
from sqlalchemy.orm import Session
import ical_io
import models
import permissions
from auth import get_current_user
from database import get_db
from local_events_util import build_local_event_dict, resolve_creator
router = APIRouter()
def _now_iso() -> str:
return datetime.now(timezone.utc).isoformat()
class CalendarCreate(BaseModel):
name: str
color: str = "#34a853"
@@ -33,6 +42,7 @@ class EventCreate(BaseModel):
description: Optional[str] = None
color: Optional[str] = None
rrule: Optional[str] = None
private: bool = False
class EventUpdate(BaseModel):
@@ -45,35 +55,33 @@ class EventUpdate(BaseModel):
color: Optional[str] = None
rrule: Optional[str] = None
exdate: Optional[str] = None
private: Optional[bool] = None
def _cal_dict(cal: models.LocalCalendar) -> dict:
return {
class ShareCreate(BaseModel):
user_id: int
permission: str = "read"
def _cal_dict(cal: models.LocalCalendar, *, owned: bool = True,
shared_by: Optional[str] = None, permission: Optional[str] = None) -> dict:
d = {
"id": cal.id,
"name": cal.name,
"color": cal.color,
"enabled": cal.enabled,
"type": "local",
"owned": owned,
}
if shared_by is not None:
d["shared_by"] = shared_by
if permission is not None:
d["permission"] = permission
return d
def _event_dict(ev: models.LocalEvent, cal: models.LocalCalendar) -> dict:
return {
"id": ev.uid,
"url": f"local://{ev.uid}",
"title": ev.title,
"start": ev.start,
"end": ev.end,
"allDay": ev.all_day,
"location": ev.location or "",
"description": ev.description or "",
"color": ev.color,
"rrule": ev.rrule,
"exdate": ev.exdate,
"calendar_id": f"local-{cal.id}",
"calendar_name": cal.name,
"calendarColor": cal.color,
"source": "local",
}
def _event_dict(ev: models.LocalEvent, cal: models.LocalCalendar, db: Session) -> dict:
return build_local_event_dict(ev, cal, creator=resolve_creator(ev))
# ── Calendar CRUD ─────────────────────────────────────────
@@ -83,12 +91,31 @@ def list_calendars(
db: Session = Depends(get_db),
current_user: models.User = Depends(get_current_user),
):
cals = (
# Own calendars
own = (
db.query(models.LocalCalendar)
.filter(models.LocalCalendar.user_id == current_user.id)
.all()
)
return [_cal_dict(c) for c in cals]
result = [_cal_dict(c, owned=True) for c in own]
# Calendars shared with this user
shares = (
db.query(models.CalendarShare)
.filter(models.CalendarShare.user_id == current_user.id)
.all()
)
for share in shares:
cal = share.calendar
if cal is None:
continue
owner = db.query(models.User).filter(models.User.id == cal.user_id).first()
result.append(_cal_dict(
cal, owned=False,
shared_by=owner.username if owner else None,
permission=share.permission,
))
return result
@router.post("/calendars")
@@ -164,16 +191,10 @@ def create_event(
db: Session = Depends(get_db),
current_user: models.User = Depends(get_current_user),
):
cal = (
db.query(models.LocalCalendar)
.filter(
models.LocalCalendar.id == data.calendar_id,
models.LocalCalendar.user_id == current_user.id,
)
.first()
# Owner, shared (read_write), or group-member calendars are writable.
cal = permissions.accessible_local_calendar(
db, current_user, data.calendar_id, require_write=True
)
if not cal:
raise HTTPException(404, "Calendar not found")
ev = models.LocalEvent(
calendar_id=cal.id,
@@ -186,11 +207,22 @@ def create_event(
description=data.description,
color=data.color,
rrule=data.rrule,
is_private=data.private,
creator_id=current_user.id, # server-side, never from the client
)
db.add(ev)
db.commit()
db.refresh(ev)
return _event_dict(ev, cal)
return _event_dict(ev, cal, db)
def _writable_event(db: Session, current_user: models.User, uid: str) -> models.LocalEvent:
ev = db.query(models.LocalEvent).filter(models.LocalEvent.uid == uid).first()
if not ev:
raise HTTPException(404, "Event not found")
# Raises 404/403 unless the user may write this event's calendar.
permissions.accessible_local_calendar(db, current_user, ev.calendar_id, require_write=True)
return ev
@router.put("/events/{uid}")
@@ -200,17 +232,9 @@ def update_event(
db: Session = Depends(get_db),
current_user: models.User = Depends(get_current_user),
):
ev = (
db.query(models.LocalEvent)
.join(models.LocalCalendar)
.filter(
models.LocalEvent.uid == uid,
models.LocalCalendar.user_id == current_user.id,
)
.first()
)
if not ev:
raise HTTPException(404, "Event not found")
ev = _writable_event(db, current_user, uid)
if data.private is not None:
ev.is_private = data.private
if data.title is not None:
ev.title = data.title
if data.start is not None:
@@ -243,17 +267,194 @@ def delete_event(
db: Session = Depends(get_db),
current_user: models.User = Depends(get_current_user),
):
ev = (
db.query(models.LocalEvent)
.join(models.LocalCalendar)
.filter(
models.LocalEvent.uid == uid,
models.LocalCalendar.user_id == current_user.id,
)
.first()
)
if not ev:
raise HTTPException(404, "Event not found")
ev = _writable_event(db, current_user, uid)
db.delete(ev)
db.commit()
return {"ok": True}
# ── Sharing (owner only) ──────────────────────────────────
@router.get("/calendars/{calendar_id}/shares")
def list_shares(
calendar_id: int,
db: Session = Depends(get_db),
current_user: models.User = Depends(get_current_user),
):
permissions.is_calendar_owner(db, current_user, calendar_id)
shares = (
db.query(models.CalendarShare)
.filter(models.CalendarShare.calendar_id == calendar_id)
.all()
)
out = []
for s in shares:
u = db.query(models.User).filter(models.User.id == s.user_id).first()
out.append({
"user_id": s.user_id,
"display_name": u.username if u else None,
"permission": s.permission,
"created_at": s.created_at,
})
return out
@router.post("/calendars/{calendar_id}/shares")
def add_share(
calendar_id: int,
data: ShareCreate,
db: Session = Depends(get_db),
current_user: models.User = Depends(get_current_user),
):
permissions.is_calendar_owner(db, current_user, calendar_id)
if data.permission not in ("read", "read_write"):
raise HTTPException(422, "permission must be 'read' or 'read_write'")
target = db.query(models.User).filter(models.User.id == data.user_id).first()
if not target:
raise HTTPException(404, "User not found")
if target.id == current_user.id:
raise HTTPException(422, "Cannot share a calendar with yourself")
share = (
db.query(models.CalendarShare)
.filter(
models.CalendarShare.calendar_id == calendar_id,
models.CalendarShare.user_id == data.user_id,
)
.first()
)
if share:
share.permission = data.permission # update existing
else:
share = models.CalendarShare(
calendar_id=calendar_id,
user_id=data.user_id,
permission=data.permission,
created_at=_now_iso(),
)
db.add(share)
db.commit()
return {"ok": True}
@router.delete("/calendars/{calendar_id}/shares/{user_id}")
def remove_share(
calendar_id: int,
user_id: int,
db: Session = Depends(get_db),
current_user: models.User = Depends(get_current_user),
):
permissions.is_calendar_owner(db, current_user, calendar_id)
share = (
db.query(models.CalendarShare)
.filter(
models.CalendarShare.calendar_id == calendar_id,
models.CalendarShare.user_id == user_id,
)
.first()
)
if not share:
raise HTTPException(404, "Share not found")
db.delete(share)
db.commit()
return {"ok": True}
# ── iCal Import / Export (local calendars only) ───────────
def _import_ics_into(cal: models.LocalCalendar, raw: bytes, db: Session) -> dict:
parsed = ical_io.parse_ics(raw)
imported = 0
skipped = 0
for item in parsed["events"]:
uid = item.get("uid") or str(uuid.uuid4())
existing = db.query(models.LocalEvent).filter(models.LocalEvent.uid == uid).first()
if existing:
skipped += 1
continue
ev = models.LocalEvent(
calendar_id=cal.id,
uid=uid,
title=item.get("title") or "(ohne Titel)",
start=item["start"],
end=item["end"],
all_day=item.get("all_day", False),
location=item.get("location"),
description=item.get("description"),
rrule=item.get("rrule"),
exdate=item.get("exdate"),
creator_name_external=item.get("organizer"),
)
db.add(ev)
imported += 1
db.commit()
return {"imported": imported, "skipped": skipped, "errors": parsed["errors"]}
@router.post("/calendars/{calendar_id}/import")
async def import_calendar(
calendar_id: int,
file: UploadFile = File(...),
db: Session = Depends(get_db),
current_user: models.User = Depends(get_current_user),
):
cal = permissions.accessible_local_calendar(db, current_user, calendar_id, require_write=True)
raw = await file.read()
try:
return _import_ics_into(cal, raw, db)
except ValueError as e:
raise HTTPException(422, str(e))
@router.post("/import")
async def import_generic(
file: UploadFile = File(...),
calendar_id: Optional[int] = Form(None),
create_calendar: bool = Form(False),
calendar_name: Optional[str] = Form(None),
db: Session = Depends(get_db),
current_user: models.User = Depends(get_current_user),
):
if create_calendar:
cal = models.LocalCalendar(
user_id=current_user.id,
name=calendar_name or "Importiert",
)
db.add(cal)
db.commit()
db.refresh(cal)
elif calendar_id is not None:
cal = permissions.accessible_local_calendar(db, current_user, calendar_id, require_write=True)
else:
raise HTTPException(422, "Provide calendar_id or create_calendar=true")
raw = await file.read()
try:
result = _import_ics_into(cal, raw, db)
except ValueError as e:
raise HTTPException(422, str(e))
result["calendar_id"] = cal.id
return result
@router.get("/calendars/{calendar_id}/export")
def export_calendar(
calendar_id: int,
db: Session = Depends(get_db),
current_user: models.User = Depends(get_current_user),
):
cal = permissions.accessible_local_calendar(db, current_user, calendar_id)
events = (
db.query(models.LocalEvent)
.filter(models.LocalEvent.calendar_id == cal.id)
.all()
)
# Resolve creator display names for ORGANIZER.
name_cache = {u.id: u.username for u in db.query(models.User).all()}
ics = ical_io.build_ics(cal, events, name_cache=name_cache)
safe_name = "".join(c for c in cal.name if c.isalnum() or c in (" ", "-", "_")).strip() or "calendar"
return Response(
content=ics,
media_type="text/calendar",
headers={"Content-Disposition": f'attachment; filename="{safe_name}.ics"'},
)

View File

@@ -1,6 +1,6 @@
from typing import Optional
from fastapi import APIRouter, Depends
from fastapi import APIRouter, Depends, HTTPException
from pydantic import BaseModel
from sqlalchemy.orm import Session
@@ -27,6 +27,7 @@ class SettingsUpdate(BaseModel):
text_color: Optional[str] = None
line_color: Optional[str] = None
bg_color: Optional[str] = None
private_event_visibility: Optional[str] = None
def _settings_dict(s: models.UserSettings) -> dict:
@@ -46,6 +47,7 @@ def _settings_dict(s: models.UserSettings) -> dict:
"text_color": s.text_color,
"line_color": s.line_color,
"bg_color": s.bg_color,
"private_event_visibility": s.private_event_visibility or "busy",
}
@@ -82,6 +84,9 @@ def update_settings(
settings = models.UserSettings(user_id=current_user.id)
db.add(settings)
if data.private_event_visibility is not None and data.private_event_visibility not in ("hidden", "busy"):
raise HTTPException(422, "private_event_visibility must be 'hidden' or 'busy'")
# For these three override colours, an explicit null is meaningful
# ("reset to default") and must be persisted as NULL. All other fields
# keep the previous behaviour where a null/missing value is ignored.

View File

@@ -35,6 +35,25 @@ def list_users(
return [_user_dict(u) for u in db.query(models.User).all()]
@router.get("/directory")
def user_directory(
db: Session = Depends(get_db),
current_user: models.User = Depends(get_current_user),
):
"""Lightweight list of all users (id + display_name) for sharing/group pickers.
Available to any authenticated user (unlike GET / which is admin-only).
Excludes the requesting user.
"""
users = (
db.query(models.User)
.filter(models.User.id != current_user.id)
.order_by(models.User.username)
.all()
)
return [{"id": u.id, "display_name": u.username} for u in users]
@router.post("/")
def create_user(
req: CreateUserRequest,

61
backend/tests/conftest.py Normal file
View File

@@ -0,0 +1,61 @@
"""Pytest fixtures: an isolated app + temp SQLite DB, wiped between tests."""
import os
import sys
import tempfile
from pathlib import Path
# Use a throwaway data dir BEFORE importing the app (database.py reads DATA_DIR
# at import time and builds the engine from it).
os.environ.setdefault("DATA_DIR", tempfile.mkdtemp(prefix="calendarr-test-"))
os.environ.setdefault("SECRET_KEY", "test-secret-key")
BACKEND_DIR = Path(__file__).resolve().parents[1]
sys.path.insert(0, str(BACKEND_DIR))
import pytest
from fastapi.testclient import TestClient
import main # noqa: E402 (creates tables + runs migrations against the temp DB)
import models # noqa: E402
from database import engine # noqa: E402
@pytest.fixture
def client():
return TestClient(main.app)
@pytest.fixture(autouse=True)
def clean_db():
"""Wipe every table before each test for isolation."""
with engine.begin() as conn:
for table in reversed(models.Base.metadata.sorted_tables):
conn.execute(table.delete())
yield
# ── Helpers ───────────────────────────────────────────────
def register_admin(client, username="admin", password="pw"):
r = client.post("/api/auth/setup", json={"username": username, "password": password})
assert r.status_code == 200, r.text
return r.json()["access_token"]
def create_user(client, admin_token, username, password="pw"):
r = client.post(
"/api/users/",
headers={"Authorization": f"Bearer {admin_token}"},
json={"username": username, "password": password},
)
assert r.status_code == 200, r.text
uid = r.json()["id"]
# Log in to get the user's own token.
r2 = client.post("/api/auth/login", json={"username": username, "password": password})
assert r2.status_code == 200, r2.text
return uid, r2.json()["access_token"]
def auth(token):
return {"Authorization": f"Bearer {token}"}

View File

@@ -0,0 +1,282 @@
"""Tests for sharing, group permissions, the iCal parser, and private filtering."""
from conftest import register_admin, create_user, auth
RANGE = {"start": "2026-06-01T00:00:00Z", "end": "2026-06-30T00:00:00Z"}
def _make_calendar(client, token, name="Cal"):
r = client.post("/api/local/calendars", headers=auth(token), json={"name": name})
assert r.status_code == 200, r.text
return r.json()["id"]
def _make_event(client, token, cal_id, title="Event", private=False,
start="2026-06-10T10:00:00+00:00", end="2026-06-10T11:00:00+00:00"):
r = client.post("/api/local/events", headers=auth(token), json={
"calendar_id": cal_id, "title": title, "start": start, "end": end,
"private": private,
})
assert r.status_code == 200, r.text
return r.json()
# ── Sharing ───────────────────────────────────────────────
def test_share_read_then_read_write(client):
admin = register_admin(client)
b_id, b_tok = create_user(client, admin, "bob")
cal_id = _make_calendar(client, admin, "Admins Kalender")
ev = _make_event(client, admin, cal_id, "Meeting")
# Creator field populated server-side.
assert ev["creator"]["display_name"] == "admin"
assert ev["type"] == "local"
# Share read-only with bob.
r = client.post(f"/api/local/calendars/{cal_id}/shares", headers=auth(admin),
json={"user_id": b_id, "permission": "read"})
assert r.status_code == 200, r.text
# Bob sees the shared calendar with shared_by.
cals = client.get("/api/local/calendars", headers=auth(b_tok)).json()
shared = [c for c in cals if not c["owned"]]
assert len(shared) == 1
assert shared[0]["shared_by"] == "admin"
assert shared[0]["permission"] == "read"
# Bob sees the event in the merged read.
events = client.get("/api/caldav/events", headers=auth(b_tok), params=RANGE).json()["events"]
assert any(e["title"] == "Meeting" for e in events)
# Bob cannot write (read-only) -> 403.
r = client.post("/api/local/events", headers=auth(b_tok), json={
"calendar_id": cal_id, "title": "Nope",
"start": "2026-06-11T10:00:00+00:00", "end": "2026-06-11T11:00:00+00:00",
})
assert r.status_code == 403, r.text
# Upgrade to read_write -> bob can write.
client.post(f"/api/local/calendars/{cal_id}/shares", headers=auth(admin),
json={"user_id": b_id, "permission": "read_write"})
r = client.post("/api/local/events", headers=auth(b_tok), json={
"calendar_id": cal_id, "title": "Bobs Eintrag",
"start": "2026-06-11T10:00:00+00:00", "end": "2026-06-11T11:00:00+00:00",
})
assert r.status_code == 200, r.text
# Created by bob.
assert r.json()["creator"]["display_name"] == "bob"
def test_non_owner_cannot_manage_shares(client):
admin = register_admin(client)
b_id, b_tok = create_user(client, admin, "bob")
cal_id = _make_calendar(client, admin)
# Bob (no access at all) cannot list shares -> 404 (existence hidden).
r = client.get(f"/api/local/calendars/{cal_id}/shares", headers=auth(b_tok))
assert r.status_code == 404
def test_unshared_calendar_invisible(client):
admin = register_admin(client)
_b_id, b_tok = create_user(client, admin, "bob")
cal_id = _make_calendar(client, admin)
_make_event(client, admin, cal_id, "Privat")
events = client.get("/api/caldav/events", headers=auth(b_tok), params=RANGE).json()["events"]
assert not any(e["title"] == "Privat" for e in events)
# ── Groups ────────────────────────────────────────────────
def test_group_create_and_members(client):
admin = register_admin(client)
b_id, b_tok = create_user(client, admin, "bob")
c_id, c_tok = create_user(client, admin, "carol")
r = client.post("/api/groups/", headers=auth(admin),
json={"name": "Familie", "member_ids": [b_id]})
assert r.status_code == 200, r.text
group = r.json()
gid = group["id"]
assert group["group_calendar_id"] is not None
assert {m["display_name"] for m in group["members"]} == {"admin", "bob"}
# Both members see the group.
assert any(g["id"] == gid for g in client.get("/api/groups/", headers=auth(b_tok)).json())
# Carol is not a member.
assert not any(g["id"] == gid for g in client.get("/api/groups/", headers=auth(c_tok)).json())
assert client.get(f"/api/groups/{gid}", headers=auth(c_tok)).status_code == 403
# Only owner adds members.
assert client.post(f"/api/groups/{gid}/members", headers=auth(b_tok),
json={"user_id": c_id}).status_code == 403
assert client.post(f"/api/groups/{gid}/members", headers=auth(admin),
json={"user_id": c_id}).status_code == 200
# Member can leave; owner cannot be removed.
assert client.delete(f"/api/groups/{gid}/members/{c_id}", headers=auth(c_tok)).status_code == 200
admin_id = client.get("/api/auth/me", headers=auth(admin)).json()["id"]
assert client.delete(f"/api/groups/{gid}/members/{admin_id}", headers=auth(admin)).status_code == 422
def test_group_members_can_write_group_calendar(client):
admin = register_admin(client)
b_id, b_tok = create_user(client, admin, "bob")
group = client.post("/api/groups/", headers=auth(admin),
json={"name": "Team", "member_ids": [b_id]}).json()
gcal = group["group_calendar_id"]
# Bob (member, not owner of the calendar) can create in the group calendar.
r = client.post("/api/local/events", headers=auth(b_tok), json={
"calendar_id": gcal, "title": "Teamtermin",
"start": "2026-06-12T09:00:00+00:00", "end": "2026-06-12T10:00:00+00:00",
})
assert r.status_code == 200, r.text
def test_combined_view_marks_owner_and_group_event(client):
admin = register_admin(client)
b_id, b_tok = create_user(client, admin, "bob")
group = client.post("/api/groups/", headers=auth(admin),
json={"name": "Team", "member_ids": [b_id]}).json()
gid = group["id"]
gcal = group["group_calendar_id"]
# Bob's own calendar + event.
b_cal = _make_calendar(client, b_tok, "Bobs Kalender")
_make_event(client, b_tok, b_cal, "Bobs Termin")
# A group-calendar event.
_make_event(client, admin, gcal, "Gruppentermin")
events = client.get(f"/api/groups/{gid}/combined", headers=auth(admin), params=RANGE).json()["events"]
titles = {e["title"]: e for e in events}
assert "Bobs Termin" in titles
assert titles["Bobs Termin"]["owner"]["display_name"] == "bob"
assert titles["Bobs Termin"].get("is_group_event") is not True
assert "Gruppentermin" in titles
assert titles["Gruppentermin"]["is_group_event"] is True
# ── Private filtering ─────────────────────────────────────
def _combined_titles(client, token, gid):
evs = client.get(f"/api/groups/{gid}/combined", headers=auth(token), params=RANGE).json()["events"]
return evs
def test_private_visibility_hidden_and_busy(client):
admin = register_admin(client)
b_id, b_tok = create_user(client, admin, "bob")
group = client.post("/api/groups/", headers=auth(admin),
json={"name": "Team", "member_ids": [b_id]}).json()
gid = group["id"]
b_cal = _make_calendar(client, b_tok, "Bobs Kalender")
_make_event(client, b_tok, b_cal, "Geheimes", private=True,
start="2026-06-15T10:00:00+00:00", end="2026-06-15T11:00:00+00:00")
# Bob sees his own private event in full.
own = _combined_titles(client, b_tok, gid)
assert any(e["title"] == "Geheimes" for e in own)
# Default visibility = busy -> admin sees it as anonymous "Beschäftigt".
seen = _combined_titles(client, admin, gid)
busy = [e for e in seen if e["start"].startswith("2026-06-15")]
assert busy and all(e["title"] == "Beschäftigt" for e in busy)
assert all(e["location"] == "" and e["description"] == "" for e in busy)
# Switch bob to hidden -> admin no longer sees it at all.
client.put("/api/settings/", headers=auth(b_tok), json={"private_event_visibility": "hidden"})
seen2 = _combined_titles(client, admin, gid)
assert not any(e["start"].startswith("2026-06-15") for e in seen2)
def test_private_visibility_validation(client):
admin = register_admin(client)
r = client.put("/api/settings/", headers=auth(admin), json={"private_event_visibility": "bogus"})
assert r.status_code == 422
# ── iCal import/export ────────────────────────────────────
SAMPLE_ICS = b"""BEGIN:VCALENDAR
VERSION:2.0
PRODID:-//Test//EN
BEGIN:VEVENT
UID:evt-1@test
SUMMARY:Importiert 1
DTSTART:20260620T100000Z
DTEND:20260620T110000Z
LOCATION:Buero
ORGANIZER;CN=Max Mustermann:mailto:max@example.com
RRULE:FREQ=WEEKLY;BYDAY=MO
END:VEVENT
BEGIN:VEVENT
UID:evt-2@test
SUMMARY:Importiert 2
DTSTART;VALUE=DATE:20260621
DTEND;VALUE=DATE:20260622
END:VEVENT
END:VCALENDAR
"""
def test_ical_parser_roundtrip():
import ical_io
parsed = ical_io.parse_ics(SAMPLE_ICS)
assert len(parsed["events"]) == 2
ev1 = next(e for e in parsed["events"] if e["uid"] == "evt-1@test")
assert ev1["title"] == "Importiert 1"
assert ev1["location"] == "Buero"
assert ev1["organizer"] == "Max Mustermann"
assert ev1["rrule"] == "FREQ=WEEKLY;BYDAY=MO"
ev2 = next(e for e in parsed["events"] if e["uid"] == "evt-2@test")
assert ev2["all_day"] is True
def test_import_dedupes_by_uid(client):
admin = register_admin(client)
cal_id = _make_calendar(client, admin, "Import-Ziel")
files = {"file": ("test.ics", SAMPLE_ICS, "text/calendar")}
r = client.post(f"/api/local/calendars/{cal_id}/import", headers=auth(admin), files=files)
assert r.status_code == 200, r.text
body = r.json()
assert body["imported"] == 2 and body["skipped"] == 0
# Re-import -> all skipped (UID dedupe).
files = {"file": ("test.ics", SAMPLE_ICS, "text/calendar")}
r2 = client.post(f"/api/local/calendars/{cal_id}/import", headers=auth(admin), files=files)
assert r2.json()["imported"] == 0 and r2.json()["skipped"] == 2
# Imported events carry the external creator name.
events = client.get("/api/caldav/events", headers=auth(admin), params=RANGE).json()["events"]
imported = [e for e in events if e["title"] == "Importiert 1"]
assert imported and imported[0]["creator"]["display_name"] == "Max Mustermann (importiert)"
def test_export_contains_organizer_and_rrule(client):
admin = register_admin(client)
cal_id = _make_calendar(client, admin, "Export-Test")
_make_event(client, admin, cal_id, "Wöchentlich")
# Add a recurring rule via update.
events = client.get("/api/caldav/events", headers=auth(admin), params=RANGE).json()["events"]
uid = next(e["id"] for e in events if e["title"] == "Wöchentlich")
client.put(f"/api/local/events/{uid}", headers=auth(admin), json={"rrule": "FREQ=WEEKLY;BYDAY=MO"})
r = client.get(f"/api/local/calendars/{cal_id}/export", headers=auth(admin))
assert r.status_code == 200
assert r.headers["content-type"].startswith("text/calendar")
body = r.text
assert "BEGIN:VCALENDAR" in body
assert "ORGANIZER" in body and "admin" in body
assert "RRULE" in body
def test_import_export_only_local(client):
"""Import/export endpoints reject non-existent / inaccessible calendars."""
admin = register_admin(client)
_b, b_tok = create_user(client, admin, "bob")
cal_id = _make_calendar(client, admin, "Privat")
# Bob has no access -> 404 on export.
assert client.get(f"/api/local/calendars/{cal_id}/export", headers=auth(b_tok)).status_code == 404

3
requirements-dev.txt Normal file
View File

@@ -0,0 +1,3 @@
-r requirements.txt
pytest>=8.0
httpx>=0.27